fix #26

Merged
emmatveev merged 1 commits from master into dev 2024-12-02 21:57:10 +03:00
5 changed files with 48 additions and 0 deletions

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- locks-development
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
locks-development:
external: true

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- locks
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
locks:
external: true

View File

@ -5,6 +5,7 @@ from telebot import apihelper
from daemons import base from daemons import base
from utils import platform from utils import platform
from utils import queues from utils import queues
from utils import locks
class Message(pydantic.BaseModel): class Message(pydantic.BaseModel):
@ -22,6 +23,12 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
def queue_name(self): def queue_name(self):
return 'botalka_mailbox' return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'])
def after_execute(self, task: dict):
locks.release(task['id'])
def process(self, payload: dict): def process(self, payload: dict):
message = Message.model_validate(payload) message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name] bot = platform.platform_client.get_config('bots')[message.project][message.name]

27
utils/locks.py Normal file
View File

@ -0,0 +1,27 @@
import requests
LOCKS_URL = 'http://locks'
class Conflict(Exception):
pass
def acquire(name: str, ttl: int = 1):
resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={
'name': name,
'ttl': ttl,
})
if resp.status_code == 409:
raise Conflict
if resp.status_code != 202:
raise Exception
def release(name: str):
resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={
'name': name,
})
if resp.status_code != 202:
raise Exception

View File

@ -23,6 +23,7 @@ class TasksHandlerMixin:
time.sleep(0.2) time.sleep(0.2)
continue continue
try: try:
self.before_execute(task)
self.process(task['payload']) self.process(task['payload'])
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
@ -31,9 +32,16 @@ class TasksHandlerMixin:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
self.after_execute(task)
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
def before_execute(self, task: dict):
pass
def after_execute(self, task: dict):
pass
@property @property
def queue_name(self): def queue_name(self):
raise NotImplemented raise NotImplemented