diff --git a/.gitignore b/.gitignore index 0740544..cc58c17 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,6 @@ GitHub.sublime-settings .history local_platform.json + +*pb2* +schemas diff --git a/Dockerfile b/Dockerfile index 5375a3d..643bf14 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,5 +4,6 @@ WORKDIR /usr/src/app COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . +RUN make gen ENV PYTHONUNBUFFERED 1 ENTRYPOINT ["python", "main.py"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b971207 --- /dev/null +++ b/Makefile @@ -0,0 +1,6 @@ +gen: + curl https://platform.sprinthub.ru/generator >> generator.py + python generator.py + rm generator.py +run: + python ./server.py \ No newline at end of file diff --git a/daemons/base.py b/daemons/base.py index bd6fd00..7f32f71 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,3 +1,18 @@ +import os +import grpc +from queues import tasks_pb2_grpc + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'localhost:50051' +else: + QUEUES_URL = 'queues-grpc:50051' + + class Daemon: + def __init__(self): + self.channel = grpc.insecure_channel(QUEUES_URL) + self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): raise NotImplemented diff --git a/daemons/poll.py b/daemons/poll.py index abf508d..eb84fd4 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -9,6 +9,7 @@ from utils import queues class Daemon(base.Daemon): def __init__(self): + super().__init__() self.telegram_bots: dict[str, dict[str, telebot.TeleBot|None]] = {} self.threads: dict[str, dict[str, threading.Thread|None]] = {} @@ -47,7 +48,7 @@ class Daemon(base.Daemon): def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread: @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(queue, message.json, 1) + queues.set_task(self.stub, queue, message.json, 1) thread = threading.Thread(target=bot.polling) thread.start() return thread diff --git a/requirements.txt b/requirements.txt index d216c75..6d2027f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,14 @@ annotated-types==0.7.0 certifi==2024.8.30 charset-normalizer==3.4.0 +grpcio==1.68.1 +grpcio-tools==1.68.1 idna==3.10 +protobuf==5.29.1 pydantic==2.10.2 pydantic_core==2.27.1 pyTelegramBotAPI==4.1.1 requests==2.32.3 +setuptools==75.6.0 typing_extensions==4.12.2 urllib3==2.2.3 diff --git a/utils/queues.py b/utils/queues.py index c00a18c..d346ffb 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,13 +1,8 @@ -import os -import requests import time +from queues import tasks_pb2_grpc +from queues import tasks_pb2 - -stage = os.getenv("STAGE", 'local') -if stage == 'local': - QUEUES_URL = 'http://localhost:1239' -else: - QUEUES_URL = 'http://queues:1239' +from google.protobuf import json_format class QueuesException(Exception): @@ -17,22 +12,21 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() - task = response.get('task') + response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) + task = response.task if not task: time.sleep(0.2) continue try: - self.process(task['payload']) + payload = json_format.MessageToDict(task.payload) + self.process(payload) except Exception as exc: - print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + print(f'Error processing message id={task.id}, payload={payload}, exc={exc}') continue try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException + self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) except: - print(f'Failed to finish task id={task["id"]}') + print(f'Failed to finish task id={task.id}') @property def queue_name(self): @@ -41,12 +35,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented - -def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ - 'payload': payload, - 'seconds_to_execute': seconds_to_execute, - 'delay': delay, - }) - if resp.status_code != 202: - raise QueuesException +def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + stub.Put( + tasks_pb2.PutRequest( + queue=queue_name, + seconds_to_execute=seconds_to_execute, + delay=delay, + payload=payload + ) + )