diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 80e0af2..20e2223 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -9,6 +9,8 @@ services: environment: MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV + REDIS_HOST: "redis.develop.sprinthub.ru" + REDIS_PASSWORD: $REDIS_PASSWORD_DEV deploy: mode: replicated restart_policy: diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 1e94afa..b01f4c6 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -9,6 +9,8 @@ services: environment: MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD + REDIS_HOST: "redis.sprinthub.ru" + REDIS_PASSWORD: $REDIS_PASSWORD_PROD deploy: mode: replicated restart_policy: diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml index 3efe3ac..843315a 100644 --- a/.gitea/workflows/deploy-dev.yaml +++ b/.gitea/workflows/deploy-dev.yaml @@ -40,4 +40,5 @@ jobs: - name: deploy env: MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} + REDIS_PASSWORD_DEV: ${{ secrets.REDIS_PASSWORD_DEV }} run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra-development diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml index 9a6c82f..4ce27eb 100644 --- a/.gitea/workflows/deploy-prod.yaml +++ b/.gitea/workflows/deploy-prod.yaml @@ -40,4 +40,5 @@ jobs: - name: deploy env: MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} + REDIS_PASSWORD_PROD: ${{ secrets.REDIS_PASSWORD_PROD }} run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra diff --git a/app/routers/take.py b/app/routers/take.py index 9f8c8e4..a2abae1 100644 --- a/app/routers/take.py +++ b/app/routers/take.py @@ -3,6 +3,7 @@ import pydantic import typing from app.storage.mongo import tasks +from app.storage.redis import lock router = fastapi.APIRouter() @@ -16,7 +17,8 @@ class Response(pydantic.BaseModel): @router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: - task = await tasks.take_task(queue) + async with lock.acquire(queue): + task = await tasks.take_task(queue) if not task: raise fastapi.HTTPException(404) return Response(id=str(task._id), attempt=task.attempts, payload=task.payload) diff --git a/app/storage/redis/__init__.py b/app/storage/redis/__init__.py new file mode 100644 index 0000000..417b2f5 --- /dev/null +++ b/app/storage/redis/__init__.py @@ -0,0 +1,15 @@ +import os +import redis.asyncio + + +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PASSWORD = os.getenv('REDIS_PASSWORD') +if REDIS_PASSWORD: + URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:6379' +else: + URL = f'redis://{REDIS_HOST}:6379' + + + +pool = redis.asyncio.ConnectionPool.from_url(URL) +database = redis.Redis.from_pool(pool) diff --git a/app/storage/redis/lock.py b/app/storage/redis/lock.py new file mode 100644 index 0000000..5282a58 --- /dev/null +++ b/app/storage/redis/lock.py @@ -0,0 +1,8 @@ +import contextlib + +from app.storage import redis + + +@contextlib.contextmanager +def acquire(lock_name: str): + return redis.database.lock(lock_name)