fix
All checks were successful
Deploy Dev / Build (pull_request) Successful in 6s
Deploy Dev / Push (pull_request) Successful in 8s
Deploy Dev / Deploy dev (pull_request) Successful in 10s

This commit is contained in:
emmatveev 2024-12-02 21:53:24 +03:00
parent 2527524ca4
commit 784a2ed393
5 changed files with 48 additions and 0 deletions

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- locks-development
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
locks-development:
external: true

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- locks
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
locks:
external: true

View File

@ -5,6 +5,7 @@ from telebot import apihelper
from daemons import base from daemons import base
from utils import platform from utils import platform
from utils import queues from utils import queues
from utils import locks
class Message(pydantic.BaseModel): class Message(pydantic.BaseModel):
@ -22,6 +23,12 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
def queue_name(self): def queue_name(self):
return 'botalka_mailbox' return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'])
def after_execute(self, task: dict):
locks.release(task['id'])
def process(self, payload: dict): def process(self, payload: dict):
message = Message.model_validate(payload) message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name] bot = platform.platform_client.get_config('bots')[message.project][message.name]

27
utils/locks.py Normal file
View File

@ -0,0 +1,27 @@
import requests
LOCKS_URL = 'http://locks'
class Conflict(Exception):
pass
def acquire(name: str, ttl: int = 1):
resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={
'name': name,
'ttl': ttl,
})
if resp.status_code == 409:
raise Conflict
if resp.status_code != 202:
raise Exception
def release(name: str):
resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={
'name': name,
})
if resp.status_code != 202:
raise Exception

View File

@ -23,6 +23,7 @@ class TasksHandlerMixin:
time.sleep(0.2) time.sleep(0.2)
continue continue
try: try:
self.before_execute(task)
self.process(task['payload']) self.process(task['payload'])
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
@ -31,9 +32,16 @@ class TasksHandlerMixin:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
self.after_execute(task)
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
def before_execute(self, task: dict):
pass
def after_execute(self, task: dict):
pass
@property @property
def queue_name(self): def queue_name(self):
raise NotImplemented raise NotImplemented