import pydantic
from telebot import apihelper

from daemons import base
from utils import platform
from utils import queues


class Message(pydantic.BaseModel):
    """Mailbox task describing one bot API call to perform."""

    project: str  # first-level key into the 'bots' platform config
    name: str  # bot key inside the project's entry
    body: dict  # keyword arguments forwarded to the API helper function
    method: str = 'send_message'  # name of the telebot.apihelper function to call


class Daemon(base.Daemon, queues.TasksHandlerMixin):
    """Daemon that handles tasks from the 'botalka_mailbox' queue.

    Each task is validated as a ``Message`` and dispatched to the bot
    it addresses, provided that bot has its mailbox enabled.
    """

    def execute(self):
        """Run the daemon by delegating to ``self.poll()``."""
        # poll() is not defined in this file; presumably it comes from
        # queues.TasksHandlerMixin and feeds payloads to process().
        self.poll()

    @property
    def queue_name(self):
        """Name of the queue this daemon handles."""
        return 'botalka_mailbox'

    def process(self, payload: dict):
        """Validate one task payload and deliver it through the target bot.

        Args:
            payload: Raw task data; must validate against ``Message``.

        Raises:
            pydantic.ValidationError: If ``payload`` does not match ``Message``.
            KeyError: If a subscripted config key is missing (assuming the
                'bots' config is a plain mapping).
        """
        message = Message.model_validate(payload)
        bots = platform.platform_client.get_config('bots')
        bot = bots[message.project][message.name]

        if not bot['mailbox_enabled']:
            return

        if bot['type'] != 'telegram':
            print('Unknown bot type:', bot['type'])
            return

        token = bot['secrets']['telegram_token']
        self.process_telegram(token, message.method, message.body)

    def process_telegram(self, token, method, payload):
        """Call ``telebot.apihelper.<method>(token, **payload)``, best-effort.

        Failures are printed and never raised, so one bad message does not
        propagate an exception to the caller.

        Args:
            token: Telegram bot token.
            method: Name of the ``telebot.apihelper`` function to call.
            payload: Keyword arguments for that function.
        """
        # NOTE(review): `method` comes from the task payload, so any callable
        # attribute of apihelper can be invoked; restrict this to an allow-list
        # if the queue producers are not fully trusted.
        handler = None
        if isinstance(method, str):
            handler = getattr(apihelper, method, None)
        if not callable(handler):
            # Report a bad method name explicitly instead of letting it show
            # up as an anonymous AttributeError/TypeError message.
            print('Error', f'unsupported telegram method: {method!r}')
            return

        try:
            handler(token, **payload)
        except Exception as exc:
            text = str(exc)
            # The token is part of the Telegram request URL, and transport
            # errors can embed that URL in their message; keep it out of output.
            if isinstance(token, str) and token:
                text = text.replace(token, '***')
            print('Error', text)