diff --git a/.gitignore b/.gitignore index 0740544..05cb585 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,5 @@ GitHub.sublime-settings .history local_platform.json + +*pb2* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 5375a3d..643bf14 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,5 +4,6 @@ WORKDIR /usr/src/app COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . +RUN make gen ENV PYTHONUNBUFFERED 1 ENTRYPOINT ["python", "main.py"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..76fde0c --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +gen: + python -m grpc_tools.protoc --proto_path schemas --python_out=. --pyi_out=. --grpc_python_out=. ./schemas/tasks.proto diff --git a/daemons/base.py b/daemons/base.py index bd6fd00..c917f71 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,3 +1,18 @@ +import os +import grpc +import tasks_pb2_grpc + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'localhost:50051' +else: + QUEUES_URL = 'queues-grpc:50051' + + class Daemon: + def __init__(self): + self.channel = grpc.insecure_channel(QUEUES_URL) + self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): raise NotImplemented diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2589c6c..630fb29 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -23,12 +23,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def queue_name(self): return 'botalka_mailbox' - def before_execute(self, task: dict): - locks.acquire(task['id']) - - def after_execute(self, task: dict): - locks.release(task['id']) - def process(self, payload: dict): message = Message.model_validate(payload) bot = platform.platform_client.get_config('bots')[message.project][message.name] diff --git a/daemons/poll.py b/daemons/poll.py index abf508d..a5ad36b 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -47,7 +47,7 @@ class Daemon(base.Daemon): def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread: @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(queue, message.json, 1) + queues.set_task(self.stub, queue, message.json, 1) thread = threading.Thread(target=bot.polling) thread.start() return thread diff --git a/requirements.txt b/requirements.txt index d216c75..6d2027f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,14 @@ annotated-types==0.7.0 certifi==2024.8.30 charset-normalizer==3.4.0 +grpcio==1.68.1 +grpcio-tools==1.68.1 idna==3.10 +protobuf==5.29.1 pydantic==2.10.2 pydantic_core==2.27.1 pyTelegramBotAPI==4.1.1 requests==2.32.3 +setuptools==75.6.0 typing_extensions==4.12.2 urllib3==2.2.3 diff --git a/schemas/tasks.proto b/schemas/tasks.proto new file mode 100644 index 0000000..fcbd6dc --- /dev/null +++ b/schemas/tasks.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package queues; + +import "google/protobuf/struct.proto"; + +service Tasks { + rpc Put (PutRequest) returns (EmptyResponse) {} + + rpc Take (TakeRequest) returns (TakeResponse) {} + + rpc Finish (FinishRequest) returns (EmptyResponse) {} +} + +message Task { + string id = 1; + int64 attempt = 2; + google.protobuf.Struct payload = 3; +} + +message PutRequest { + string queue = 1; + int64 seconds_to_execute = 2; + optional int64 delay = 3; + google.protobuf.Struct payload = 4; +} + +message TakeRequest { + string queue = 1; +} + +message FinishRequest { + string id = 1; +} + +message EmptyResponse {} + +message TakeResponse { + optional Task task = 1; +} diff --git a/utils/locks.py b/utils/locks.py deleted file mode 100644 index f916013..0000000 --- a/utils/locks.py +++ /dev/null @@ -1,27 +0,0 @@ -import requests - - -LOCKS_URL = 'http://locks' - - -class Conflict(Exception): - pass - - -def acquire(name: str, ttl: int = 1): - resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={ - 'name': name, - 'ttl': ttl, - }) - if resp.status_code == 409: - raise Conflict - if resp.status_code != 202: - raise Exception - - -def release(name: str): - resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={ - 'name': name, - }) - if resp.status_code != 202: - raise Exception diff --git a/utils/queues.py b/utils/queues.py index 556e3a8..3f520ea 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,13 +1,6 @@ -import os -import requests import time - - -stage = os.getenv("STAGE", 'local') -if stage == 'local': - QUEUES_URL = 'http://localhost:1239' -else: - QUEUES_URL = 'http://queues:1239' +import tasks_pb2_grpc +import tasks_pb2 class QueuesException(Exception): @@ -17,30 +10,20 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() - task = response.get('task') + response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(self.queue_name)) + task: tasks_pb2.Task = response.task if not task: time.sleep(0.2) continue try: - self.before_execute(task) - self.process(task['payload']) + self.process(task.payload) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - self.after_execute(task) + self.stub.Finish(tasks_pb2.FinishRequest(task.id)) except: - print(f'Failed to finish task id={task["id"]}') - - def before_execute(self, task: dict): - pass - - def after_execute(self, task: dict): - pass + print(f'Failed to finish task id={task.id}') @property def queue_name(self): @@ -49,12 +32,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented - -def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ - 'payload': payload, - 'seconds_to_execute': seconds_to_execute, - 'delay': delay, - }) - if resp.status_code != 202: - raise QueuesException +def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + stub.Put( + tasks_pb2.PutRequest( + queue=queue_name, + seconds_to_execute=seconds_to_execute, + delay=delay, + payload=payload + ) + )