import asyncio import collections import fastapi import pydantic import typing from app.storage.mongo import tasks # from app.utils.configurator import configurator DEFAULT_RETRY_AFTER = 0.2 locks = collections.defaultdict(asyncio.Lock) router = fastapi.APIRouter() class Task(pydantic.BaseModel): id: str attempt: int payload: dict class Response(pydantic.BaseModel): task: Task|None retry_after: float @router.get('/api/v1/take') async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: async with locks[queue]: task = await tasks.take_task(queue) if not task: return Response(task=None) # retry_after_config = configurator.get_config('retry_after') # retry_after = retry_after_config.get(queue) or retry_after_config.get('default') or DEFAULT_RETRY_AFTER return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload), retry_after=DEFAULT_RETRY_AFTER)