From 784a2ed3932b8f4f990d12aba74ee27d48028fe0 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Mon, 2 Dec 2024 21:53:24 +0300 Subject: [PATCH 01/13] fix --- .deploy/deploy-dev.yaml | 3 +++ .deploy/deploy-prod.yaml | 3 +++ daemons/mailbox.py | 7 +++++++ utils/locks.py | 27 +++++++++++++++++++++++++++ utils/queues.py | 8 ++++++++ 5 files changed, 48 insertions(+) create mode 100644 utils/locks.py diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 227877a..4c14344 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues-development + - locks-development environment: STAGE: "development" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues-development: external: true + locks-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 0043ede..9c4870d 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues + - locks environment: STAGE: "production" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues: external: true + locks: + external: true diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2fc236e..2589c6c 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -5,6 +5,7 @@ from telebot import apihelper from daemons import base from utils import platform from utils import queues +from utils import locks class Message(pydantic.BaseModel): @@ -22,6 +23,12 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def queue_name(self): return 'botalka_mailbox' + def before_execute(self, task: dict): + locks.acquire(task['id']) + + def after_execute(self, task: dict): + locks.release(task['id']) + def process(self, payload: dict): message = Message.model_validate(payload) bot = platform.platform_client.get_config('bots')[message.project][message.name] diff --git a/utils/locks.py b/utils/locks.py new file mode 100644 index 0000000..f916013 --- /dev/null +++ b/utils/locks.py @@ -0,0 +1,27 @@ +import requests + + +LOCKS_URL = 'http://locks' + + +class Conflict(Exception): + pass + + +def acquire(name: str, ttl: int = 1): + resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={ + 'name': name, + 'ttl': ttl, + }) + if resp.status_code == 409: + raise Conflict + if resp.status_code != 202: + raise Exception + + +def release(name: str): + resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={ + 'name': name, + }) + if resp.status_code != 202: + raise Exception diff --git a/utils/queues.py b/utils/queues.py index c00a18c..556e3a8 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -23,6 +23,7 @@ class TasksHandlerMixin: time.sleep(0.2) continue try: + self.before_execute(task) self.process(task['payload']) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') @@ -31,9 +32,16 @@ class TasksHandlerMixin: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: raise QueuesException + self.after_execute(task) except: print(f'Failed to finish task id={task["id"]}') + def before_execute(self, task: dict): + pass + + def after_execute(self, task: dict): + pass + @property def queue_name(self): raise NotImplemented From df2a7c3bb18ce80940e7ba32ff699d218b1244a6 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Mon, 2 Dec 2024 22:38:50 +0300 Subject: [PATCH 02/13] Update daemons/mailbox.py --- daemons/mailbox.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2589c6c..3e25e47 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -24,10 +24,7 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): return 'botalka_mailbox' def before_execute(self, task: dict): - locks.acquire(task['id']) - - def after_execute(self, task: dict): - locks.release(task['id']) + locks.acquire(task['id'], 60) def process(self, payload: dict): message = Message.model_validate(payload) From b3e44c4532f05a484e0e9ab9eea70af20be17cfc Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 11:20:54 +0300 Subject: [PATCH 03/13] grpc --- .gitignore | 2 ++ Dockerfile | 1 + Makefile | 2 ++ daemons/base.py | 15 ++++++++++++++ daemons/mailbox.py | 6 ------ daemons/poll.py | 2 +- requirements.txt | 4 ++++ schemas/tasks.proto | 40 ++++++++++++++++++++++++++++++++++++ utils/locks.py | 27 ------------------------- utils/queues.py | 49 +++++++++++++++------------------------------ 10 files changed, 81 insertions(+), 67 deletions(-) create mode 100644 Makefile create mode 100644 schemas/tasks.proto delete mode 100644 utils/locks.py diff --git a/.gitignore b/.gitignore index 0740544..05cb585 100644 --- a/.gitignore +++ b/.gitignore @@ -119,3 +119,5 @@ GitHub.sublime-settings .history local_platform.json + +*pb2* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 5375a3d..643bf14 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,5 +4,6 @@ WORKDIR /usr/src/app COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . +RUN make gen ENV PYTHONUNBUFFERED 1 ENTRYPOINT ["python", "main.py"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..76fde0c --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +gen: + python -m grpc_tools.protoc --proto_path schemas --python_out=. --pyi_out=. --grpc_python_out=. ./schemas/tasks.proto diff --git a/daemons/base.py b/daemons/base.py index bd6fd00..c917f71 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,3 +1,18 @@ +import os +import grpc +import tasks_pb2_grpc + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'localhost:50051' +else: + QUEUES_URL = 'queues-grpc:50051' + + class Daemon: + def __init__(self): + self.channel = grpc.insecure_channel(QUEUES_URL) + self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): raise NotImplemented diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2589c6c..630fb29 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -23,12 +23,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def queue_name(self): return 'botalka_mailbox' - def before_execute(self, task: dict): - locks.acquire(task['id']) - - def after_execute(self, task: dict): - locks.release(task['id']) - def process(self, payload: dict): message = Message.model_validate(payload) bot = platform.platform_client.get_config('bots')[message.project][message.name] diff --git a/daemons/poll.py b/daemons/poll.py index abf508d..a5ad36b 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -47,7 +47,7 @@ class Daemon(base.Daemon): def start_polling(self, bot: telebot.TeleBot, queue: str) -> threading.Thread: @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(queue, message.json, 1) + queues.set_task(self.stub, queue, message.json, 1) thread = threading.Thread(target=bot.polling) thread.start() return thread diff --git a/requirements.txt b/requirements.txt index d216c75..6d2027f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,14 @@ annotated-types==0.7.0 certifi==2024.8.30 charset-normalizer==3.4.0 +grpcio==1.68.1 +grpcio-tools==1.68.1 idna==3.10 +protobuf==5.29.1 pydantic==2.10.2 pydantic_core==2.27.1 pyTelegramBotAPI==4.1.1 requests==2.32.3 +setuptools==75.6.0 typing_extensions==4.12.2 urllib3==2.2.3 diff --git a/schemas/tasks.proto b/schemas/tasks.proto new file mode 100644 index 0000000..fcbd6dc --- /dev/null +++ b/schemas/tasks.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package queues; + +import "google/protobuf/struct.proto"; + +service Tasks { + rpc Put (PutRequest) returns (EmptyResponse) {} + + rpc Take (TakeRequest) returns (TakeResponse) {} + + rpc Finish (FinishRequest) returns (EmptyResponse) {} +} + +message Task { + string id = 1; + int64 attempt = 2; + google.protobuf.Struct payload = 3; +} + +message PutRequest { + string queue = 1; + int64 seconds_to_execute = 2; + optional int64 delay = 3; + google.protobuf.Struct payload = 4; +} + +message TakeRequest { + string queue = 1; +} + +message FinishRequest { + string id = 1; +} + +message EmptyResponse {} + +message TakeResponse { + optional Task task = 1; +} diff --git a/utils/locks.py b/utils/locks.py deleted file mode 100644 index f916013..0000000 --- a/utils/locks.py +++ /dev/null @@ -1,27 +0,0 @@ -import requests - - -LOCKS_URL = 'http://locks' - - -class Conflict(Exception): - pass - - -def acquire(name: str, ttl: int = 1): - resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={ - 'name': name, - 'ttl': ttl, - }) - if resp.status_code == 409: - raise Conflict - if resp.status_code != 202: - raise Exception - - -def release(name: str): - resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={ - 'name': name, - }) - if resp.status_code != 202: - raise Exception diff --git a/utils/queues.py b/utils/queues.py index 556e3a8..3f520ea 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,13 +1,6 @@ -import os -import requests import time - - -stage = os.getenv("STAGE", 'local') -if stage == 'local': - QUEUES_URL = 'http://localhost:1239' -else: - QUEUES_URL = 'http://queues:1239' +import tasks_pb2_grpc +import tasks_pb2 class QueuesException(Exception): @@ -17,30 +10,20 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() - task = response.get('task') + response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(self.queue_name)) + task: tasks_pb2.Task = response.task if not task: time.sleep(0.2) continue try: - self.before_execute(task) - self.process(task['payload']) + self.process(task.payload) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - self.after_execute(task) + self.stub.Finish(tasks_pb2.FinishRequest(task.id)) except: - print(f'Failed to finish task id={task["id"]}') - - def before_execute(self, task: dict): - pass - - def after_execute(self, task: dict): - pass + print(f'Failed to finish task id={task.id}') @property def queue_name(self): @@ -49,12 +32,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented - -def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ - 'payload': payload, - 'seconds_to_execute': seconds_to_execute, - 'delay': delay, - }) - if resp.status_code != 202: - raise QueuesException +def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + stub.Put( + tasks_pb2.PutRequest( + queue=queue_name, + seconds_to_execute=seconds_to_execute, + delay=delay, + payload=payload + ) + ) From ec2c39ed35d2df219135b45f2a6eae193c8b494f Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 11:26:07 +0300 Subject: [PATCH 04/13] fix --- utils/queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 3f520ea..bbae082 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -10,7 +10,7 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(self.queue_name)) + response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) task: tasks_pb2.Task = response.task if not task: time.sleep(0.2) @@ -21,7 +21,7 @@ class TasksHandlerMixin: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - self.stub.Finish(tasks_pb2.FinishRequest(task.id)) + self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) except: print(f'Failed to finish task id={task.id}') From 92fe0c19997cfdc5b2a0e902f2d8ea268a15b385 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 11:34:08 +0300 Subject: [PATCH 05/13] fix --- utils/queues.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index bbae082..43ead5e 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -2,6 +2,8 @@ import time import tasks_pb2_grpc import tasks_pb2 +from google.protobuf import json_format + class QueuesException(Exception): ... @@ -16,7 +18,7 @@ class TasksHandlerMixin: time.sleep(0.2) continue try: - self.process(task.payload) + self.process(json_format.MessageToDict(task.payload)) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue From bb4c6d4748ee56cb0fe10ee83d61b76f8196c41b Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 11:35:54 +0300 Subject: [PATCH 06/13] fix --- utils/queues.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 43ead5e..c1028ff 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -18,9 +18,10 @@ class TasksHandlerMixin: time.sleep(0.2) continue try: - self.process(json_format.MessageToDict(task.payload)) + payload = json_format.MessageToDict(task.payload) + self.process(payload) except Exception as exc: - print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + print(f'Error processing message id={task.id}, payload={payload}, exc={exc}') continue try: self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) From b4dd0a4b7bde5c4d50de0f8b75403ee09d20e6f4 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 11:48:41 +0300 Subject: [PATCH 07/13] fix --- utils/queues.py | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/queues.py b/utils/queues.py index c1028ff..85a4e63 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -14,6 +14,7 @@ class TasksHandlerMixin: while True: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) task: tasks_pb2.Task = response.task + print(task) if not task: time.sleep(0.2) continue From 0dbae621abf5e97af056771f29d0ea8c8a0bd759 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 12:05:54 +0300 Subject: [PATCH 08/13] fix --- utils/queues.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 85a4e63..84338fd 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -13,9 +13,8 @@ class TasksHandlerMixin: def poll(self): while True: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) - task: tasks_pb2.Task = response.task - print(task) - if not task: + task = response.task + if not task.id: time.sleep(0.2) continue try: From 98d69fff70c29edc422dc104e5636c5aa8be1b12 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 13:43:04 +0300 Subject: [PATCH 09/13] fix --- daemons/poll.py | 1 + utils/queues.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/daemons/poll.py b/daemons/poll.py index a5ad36b..c703c7f 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -9,6 +9,7 @@ from utils import queues class Daemon(base.Daemon): def __init__(self): + super().__init__(self) self.telegram_bots: dict[str, dict[str, telebot.TeleBot|None]] = {} self.threads: dict[str, dict[str, threading.Thread|None]] = {} diff --git a/utils/queues.py b/utils/queues.py index 84338fd..5fdae54 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -13,8 +13,7 @@ class TasksHandlerMixin: def poll(self): while True: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) - task = response.task - if not task.id: + if not response.task: time.sleep(0.2) continue try: From 0b6dc3af1aaf103225fa856842299c581c2a468e Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 13:45:11 +0300 Subject: [PATCH 10/13] fix --- daemons/poll.py | 2 +- utils/queues.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/daemons/poll.py b/daemons/poll.py index c703c7f..eb84fd4 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -9,7 +9,7 @@ from utils import queues class Daemon(base.Daemon): def __init__(self): - super().__init__(self) + super().__init__() self.telegram_bots: dict[str, dict[str, telebot.TeleBot|None]] = {} self.threads: dict[str, dict[str, threading.Thread|None]] = {} diff --git a/utils/queues.py b/utils/queues.py index 5fdae54..29aee61 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -13,7 +13,8 @@ class TasksHandlerMixin: def poll(self): while True: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) - if not response.task: + task = response.task + if not task: time.sleep(0.2) continue try: From 5dbacec1b03051550bc789066f23738207b5fb85 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 19:40:13 +0300 Subject: [PATCH 11/13] fix --- .gitignore | 3 ++- Makefile | 6 +++++- schemas/tasks.proto | 40 ---------------------------------------- utils/queues.py | 4 ++-- 4 files changed, 9 insertions(+), 44 deletions(-) delete mode 100644 schemas/tasks.proto diff --git a/.gitignore b/.gitignore index 05cb585..cc58c17 100644 --- a/.gitignore +++ b/.gitignore @@ -120,4 +120,5 @@ GitHub.sublime-settings local_platform.json -*pb2* \ No newline at end of file +*pb2* +schemas diff --git a/Makefile b/Makefile index 76fde0c..b971207 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,6 @@ gen: - python -m grpc_tools.protoc --proto_path schemas --python_out=. --pyi_out=. --grpc_python_out=. ./schemas/tasks.proto + curl https://platform.sprinthub.ru/generator >> generator.py + python generator.py + rm generator.py +run: + python ./server.py \ No newline at end of file diff --git a/schemas/tasks.proto b/schemas/tasks.proto deleted file mode 100644 index fcbd6dc..0000000 --- a/schemas/tasks.proto +++ /dev/null @@ -1,40 +0,0 @@ -syntax = "proto3"; - -package queues; - -import "google/protobuf/struct.proto"; - -service Tasks { - rpc Put (PutRequest) returns (EmptyResponse) {} - - rpc Take (TakeRequest) returns (TakeResponse) {} - - rpc Finish (FinishRequest) returns (EmptyResponse) {} -} - -message Task { - string id = 1; - int64 attempt = 2; - google.protobuf.Struct payload = 3; -} - -message PutRequest { - string queue = 1; - int64 seconds_to_execute = 2; - optional int64 delay = 3; - google.protobuf.Struct payload = 4; -} - -message TakeRequest { - string queue = 1; -} - -message FinishRequest { - string id = 1; -} - -message EmptyResponse {} - -message TakeResponse { - optional Task task = 1; -} diff --git a/utils/queues.py b/utils/queues.py index 29aee61..d346ffb 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,6 @@ import time -import tasks_pb2_grpc -import tasks_pb2 +from queues import tasks_pb2_grpc +from queues import tasks_pb2 from google.protobuf import json_format From 73cc7b2c03572dadf4bcfdfe6e318b65b2eb4d97 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 19:42:19 +0300 Subject: [PATCH 12/13] fix --- daemons/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemons/base.py b/daemons/base.py index c917f71..7f32f71 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,6 +1,6 @@ import os import grpc -import tasks_pb2_grpc +from queues import tasks_pb2_grpc stage = os.getenv("STAGE", 'local') From 7ed7afbef43cc43b28da4e7133b39efe99682ad1 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 8 Dec 2024 19:56:33 +0300 Subject: [PATCH 13/13] fix --- .deploy/deploy-dev.yaml | 3 --- .deploy/deploy-prod.yaml | 3 --- 2 files changed, 6 deletions(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 4c14344..227877a 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,7 +22,6 @@ services: networks: - configurator - queues-development - - locks-development environment: STAGE: "development" command: mailbox @@ -39,5 +38,3 @@ networks: external: true queues-development: external: true - locks-development: - external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 9c4870d..0043ede 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,7 +22,6 @@ services: networks: - configurator - queues - - locks environment: STAGE: "production" command: mailbox @@ -39,5 +38,3 @@ networks: external: true queues: external: true - locks: - external: true