Compare commits

..

No commits in common. "e02646a1a2a1abb66b7e8f171eb609fedca8fb33" and "1b7e8686e4b3ad2fcfeb21027e5b4a014f0f630f" have entirely different histories.

5 changed files with 0 additions and 48 deletions

View File

@ -22,7 +22,6 @@ services:
networks:
- configurator
- queues-development
- locks-development
environment:
STAGE: "development"
command: mailbox
@ -39,5 +38,3 @@ networks:
external: true
queues-development:
external: true
locks-development:
external: true

View File

@ -22,7 +22,6 @@ services:
networks:
- configurator
- queues
- locks
environment:
STAGE: "production"
command: mailbox
@ -39,5 +38,3 @@ networks:
external: true
queues:
external: true
locks:
external: true

View File

@ -5,7 +5,6 @@ from telebot import apihelper
from daemons import base
from utils import platform
from utils import queues
from utils import locks
class Message(pydantic.BaseModel):
@ -23,12 +22,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin):
def queue_name(self):
return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'])
def after_execute(self, task: dict):
locks.release(task['id'])
def process(self, payload: dict):
message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name]

View File

@ -1,27 +0,0 @@
import requests
LOCKS_URL = 'http://locks'
class Conflict(Exception):
pass
def acquire(name: str, ttl: int = 1):
resp = requests.post(f'{LOCKS_URL}/api/v1/acquire', json={
'name': name,
'ttl': ttl,
})
if resp.status_code == 409:
raise Conflict
if resp.status_code != 202:
raise Exception
def release(name: str):
resp = requests.post(f'{LOCKS_URL}/api/v1/release', json={
'name': name,
})
if resp.status_code != 202:
raise Exception

View File

@ -23,7 +23,6 @@ class TasksHandlerMixin:
time.sleep(0.2)
continue
try:
self.before_execute(task)
self.process(task['payload'])
except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
@ -32,16 +31,9 @@ class TasksHandlerMixin:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202:
raise QueuesException
self.after_execute(task)
except:
print(f'Failed to finish task id={task["id"]}')
def before_execute(self, task: dict):
pass
def after_execute(self, task: dict):
pass
@property
def queue_name(self):
raise NotImplemented