botalka/daemons/mailbox.py
emmatveev df2a7c3bb1
All checks were successful
Deploy Dev / Build (pull_request) Successful in 6s
Deploy Dev / Push (pull_request) Successful in 8s
Deploy Dev / Deploy dev (pull_request) Successful in 10s
Update daemons/mailbox.py
2024-12-02 22:38:50 +03:00

45 lines
1.2 KiB
Python

import pydantic
from telebot import apihelper
from daemons import base
from utils import platform
from utils import queues
from utils import locks
class Message(pydantic.BaseModel):
project: str
name: str
body: dict
method: str = 'send_message'
class Daemon(base.Daemon, queues.TasksHandlerMixin):
def execute(self):
self.poll()
@property
def queue_name(self):
return 'botalka_mailbox'
def before_execute(self, task: dict):
locks.acquire(task['id'], 60)
def process(self, payload: dict):
message = Message.model_validate(payload)
bot = platform.platform_client.get_config('bots')[message.project][message.name]
if not bot['mailbox_enabled']:
return
if bot['type'] == 'telegram':
token = bot['secrets']['telegram_token']
self.process_telegram(token, message.method, message.body)
else:
print('Unknown bot type:', bot['type'])
def process_telegram(self, token, method, payload):
try:
getattr(apihelper, method)(token, **payload)
except Exception as exc:
print('Error', str(exc))