From 2ddd88337eeb90676c9eac3960e4db30d7f3d2d4 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sat, 28 Dec 2024 15:41:23 +0300 Subject: [PATCH] fix --- app/routers/take.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/app/routers/take.py b/app/routers/take.py index 53a1ecb..d8fe38d 100644 --- a/app/routers/take.py +++ b/app/routers/take.py @@ -1,3 +1,5 @@ +import asyncio +import collections import fastapi import pydantic import typing @@ -5,7 +7,7 @@ import typing from app.storage.mongo import tasks -DEFAULT_RETRY_AFTER = 0.2 +locks = collections.defaultdict(asyncio.Lock) router = fastapi.APIRouter() @@ -19,13 +21,10 @@ class Response(pydantic.BaseModel): task: Task|None -@router.get('/api/v1/take') +@router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: - try: + async with locks[queue]: task = await tasks.take_task(queue) - if not task: - return Response(task=None) - return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload)) - except Exception as e: - print("GOT ERROR", e) + if not task: return Response(task=None) + return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload))