diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 227877a..4c14344 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues-development + - locks-development environment: STAGE: "development" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues-development: external: true + locks-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 0043ede..9c4870d 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues + - locks environment: STAGE: "production" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues: external: true + locks: + external: true diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2fc236e..2589c6c 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -5,6 +5,7 @@ from telebot import apihelper from daemons import base from utils import platform from utils import queues +from utils import locks class Message(pydantic.BaseModel): @@ -22,6 +23,12 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def queue_name(self): return 'botalka_mailbox' + def before_execute(self, task: dict): + locks.acquire(task['id']) + + def after_execute(self, task: dict): + locks.release(task['id']) + def process(self, payload: dict): message = Message.model_validate(payload) bot = platform.platform_client.get_config('bots')[message.project][message.name] diff --git a/utils/locks.py b/utils/locks.py new file mode 100644 index 0000000..f916013 --- /dev/null +++ b/utils/locks.py @@ -0,0 +1,27 @@ +import requests + + +LOCKS_URL = 'http://locks' + + +class Conflict(Exception): + pass + + +def acquire(name: str, ttl: int = 1): + resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={ + 'name': name, + 'ttl': ttl, + }) + if resp.status_code == 409: + raise Conflict + if resp.status_code != 202: + raise Exception + + +def release(name: str): + resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={ + 'name': name, + }) + if resp.status_code != 202: + raise Exception diff --git a/utils/queues.py b/utils/queues.py index c00a18c..556e3a8 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -23,6 +23,7 @@ class TasksHandlerMixin: time.sleep(0.2) continue try: + self.before_execute(task) self.process(task['payload']) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') @@ -31,9 +32,16 @@ class TasksHandlerMixin: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: raise QueuesException + self.after_execute(task) except: print(f'Failed to finish task id={task["id"]}') + def before_execute(self, task: dict): + pass + + def after_execute(self, task: dict): + pass + @property def queue_name(self): raise NotImplemented