From f6b300fc2c22254a3cddc7df03ff10db1fa91f0b Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 17 Nov 2024 01:39:09 +0300 Subject: [PATCH] initial --- .deploy/deploy-dev.yaml | 22 ++++++ .deploy/deploy-prod.yaml | 22 ++++++ .gitea/workflows/deploy-dev.yaml | 43 +++++++++++ .gitea/workflows/deploy-prod.yaml | 43 +++++++++++ .gitignore | 119 ++++++++++++++++++++++++++++++ Dockerfile | 11 +++ app/__init__.py | 0 app/routers/__init__.py | 0 app/routers/finish.py | 19 +++++ app/routers/put.py | 30 ++++++++ app/routers/take.py | 22 ++++++ app/storage/__init__.py | 0 app/storage/mongo/__init__.py | 21 ++++++ app/storage/mongo/tasks.py | 44 +++++++++++ app/utils/time.py | 4 + main.py | 21 ++++++ requirements.txt | 20 +++++ 17 files changed, 441 insertions(+) create mode 100644 .deploy/deploy-dev.yaml create mode 100644 .deploy/deploy-prod.yaml create mode 100644 .gitea/workflows/deploy-dev.yaml create mode 100644 .gitea/workflows/deploy-prod.yaml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 app/__init__.py create mode 100644 app/routers/__init__.py create mode 100644 app/routers/finish.py create mode 100644 app/routers/put.py create mode 100644 app/routers/take.py create mode 100644 app/storage/__init__.py create mode 100644 app/storage/mongo/__init__.py create mode 100644 app/storage/mongo/tasks.py create mode 100644 app/utils/time.py create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml new file mode 100644 index 0000000..0bff8f8 --- /dev/null +++ b/.deploy/deploy-dev.yaml @@ -0,0 +1,22 @@ +version: "3.4" + + +services: + queues-nginx: + image: mathwave/sprint-repo:queues + networks: + - common-infra-nginx + environment: + MONGO_HOST: "mongo.develop.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_DEV + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + common-infra-nginx: + external: true \ No newline at end of file diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml new file mode 100644 index 0000000..da01733 --- /dev/null +++ b/.deploy/deploy-prod.yaml @@ -0,0 +1,22 @@ +version: "3.4" + + +services: + queues-nginx: + image: mathwave/sprint-repo:queues + networks: + - common-infra-nginx + environment: + MONGO_HOST: "mongo.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_PROD + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + common-infra-nginx: + external: true diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..cc552f9 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,43 @@ +name: Deploy Dev + +on: + pull_request: + branches: + - dev + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: build + run: docker build -t mathwave/sprint-repo:queues . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues + deploy-dev: + name: Deploy dev + runs-on: [dev] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: deploy + env: + MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra \ No newline at end of file diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml new file mode 100644 index 0000000..9a6c82f --- /dev/null +++ b/.gitea/workflows/deploy-prod.yaml @@ -0,0 +1,43 @@ +name: Deploy Prod + +on: + pull_request: + branches: + - prod + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: build + run: docker build -t mathwave/sprint-repo:queues . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues + deploy-prod: + name: Deploy prod + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: deploy + env: + MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..849d3ca --- /dev/null +++ b/.gitignore @@ -0,0 +1,119 @@ +# Django # +*.log +*.pot +*.pyc +__pycache__ +db.sqlite3 +media +data +*/__pycache__ + +# Backup files # +*.bak + +# If you are using PyCharm # +.idea +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/gradle.xml +.idea/**/libraries +*.iws /out/ + +# Python # +*.py[cod] +*$py.class + +# Distribution / packaging +.Python build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache/ +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +postgres-data + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery +celerybeat-schedule.* + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +ENV/ +venv/ +env.bak/ +venv.bak/ + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Sublime Text # +*.tmlanguage.cache +*.tmPreferences.cache +*.stTheme.cache +*.sublime-workspace +*.sublime-project + +# sftp configuration file +sftp-config.json + +# Package control specific files Package +Control.last-run +Control.ca-list +Control.ca-bundle +Control.system-ca-bundle +GitHub.sublime-settings + +# Visual Studio Code # +.vscode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..28c0755 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.10 + +RUN mkdir /usr/src/app +WORKDIR /usr/src/app + +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . + +ENTRYPOINT ["python", "main.py"] diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/finish.py b/app/routers/finish.py new file mode 100644 index 0000000..205c05c --- /dev/null +++ b/app/routers/finish.py @@ -0,0 +1,19 @@ +import bson +import fastapi +import pydantic + +from app.storage.mongo import tasks + + +class RequestBody(pydantic.BaseModel): + id: str + + +router = fastapi.APIRouter() + + +@router.post('/api/v1/finish', status_code=fastapi.status.HTTP_202_ACCEPTED, responses={'404': {'description': 'Not found'}}) +async def execute(body: RequestBody): + if await tasks.finish_task(bson.ObjectId(body.id)): + return + raise fastapi.HTTPException(404) diff --git a/app/routers/put.py b/app/routers/put.py new file mode 100644 index 0000000..df31c0b --- /dev/null +++ b/app/routers/put.py @@ -0,0 +1,30 @@ +import datetime +import fastapi +import pydantic +import typing + +from app.storage.mongo import tasks +from app.utils import time + + +class RequestBody(pydantic.BaseModel): + payload: dict + seconds_to_execute: int + delay: int|None = None + + +router = fastapi.APIRouter() + + +@router.post('/api/v1/put', status_code=fastapi.status.HTTP_202_ACCEPTED) +async def execute(body: RequestBody, queue: typing.Annotated[str, fastapi.Header()]): + now = time.now() + await tasks.add_task( + task=tasks.Task( + queue=queue, + payload=body.payload, + put_at=now, + available_from=(now + datetime.timedelta(seconds=body.delay)) if body.delay else now, + seconds_to_execute=body.seconds_to_execute, + ), + ) diff --git a/app/routers/take.py b/app/routers/take.py new file mode 100644 index 0000000..9f8c8e4 --- /dev/null +++ b/app/routers/take.py @@ -0,0 +1,22 @@ +import fastapi +import pydantic +import typing + +from app.storage.mongo import tasks + + +router = fastapi.APIRouter() + + +class Response(pydantic.BaseModel): + id: str + attempt: int + payload: dict + + +@router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) +async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: + task = await tasks.take_task(queue) + if not task: + raise fastapi.HTTPException(404) + return Response(id=str(task._id), attempt=task.attempts, payload=task.payload) diff --git a/app/storage/__init__.py b/app/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/storage/mongo/__init__.py b/app/storage/mongo/__init__.py new file mode 100644 index 0000000..d1c6175 --- /dev/null +++ b/app/storage/mongo/__init__.py @@ -0,0 +1,21 @@ +import os +import motor +import motor.motor_asyncio +import pymongo + + +MONGO_HOST = os.getenv('MONGO_HOST', 'localhost') +MONGO_PASSWORD = os.getenv('MONGO_PASSWORD', 'password') + +CONNECTION_STRING = f'mongodb://mongo:{MONGO_PASSWORD}@{MONGO_HOST}:27017/' + + +database: 'motor.MotorDatabase' = motor.motor_asyncio.AsyncIOMotorClient(CONNECTION_STRING).queues + +def create_indexes(): + client = pymongo.MongoClient(CONNECTION_STRING) + database = client.get_database('queues') + database.get_collection('tasks').create_index([ + ('queue', 1), + ('available_from', 1), + ]) diff --git a/app/storage/mongo/tasks.py b/app/storage/mongo/tasks.py new file mode 100644 index 0000000..0758760 --- /dev/null +++ b/app/storage/mongo/tasks.py @@ -0,0 +1,44 @@ +import asyncio +import bson +import datetime +import pydantic +import typing + +from app.storage.mongo import database +from app.utils import time +from bson import codec_options + + +collection = database.get_collection("tasks", codec_options=codec_options.CodecOptions(tz_aware=True)) + + +class Task(pydantic.BaseModel): + queue: str + payload: dict + put_at: pydantic.AwareDatetime + available_from: pydantic.AwareDatetime + seconds_to_execute: int + _id: bson.ObjectId|None = None + taken_at: pydantic.AwareDatetime|None = None + attempts: int = 0 + + +async def add_task(task: Task) -> str: + result = await collection.insert_one(task.model_dump()) + return result.inserted_id + + +async def take_task(queue: str) -> typing.Optional[Task]: + now = time.now() + async for raw_task in collection.find({'queue': queue, 'available_from': {'$lte': now}}): + task = Task.model_validate(raw_task) + task._id = raw_task['_id'] + if not task.taken_at or (task.taken_at + datetime.timedelta(seconds=task.seconds_to_execute)) < now: + await collection.update_one({'_id': task._id}, {'$set': {'taken_at': now, 'attempts': task.attempts + 1}}) + return task + return None + + +async def finish_task(id: bson.ObjectId) -> bool: + result = await collection.delete_one({'_id': id}) + return result diff --git a/app/utils/time.py b/app/utils/time.py new file mode 100644 index 0000000..b51c065 --- /dev/null +++ b/app/utils/time.py @@ -0,0 +1,4 @@ +import datetime + + +now = lambda: datetime.datetime.now(datetime.UTC) diff --git a/main.py b/main.py new file mode 100644 index 0000000..17f0c6d --- /dev/null +++ b/main.py @@ -0,0 +1,21 @@ +import fastapi +import uvicorn + +from app.storage import mongo + +from app.routers import take +from app.routers import put +from app.routers import finish + + +app = fastapi.FastAPI() + +app.include_router(take.router) +app.include_router(put.router) +app.include_router(finish.router) + +mongo.create_indexes() + + +if __name__ == '__main__': + uvicorn.run(app, host="0.0.0.0", port=1238) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..325371f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,20 @@ +annotated-types==0.7.0 +anyio==4.6.2.post1 +APScheduler==3.10.4 +click==8.1.7 +dnspython==2.7.0 +fastapi==0.115.4 +h11==0.14.0 +idna==3.10 +motor==3.6.0 +pydantic==2.9.2 +pydantic_core==2.23.4 +pymongo==4.9.2 +pytz==2024.2 +redis==5.2.0 +six==1.16.0 +sniffio==1.3.1 +starlette==0.41.2 +typing_extensions==4.12.2 +tzlocal==5.2 +uvicorn==0.32.0