import asyncio import collections import fastapi import pydantic import typing from app.storage.mongo import tasks locks = collections.defaultdict(asyncio.Lock) router = fastapi.APIRouter() class Task(pydantic.BaseModel): id: str attempt: int payload: dict class Response(pydantic.BaseModel): task: Task|None @router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: async with locks[queue]: task = await tasks.take_task(queue) if not task: return Response(task=None) return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload))