commit 55f6582b006fd585f56e6445774d0685d8b8d30a Author: emmatveev Date: Wed Nov 27 04:04:19 2024 +0300 initial diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml new file mode 100644 index 0000000..227877a --- /dev/null +++ b/.deploy/deploy-dev.yaml @@ -0,0 +1,40 @@ +version: "3.4" + + +services: + poll: + image: mathwave/sprint-repo:botalka + networks: + - configurator + - queues-development + environment: + STAGE: "development" + command: poll + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + mailbox: + image: mathwave/sprint-repo:botalka + networks: + - configurator + - queues-development + environment: + STAGE: "development" + command: mailbox + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + configurator: + external: true + queues-development: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml new file mode 100644 index 0000000..0043ede --- /dev/null +++ b/.deploy/deploy-prod.yaml @@ -0,0 +1,40 @@ +version: "3.4" + + +services: + poll: + image: mathwave/sprint-repo:botalka + networks: + - configurator + - queues + environment: + STAGE: "production" + command: poll + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + mailbox: + image: mathwave/sprint-repo:botalka + networks: + - configurator + - queues + environment: + STAGE: "production" + command: mailbox + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + configurator: + external: true + queues: + external: true diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..96ecb74 --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,41 @@ +name: Deploy Dev + +on: + pull_request: + branches: + - dev + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: build + run: docker build -t mathwave/sprint-repo:botalka . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:botalka + deploy-dev: + name: Deploy dev + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: deploy + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml botalka-development diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml new file mode 100644 index 0000000..ff6463c --- /dev/null +++ b/.gitea/workflows/deploy-prod.yaml @@ -0,0 +1,41 @@ +name: Deploy Prod + +on: + pull_request: + branches: + - prod + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: build + run: docker build -t mathwave/sprint-repo:botalka . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:botalka + deploy-prod: + name: Deploy prod + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: prod + - name: deploy + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml botalka diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..849d3ca --- /dev/null +++ b/.gitignore @@ -0,0 +1,119 @@ +# Django # +*.log +*.pot +*.pyc +__pycache__ +db.sqlite3 +media +data +*/__pycache__ + +# Backup files # +*.bak + +# If you are using PyCharm # +.idea +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/gradle.xml +.idea/**/libraries +*.iws /out/ + +# Python # +*.py[cod] +*$py.class + +# Distribution / packaging +.Python build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache/ +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +postgres-data + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery +celerybeat-schedule.* + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +ENV/ +venv/ +env.bak/ +venv.bak/ + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Sublime Text # +*.tmlanguage.cache +*.tmPreferences.cache +*.stTheme.cache +*.sublime-workspace +*.sublime-project + +# sftp configuration file +sftp-config.json + +# Package control specific files Package +Control.last-run +Control.ca-list +Control.ca-bundle +Control.system-ca-bundle +GitHub.sublime-settings + +# Visual Studio Code # +.vscode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..90f9ad2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.12 +RUN mkdir /usr/src/app +WORKDIR /usr/src/app +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt +COPY . . +ENTRYPOINT ["python", "entrypoint.py"] diff --git a/daemons/__init__.py b/daemons/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/daemons/base.py b/daemons/base.py new file mode 100644 index 0000000..7201992 --- /dev/null +++ b/daemons/base.py @@ -0,0 +1,3 @@ +class Base: + def execute(self): + raise NotImplemented diff --git a/daemons/mailbox.py b/daemons/mailbox.py new file mode 100644 index 0000000..9d531a4 --- /dev/null +++ b/daemons/mailbox.py @@ -0,0 +1,28 @@ +from telebot import apihelper + +from daemons import base +from utils import platform +from utils import queues + + +class Daemon(base.Daemon, queues.TasksHandlerMixin): + def execute(self): + self.poll() + + @property + def queue_name(self): + return 'botalka_mailbox' + + def process(self, payload): + bot = platform.platform_client.get_config('bots')[payload['project']][payload['name']] + if not bot['mailbox_enabled']: + return + if bot['type'] == 'telegram': + token = bot['secrets']['telegram_token'] + self.process_telegram(token, payload['body']) + + def process_telegram(self, token, payload): + try: + apihelper.send_message(token, **payload) + except Exception as exc: + print('Error', str(exc)) diff --git a/daemons/poll.py b/daemons/poll.py new file mode 100644 index 0000000..362ec52 --- /dev/null +++ b/daemons/poll.py @@ -0,0 +1,42 @@ +import telebot +import multiprocessing +import time + +from daemons import base +from utils import platform +from utils import queues + + +class Daemon(base.Daemon): + def __init__(self): + self.telegram_pollers: dict[str, dict[str, multiprocessing.Process|None]] = {} + + def execute(self): + bots = platform.platform_client.get_config('bots') + for project_name, project in bots.items(): + if project_name not in self.telegram_pollers: + self.telegram_pollers[project_name] = {} + for bot_name, bot_info in project.items(): + if bot_name not in self.telegram_pollers[project_name]: + self.telegram_pollers[project_name][bot_name] = None + process = self.telegram_pollers[project_name][bot_name] + if bot_info.get('poll_enabled'): + if process is not None and process.is_alive: + continue + new_process = multiprocessing.Process(target=self.start_polling, args=[bot_info['secrets']['telegram_token'], bot_info['queue']]) + new_process.start() + self.telegram_pollers[project_name][bot_name] = new_process + else: + if process is None: + continue + if process.is_alive: + process.terminate() + self.telegram_pollers[project_name][bot_name] = None + time.sleep(10) + + def start_polling(telegram_token, queue): + bot = telebot.TeleBot(telegram_token) + @bot.message_handler() + def do_action(message): + queues.set_task(queue, message.json, 1) + bot.polling() diff --git a/main.py b/main.py new file mode 100644 index 0000000..49ce915 --- /dev/null +++ b/main.py @@ -0,0 +1,15 @@ +import sys + + +arg = sys.argv[-1] + +if arg == "poll": + print("poll is starting") + from daemons.poll import Daemon +elif arg == 'mailbox': + print("mailbox is starting") + from daemons.mailbox import Daemon +else: + raise ValueError(f"Unknown param {arg}") + +Daemon().execute() diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/platform.py b/utils/platform.py new file mode 100644 index 0000000..9b4e864 --- /dev/null +++ b/utils/platform.py @@ -0,0 +1,93 @@ +import json +import os +import typing +import urllib.parse +from threading import Thread +from time import sleep + +from requests import get + + +class PlatformClient: + def __init__(self, app_name: str, stage: str, need_poll: bool = True): + self.app_name = app_name + self.stage = stage + self.endpoint = 'http://configurator/' + self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') + self.config_storage = {} + self.experiment_storage = {} + self.staff_storage = {} + self.poll_data() + if need_poll: + self.poll_data_in_thread() + + def poll_data_in_thread(self): + def inner(): + while True: + sleep(30) + self.fetch() + + Thread(target=inner, daemon=True).start() + + def poll_data(self): + self.fetch(with_exception=True) + + def request_with_retries(self, url: str, params: dict, with_exception=False, retries_count=3): + exception_to_throw = None + for _ in range(retries_count): + try: + response = get( + url, + params=params + ) + if response.status_code == 200: + return response.json() + print(f'Failed to request {url}, status_code={response.status_code}') + exception_to_throw = Exception('Not 200 status') + except Exception as exc: + print(exc) + exception_to_throw = exc + sleep(1) + print(f'Failed fetching with retries: {url}, {params}') + if with_exception: + raise exception_to_throw + + def fetch(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + self.experiment_storage = local_platform['experiments'] + self.staff_storage = { + key: set(value) + for key, value in local_platform['platform_staff'].items() + } + return + response_data = self.request_with_retries(self.fetch_url, { + 'project': self.app_name, + 'stage': self.stage, + }, with_exception) + self.config_storage = response_data['configs'] + self.experiment_storage = response_data['experiments'] + self.staff_storage = { + key: set(value) + for key, value in response_data['platform_staff'].items() + } + + def is_staff(self, **kwargs): + for key, value in kwargs.items(): + if value in self.staff_storage[key]: + return True + return False + + def get_config(self, name: str) -> dict: + return self.config_storage[name] + + def get_experiment(self, name: str) -> dict: + return self.experiment_storage[name] + + +platform_client = PlatformClient( + 'Botalka', + os.getenv('STAGE'), + need_poll=True, +) diff --git a/utils/queues.py b/utils/queues.py new file mode 100644 index 0000000..c00a18c --- /dev/null +++ b/utils/queues.py @@ -0,0 +1,52 @@ +import os +import requests +import time + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' + + +class QueuesException(Exception): + ... + + +class TasksHandlerMixin: + def poll(self): + while True: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + task = response.get('task') + if not task: + time.sleep(0.2) + continue + try: + self.process(task['payload']) + except Exception as exc: + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + continue + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') + + @property + def queue_name(self): + raise NotImplemented + + def process(self, payload): + raise NotImplemented + + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException