commit 73ba3295104e6034c2b48a2b96e8abda13164d47 Author: emmatveev Date: Sat Dec 7 23:18:37 2024 +0300 initial diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml new file mode 100644 index 0000000..732207c --- /dev/null +++ b/.deploy/deploy-dev.yaml @@ -0,0 +1,22 @@ +version: "3.4" + + +services: + queues: + image: mathwave/sprint-repo:queues-py3-grpc + networks: + - queues-development + environment: + MONGO_HOST: "mongo.develop.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_DEV + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + queues-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml new file mode 100644 index 0000000..1570803 --- /dev/null +++ b/.deploy/deploy-prod.yaml @@ -0,0 +1,22 @@ +version: "3.4" + + +services: + queues: + image: mathwave/sprint-repo:queues-py3-grpc + networks: + - queues + environment: + MONGO_HOST: "mongo.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_PROD + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + queues: + external: true diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..b10402f --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,43 @@ +name: Deploy Dev + +on: + pull_request: + branches: + - dev + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: build + run: docker build -t mathwave/sprint-repo:queues-py3-grpc . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues-py3-grpc + deploy-dev: + name: Deploy dev + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: deploy + env: + MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml infra-development diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml new file mode 100644 index 0000000..c6ad1c5 --- /dev/null +++ b/.gitea/workflows/deploy-prod.yaml @@ -0,0 +1,43 @@ +name: Deploy Prod + +on: + pull_request: + branches: + - prod + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: build + run: docker build -t mathwave/sprint-repo:queues-py3-grpc . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:queues-py3-grpc + deploy-prod: + name: Deploy prod + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: deploy + env: + MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml infra diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7375914 --- /dev/null +++ b/.gitignore @@ -0,0 +1,121 @@ +# Django # +*.log +*.pot +*.pyc +__pycache__ +db.sqlite3 +media +data +*/__pycache__ + +# Backup files # +*.bak + +# If you are using PyCharm # +.idea +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/gradle.xml +.idea/**/libraries +*.iws /out/ + +# Python # +*.py[cod] +*$py.class + +# Distribution / packaging +.Python build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache/ +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +postgres-data + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery +celerybeat-schedule.* + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +ENV/ +venv/ +env.bak/ +venv.bak/ + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Sublime Text # +*.tmlanguage.cache +*.tmPreferences.cache +*.stTheme.cache +*.sublime-workspace +*.sublime-project + +# sftp configuration file +sftp-config.json + +# Package control specific files Package +Control.last-run +Control.ca-list +Control.ca-bundle +Control.system-ca-bundle +GitHub.sublime-settings + +# Visual Studio Code # +.vscode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history + +*pb2* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b139e67 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12 + +RUN mkdir /usr/src/app +WORKDIR /usr/src/app + +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt + +COPY . . +RUN make gen + +ENTRYPOINT ["make", "run"] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..18b875d --- /dev/null +++ b/Makefile @@ -0,0 +1,4 @@ +gen: + python -m grpc_tools.protoc --proto_path schemas --python_out=. --pyi_out=. --grpc_python_out=. ./schemas/tasks.proto +run: + python ./server.py \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d1155a6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +annotated-types==0.7.0 +bson==0.5.10 +dnspython==2.7.0 +grpcio==1.68.1 +grpcio-tools==1.68.1 +motor==3.6.0 +protobuf==5.29.1 +pydantic==2.10.3 +pydantic_core==2.27.1 +pymongo==4.9.2 +python-dateutil==2.9.0.post0 +setuptools==75.6.0 +six==1.17.0 +typing_extensions==4.12.2 diff --git a/schemas/tasks.proto b/schemas/tasks.proto new file mode 100644 index 0000000..fcbd6dc --- /dev/null +++ b/schemas/tasks.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package queues; + +import "google/protobuf/struct.proto"; + +service Tasks { + rpc Put (PutRequest) returns (EmptyResponse) {} + + rpc Take (TakeRequest) returns (TakeResponse) {} + + rpc Finish (FinishRequest) returns (EmptyResponse) {} +} + +message Task { + string id = 1; + int64 attempt = 2; + google.protobuf.Struct payload = 3; +} + +message PutRequest { + string queue = 1; + int64 seconds_to_execute = 2; + optional int64 delay = 3; + google.protobuf.Struct payload = 4; +} + +message TakeRequest { + string queue = 1; +} + +message FinishRequest { + string id = 1; +} + +message EmptyResponse {} + +message TakeResponse { + optional Task task = 1; +} diff --git a/server.py b/server.py new file mode 100644 index 0000000..5496f66 --- /dev/null +++ b/server.py @@ -0,0 +1,60 @@ +import asyncio +import datetime +import grpc +import bson + +import tasks_pb2 +import tasks_pb2_grpc + +from utils import time +from storage.mongo import tasks + + +def get_feature(feature_db, point): + """Returns Feature at given location or None.""" + for feature in feature_db: + if feature.location == point: + return feature + return None + + +class TasksServicer(tasks_pb2_grpc.TasksServicer): + async def Put(self, request: tasks_pb2.PutRequest, context) -> tasks_pb2.EmptyResponse: + now = time.now() + await tasks.add_task( + task=tasks.Task( + queue=request.queue, + payload=request.payload, + put_at=now, + available_from=(now + datetime.timedelta(seconds=request.delay)) if request.delay else now, + seconds_to_execute=request.seconds_to_execute, + ), + ) + return tasks_pb2.EmptyResponse() + + async def Take(self, request: tasks_pb2.TakeRequest, context) -> tasks_pb2.TakeResponse: + task = await tasks.take_task(request.queue) + if not task: + return tasks_pb2.TakeResponse(task=None) + return tasks_pb2.TakeResponse(task=tasks_pb2.Task(id=str(task._id), attempt=task.attempts, payload=task.payload)) + + async def Finish(self, request: tasks_pb2.FinishRequest, context) -> tasks_pb2.EmptyResponse: + if await tasks.finish_task(bson.ObjectId(request.id)): + return tasks_pb2.EmptyResponse + context.set_code(grpc.StatusCode.NOT_FOUND) + context.set_details('Task not found!') + return context + + +async def serve(): + server = grpc.aio.server() + tasks_pb2_grpc.add_TasksServicer_to_server( + TasksServicer(), server + ) + server.add_insecure_port("[::]:50051") + await server.start() + await server.wait_for_termination() + + +if __name__ == "__main__": + asyncio.run(serve()) \ No newline at end of file diff --git a/storage/__init__.py b/storage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/storage/mongo/__init__.py b/storage/mongo/__init__.py new file mode 100644 index 0000000..d1c6175 --- /dev/null +++ b/storage/mongo/__init__.py @@ -0,0 +1,21 @@ +import os +import motor +import motor.motor_asyncio +import pymongo + + +MONGO_HOST = os.getenv('MONGO_HOST', 'localhost') +MONGO_PASSWORD = os.getenv('MONGO_PASSWORD', 'password') + +CONNECTION_STRING = f'mongodb://mongo:{MONGO_PASSWORD}@{MONGO_HOST}:27017/' + + +database: 'motor.MotorDatabase' = motor.motor_asyncio.AsyncIOMotorClient(CONNECTION_STRING).queues + +def create_indexes(): + client = pymongo.MongoClient(CONNECTION_STRING) + database = client.get_database('queues') + database.get_collection('tasks').create_index([ + ('queue', 1), + ('available_from', 1), + ]) diff --git a/storage/mongo/tasks.py b/storage/mongo/tasks.py new file mode 100644 index 0000000..81cb453 --- /dev/null +++ b/storage/mongo/tasks.py @@ -0,0 +1,42 @@ +import bson +import datetime +import pydantic + +from storage.mongo import database +from utils import time +from bson import codec_options + + +collection = database.get_collection("tasks", codec_options=codec_options.CodecOptions(tz_aware=True)) + + +class Task(pydantic.BaseModel): + queue: str + payload: dict + put_at: pydantic.AwareDatetime + available_from: pydantic.AwareDatetime + seconds_to_execute: int + _id: bson.ObjectId|None = None + taken_at: pydantic.AwareDatetime|None = None + attempts: int = 0 + + +async def add_task(task: Task) -> str: + result = await collection.insert_one(task.model_dump()) + return result.inserted_id + + +async def take_task(queue: str) -> Task|None: + now = time.now() + async for raw_task in collection.find({'queue': queue, 'available_from': {'$lte': now}}): + task = Task.model_validate(raw_task) + task._id = raw_task['_id'] + if not task.taken_at or (task.taken_at + datetime.timedelta(seconds=task.seconds_to_execute)) < now: + await collection.update_one({'_id': task._id}, {'$set': {'taken_at': now, 'attempts': task.attempts + 1}}) + return task + return None + + +async def finish_task(id: bson.ObjectId) -> bool: + result = await collection.delete_one({'_id': id}) + return result diff --git a/utils/time.py b/utils/time.py new file mode 100644 index 0000000..b51c065 --- /dev/null +++ b/utils/time.py @@ -0,0 +1,4 @@ +import datetime + + +now = lambda: datetime.datetime.now(datetime.UTC)