diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index a9abe02..efcb71b 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -46,8 +46,29 @@ services: parallelism: 1 order: start-first + bot: + image: mathwave/sprint-repo:b-jokes + environment: + MONGO_HOST: "mongo.develop.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_DEV + command: bot + networks: + - configurator + - queues-development + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + networks: b-jokes-net: driver: overlay common-infra-nginx: - external: true \ No newline at end of file + external: true + configurator: + external: true + queues-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 7dd5055..60305b2 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -60,8 +60,29 @@ services: parallelism: 1 order: start-first + bot: + image: mathwave/sprint-repo:b-jokes + environment: + MONGO_HOST: "mongo.sprinthub.ru" + MONGO_PASSWORD: $MONGO_PASSWORD_PROD + command: bot + networks: + - configurator + - queues + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + networks: b-jokes-net: driver: overlay common-infra-nginx: external: true + configurator: + external: true + queues: + external: true diff --git a/helpers/configurator.py b/helpers/configurator.py new file mode 100644 index 0000000..54dd496 --- /dev/null +++ b/helpers/configurator.py @@ -0,0 +1,84 @@ +import json +import urllib.parse +from threading import Thread +from time import sleep + +from requests import get + + +class ConfiguratorClient: + def __init__(self, app_name: str, stage: str, need_poll: bool = True): + self.app_name = app_name + self.stage = stage + self.endpoint = 'http://configurator/' + self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') + self.config_storage = {} + self.experiment_storage = {} + self.staff_storage = {} + self.poll_data() + if need_poll: + self.poll_data_in_thread() + + def poll_data_in_thread(self): + def inner(): + while True: + sleep(30) + self.fetch() + + Thread(target=inner, daemon=True).start() + + def poll_data(self): + self.fetch(with_exception=True) + + def request_with_retries(self, url, params, with_exception=False, retries_count=3): + exception_to_throw = None + for _ in range(retries_count): + try: + response = get( + url, + params=params + ) + if response.status_code == 200: + return response.json() + print(f'Failed to request {url}, status_code={response.status_code}') + exception_to_throw = Exception('Not 200 status') + except Exception as exc: + print(exc) + exception_to_throw = exc + sleep(1) + print(f'Failed fetching with retries: {url}, {params}') + if with_exception: + raise exception_to_throw + + def fetch(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + self.experiment_storage = local_platform['experiments'] + self.staff_storage = { + key: set(value) + for key, value in local_platform['platform_staff'].items() + } + return + response_data = self.request_with_retries(self.fetch_url, { + 'project': self.app_name, + 'stage': self.stage, + }, with_exception) + self.config_storage = response_data['configs'] + self.experiment_storage = response_data['experiments'] + self.staff_storage = { + key: set(value) + for key, value in response_data['platform_staff'].items() + } + + def is_staff(self, **kwargs): + for key, value in kwargs.items(): + if value in self.staff_storage[key]: + return True + return False + + def get_config(self, name): + return self.config_storage[name] + + def get_experiment(self, name): + return self.experiment_storage[name] diff --git a/helpers/queues.py b/helpers/queues.py new file mode 100644 index 0000000..e8054a7 --- /dev/null +++ b/helpers/queues.py @@ -0,0 +1,85 @@ +from concurrent.futures import ThreadPoolExecutor +import datetime +import os +import zoneinfo +import requests +import time + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' + + +class QueuesException(Exception): + ... + + +class TasksHandlerMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=1) + + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + def send(): + requests.post(f'{QUEUES_URL}/api/v1/metric', json={ + 'service': 'b-jokes', + 'queue': self.queue_name, + 'success': success, + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + "success": success, + "execution_time_ms": (end - start).microseconds // 1000, + "environment": stage, + }) + + self.executor.submit(send) + + def poll(self): + while True: + try: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + except requests.JSONDecodeError: + print('Unable to decode json') + time.sleep(3) + continue + task = response.get('task') + if not task: + time.sleep(0.2) + continue + start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + try: + print(f'process task with id {task["id"]}, attempt {task["attempt"]}') + self.process(task['payload']) + success = True + except Exception as exc: + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + success = False + end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + print(f'finish task with id {task["id"]}') + except: + print(f'Failed to finish task id={task["id"]}') + self._send_metric(start, end, success) + + @property + def queue_name(self): + raise NotImplemented + + def process(self, payload): + raise NotImplemented + + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException diff --git a/main.py b/main.py index e481dfa..4f46461 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,9 @@ +import os from flask import Flask, request, make_response +from helpers.configurator import ConfiguratorClient +from helpers.jokes import get_random +from helpers.queues import TasksHandlerMixin, set_task import settings from helpers.events import events from processor import Processor @@ -38,3 +42,35 @@ def run(): a = 1 / 0 app.run(host="0.0.0.0", port=8000, debug=settings.DEBUG) + + +def bot(): + configurator = ConfiguratorClient("b-jokes", os.getenv("STAGE", "local")) + class Bot(TasksHandlerMixin): + @property + def queue_name(self): + return "b_jokes_worker" + + def process(self, payload): + text = payload.get('text') + if not text: + return + for word in configurator.get_config('words'): + if word in text: + mes = 'Держи шутку!\n' + get_random() + set_task( + "botalka_mailbox", + { + 'project': 'b-jokes', + 'name': 'telegram-bot', + 'body': { + 'text': mes, + 'reply_to_message_id': payload['message_id'], + 'chat_id': payload['chat']['id'], + } + }, + 1 + ) + return + + Bot().poll()