diff --git a/app/routers/take.py b/app/routers/take.py index 9ec0ff6..d8fe38d 100644 --- a/app/routers/take.py +++ b/app/routers/take.py @@ -1,3 +1,5 @@ +import asyncio +import collections import fastapi import pydantic import typing @@ -5,6 +7,7 @@ import typing from app.storage.mongo import tasks +locks = collections.defaultdict(asyncio.Lock) router = fastapi.APIRouter() @@ -20,7 +23,8 @@ class Response(pydantic.BaseModel): @router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: - task = await tasks.take_task(queue) + async with locks[queue]: + task = await tasks.take_task(queue) if not task: return Response(task=None) return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload))