From 4b63faabef1b86ec459d8bf4953bb9759b005633 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Tue, 3 Dec 2024 14:09:58 +0300 Subject: [PATCH 1/2] Update daemons/worker.py --- daemons/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemons/worker.py b/daemons/worker.py index 8c09bb7..06e43f3 100644 --- a/daemons/worker.py +++ b/daemons/worker.py @@ -118,9 +118,9 @@ class Daemon(base.BaseDaemon, queues.TasksHandlerMixin): set_values(message.chat.id, state="set_probability") def process(self, payload): + message: Message = Message.de_json(json.dumps(payload)) if not message.text: return - message: Message = Message.de_json(json.dumps(payload)) if message.text.startswith('/'): self.process_command(message) return From 5b9c1a18b7a75e0e43f3c1ccc4db642b358c79c1 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 21:12:32 +0300 Subject: [PATCH 2/2] grpc --- .gitignore | 4 +- .gitlab-ci.yml | 51 ------------------- Dockerfile | 1 + Makefile | 7 +++ api.py => daemons/api.py | 2 +- daemons/base.py | 19 ++++++- updater.py => daemons/updater.py | 2 +- daemons/worker.py | 8 +-- launch.json | 0 local_platform.json | 5 -- main.py | 2 +- settings.py | 10 ---- mongo.py => utils/mongo.py | 8 ++- utils/queues.py | 45 +++++++--------- .../sprint_platform.py | 0 storage.py => utils/storage.py | 9 ++-- 16 files changed, 67 insertions(+), 106 deletions(-) delete mode 100644 .gitlab-ci.yml create mode 100644 Makefile rename api.py => daemons/api.py (97%) rename updater.py => daemons/updater.py (97%) delete mode 100644 launch.json delete mode 100644 local_platform.json delete mode 100644 settings.py rename mongo.py => utils/mongo.py (84%) rename sprint_platform.py => utils/sprint_platform.py (100%) rename storage.py => utils/storage.py (88%) diff --git a/.gitignore b/.gitignore index 92785dd..b6e7b52 100644 --- a/.gitignore +++ b/.gitignore @@ -116,4 +116,6 @@ GitHub.sublime-settings !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json -.history \ No newline at end of file +.history + +*pb2* diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 15d2536..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,51 +0,0 @@ -stages: - - build - - deploy-dev - - deploy-prod - -build: - stage: build - tags: - - dev - before_script: - - docker login -u mathwave -p $DOCKERHUB_PASSWORD - script: - - docker build -t mathwave/sprint-repo:pizda-bot . - -push: - stage: build - tags: - - dev - before_script: - - docker login -u mathwave -p $DOCKERHUB_PASSWORD - script: - - docker push mathwave/sprint-repo:pizda-bot - -.deploy: - before_script: - - docker login -u mathwave -p $DOCKERHUB_PASSWORD - -deploy-dev: - extends: - - .deploy - stage: deploy-dev - tags: - - dev - rules: - - if: '$CI_COMMIT_BRANCH == "master"' - when: on_success - - when: manual - script: - - docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml pizda-bot - -deploy-prod: - extends: - - .deploy - stage: deploy-prod - tags: - - prod - only: - - master - when: manual - script: - - docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml pizda-bot diff --git a/Dockerfile b/Dockerfile index 3c5568e..dca16b3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,5 +4,6 @@ WORKDIR /usr/src/app COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . +RUN make gen ENV PYTHONUNBUFFERED 1 ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f071d0d --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +gen: + pip install grpcio grpcio-tools + curl https://platform.sprinthub.ru/generator >> generator.py + python generator.py + rm generator.py +run: + python ./server.py \ No newline at end of file diff --git a/api.py b/daemons/api.py similarity index 97% rename from api.py rename to daemons/api.py index 4b70f59..6302c28 100644 --- a/api.py +++ b/daemons/api.py @@ -2,7 +2,7 @@ from collections import defaultdict from flask import Flask -from mongo import mongo +from utils.mongo import mongo app = Flask(__name__) diff --git a/daemons/base.py b/daemons/base.py index 8bd8646..7f32f71 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,3 +1,18 @@ -class BaseDaemon: +import os +import grpc +from queues import tasks_pb2_grpc + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'localhost:50051' +else: + QUEUES_URL = 'queues-grpc:50051' + + +class Daemon: + def __init__(self): + self.channel = grpc.insecure_channel(QUEUES_URL) + self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): - raise NotImplementedError + raise NotImplemented diff --git a/updater.py b/daemons/updater.py similarity index 97% rename from updater.py rename to daemons/updater.py index 914e2a4..e7b1ef9 100644 --- a/updater.py +++ b/daemons/updater.py @@ -2,7 +2,7 @@ import datetime import random from time import sleep -from mongo import mongo +from utils.mongo import mongo def update(): diff --git a/daemons/worker.py b/daemons/worker.py index 06e43f3..522de68 100644 --- a/daemons/worker.py +++ b/daemons/worker.py @@ -6,9 +6,9 @@ from utils import queues from telebot.types import Message import json -from mongo import mongo -from sprint_platform import PlatformClient -from storage import set_values, get_chat_info +from utils.mongo import mongo +from utils.sprint_platform import PlatformClient +from utils.storage import set_values, get_chat_info security_token = os.getenv("PLATFORM_SECURITY_TOKEN") stage = os.getenv("STAGE", 'local') @@ -49,6 +49,7 @@ class Daemon(base.BaseDaemon, queues.TasksHandlerMixin): def reply(self, text: str, chat_id: int, message_id: int): queues.set_task( + self.stub, 'botalka_mailbox', { 'project': 'pizda-bot', @@ -64,6 +65,7 @@ class Daemon(base.BaseDaemon, queues.TasksHandlerMixin): def send(self, text: str, chat_id: int): queues.set_task( + self.stub, 'botalka_mailbox', { 'project': 'pizda-bot', diff --git a/launch.json b/launch.json deleted file mode 100644 index e69de29..0000000 diff --git a/local_platform.json b/local_platform.json deleted file mode 100644 index 2f9654d..0000000 --- a/local_platform.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "configs": {}, - "experiments": {}, - "platform_staff": {} -} \ No newline at end of file diff --git a/main.py b/main.py index f31884a..254d14f 100644 --- a/main.py +++ b/main.py @@ -6,7 +6,7 @@ if arg == 'worker': from daemons import worker daemon = worker.Daemon() else: - from api import app + from daemons.api import app app.run(host="0.0.0.0", port=1238) daemon.execute() diff --git a/settings.py b/settings.py deleted file mode 100644 index 9fa7f48..0000000 --- a/settings.py +++ /dev/null @@ -1,10 +0,0 @@ -import os -import sys -import zoneinfo - -MONGO_USER = os.getenv("MONGO_USER", "mongo") -MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "password") -MONGO_HOST = os.getenv("MONGO_HOST", "localhost") - -CACHE_SIZE = int(os.getenv("CACHE_SIZE", 1000)) -CACHE_TTL = int(os.getenv("CACHE_TTL", 3600)) diff --git a/mongo.py b/utils/mongo.py similarity index 84% rename from mongo.py rename to utils/mongo.py index fe206df..ef19da4 100644 --- a/mongo.py +++ b/utils/mongo.py @@ -1,10 +1,14 @@ import pymongo -import settings +import os + +MONGO_USER = os.getenv("MONGO_USER", "mongo") +MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "password") +MONGO_HOST = os.getenv("MONGO_HOST", "localhost") class Mongo: def __init__(self): - url = f"mongodb://{settings.MONGO_USER}:{settings.MONGO_PASSWORD}@{settings.MONGO_HOST}:27017/" + url = f"mongodb://{MONGO_USER}:{MONGO_PASSWORD}@{MONGO_HOST}:27017/" self.client = pymongo.MongoClient(url) self.database = self.client.get_database("pizda-bot") self.chats_collection.create_index([ diff --git a/utils/queues.py b/utils/queues.py index 030bf34..9701634 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,14 +1,8 @@ -import json -import os -import requests import time +from queues import tasks_pb2_grpc +from queues import tasks_pb2 - -stage = os.getenv("STAGE", 'local') -if stage == 'local': - QUEUES_URL = 'http://localhost:1239' -else: - QUEUES_URL = 'http://queues:1239' +from google.protobuf import json_format class QueuesException(Exception): @@ -18,22 +12,21 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() - task = response.get('task') + response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) + task = response.task if not task: time.sleep(0.2) continue try: - self.process(task['payload']) + payload = json_format.MessageToDict(task.payload) + self.process(payload) except Exception as exc: - print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + print(f'Error processing message id={task.id}, payload={payload}, exc={exc}') continue try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException + self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) except: - print(f'Failed to finish task id={task["id"]}') + print(f'Failed to finish task id={task.id}') @property def queue_name(self): @@ -42,12 +35,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented - -def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ - 'payload': payload, - 'seconds_to_execute': seconds_to_execute, - 'delay': delay, - }) - if resp.status_code != 202: - raise QueuesException +def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + stub.Put( + tasks_pb2.PutRequest( + queue=queue_name, + seconds_to_execute=seconds_to_execute, + delay=delay, + payload=payload + ) + ) \ No newline at end of file diff --git a/sprint_platform.py b/utils/sprint_platform.py similarity index 100% rename from sprint_platform.py rename to utils/sprint_platform.py diff --git a/storage.py b/utils/storage.py similarity index 88% rename from storage.py rename to utils/storage.py index ed7c18d..2c5438d 100644 --- a/storage.py +++ b/utils/storage.py @@ -1,9 +1,12 @@ from cachetools import TTLCache +import os -import settings -from mongo import mongo +from utils.mongo import mongo -cache = TTLCache(settings.CACHE_SIZE, settings.CACHE_TTL) +CACHE_SIZE = int(os.getenv("CACHE_SIZE", 1000)) +CACHE_TTL = int(os.getenv("CACHE_TTL", 3600)) + +cache = TTLCache(CACHE_SIZE, CACHE_TTL) def get_chat_info(chat_id: int) -> dict: