commit b5536a985ad390dadbc38b100877fbb718251728 Author: Egor Matveev Date: Sat Mar 29 00:07:55 2025 +0300 initial diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml new file mode 100644 index 0000000..a17b0bb --- /dev/null +++ b/.deploy/deploy-dev.yaml @@ -0,0 +1,24 @@ +version: "3.4" + + +services: + worker: + image: mathwave/sprint-repo:law-exam-bot + environment: + STAGE: "development" + networks: + - queues-development + - configurator + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + +networks: + queues-development: + external: true + configurator: + external: true \ No newline at end of file diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml new file mode 100644 index 0000000..a712f6b --- /dev/null +++ b/.gitea/workflows/deploy-dev.yaml @@ -0,0 +1,41 @@ +name: Deploy Dev + +on: + pull_request: + branches: + - dev + types: [closed] + +jobs: + build: + name: Build + runs-on: [ dev ] + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: build + run: docker build -t mathwave/sprint-repo:law-exam-bot . + push: + name: Push + runs-on: [ dev ] + needs: build + steps: + - name: push + run: docker push mathwave/sprint-repo:law-exam-bot + deploy-dev: + name: Deploy dev + runs-on: [prod] + needs: push + steps: + - name: login + run: docker login -u mathwave -p ${{ secrets.DOCKERHUB_PASSWORD }} + - name: checkout + uses: actions/checkout@v4 + with: + ref: dev + - name: deploy + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml law-exam-bot-development \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7375914 --- /dev/null +++ b/.gitignore @@ -0,0 +1,121 @@ +# Django # +*.log +*.pot +*.pyc +__pycache__ +db.sqlite3 +media +data +*/__pycache__ + +# Backup files # +*.bak + +# If you are using PyCharm # +.idea +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/gradle.xml +.idea/**/libraries +*.iws /out/ + +# Python # +*.py[cod] +*$py.class + +# Distribution / packaging +.Python build/ +develop-eggs/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +.pytest_cache/ +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +postgres-data + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery +celerybeat-schedule.* + +# SageMath parsed files +*.sage.py + +# Environments +.venv +env/ +ENV/ +venv/ +env.bak/ +venv.bak/ + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Sublime Text # +*.tmlanguage.cache +*.tmPreferences.cache +*.stTheme.cache +*.sublime-workspace +*.sublime-project + +# sftp configuration file +sftp-config.json + +# Package control specific files Package +Control.last-run +Control.ca-list +Control.ca-bundle +Control.system-ca-bundle +GitHub.sublime-settings + +# Visual Studio Code # +.vscode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +.history + +*pb2* \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4c86b53 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.10 +RUN mkdir /usr/src/app +WORKDIR /usr/src/app +COPY requirements.txt requirements.txt +RUN pip install -r requirements.txt +COPY . . +ENV PYTHONUNBUFFERED 1 +ENTRYPOINT ["python", "worker.py"] \ No newline at end of file diff --git a/base.py b/base.py new file mode 100644 index 0000000..71d2485 --- /dev/null +++ b/base.py @@ -0,0 +1,13 @@ +import os + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'localhost:50051' +else: + QUEUES_URL = 'queues-grpc:50051' + + +class Daemon: + def execute(self): + raise NotImplemented \ No newline at end of file diff --git a/queues.py b/queues.py new file mode 100644 index 0000000..cebf257 --- /dev/null +++ b/queues.py @@ -0,0 +1,52 @@ +import os +import requests +import time + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' + + +class QueuesException(Exception): + ... + + +class TasksHandlerMixin: + def poll(self): + while True: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + task = response.get('task') + if not task: + time.sleep(0.2) + continue + try: + self.process(task['payload']) + except Exception as exc: + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + continue + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') + + @property + def queue_name(self): + raise NotImplemented + + def process(self, payload): + raise NotImplemented + + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..09fcaa3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +certifi==2025.1.31 +charset-normalizer==3.4.1 +idna==3.10 +pyTelegramBotAPI==4.1.1 +requests==2.32.3 +urllib3==2.3.0 diff --git a/sprint_platform.py b/sprint_platform.py new file mode 100644 index 0000000..af3b1c7 --- /dev/null +++ b/sprint_platform.py @@ -0,0 +1,114 @@ +import json +import typing +import urllib.parse +from threading import Thread +from time import sleep + +from requests import get + + +class PlatformClient: + def __init__(self, platform_security_token: str, app_name: str, stage: str, configs: typing.List[str], experiments: typing.List[str], need_poll: bool = True): + self.platform_security_token = platform_security_token + self.app_name = app_name + self.stage = stage + self.configs = configs + self.experiments = experiments + self.endpoint = 'http://configurator/' + self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') + self.config_storage = {} + self.experiment_storage = {} + self.staff_storage = {} + self.poll_data() + if need_poll: + self.poll_data_in_thread() + + def poll_data_in_thread(self): + def inner(): + while True: + sleep(30) + self.fetch() + + Thread(target=inner, daemon=True).start() + + def poll_data(self): + self.fetch(with_exception=True) + + def request_with_retries(self, url, params, with_exception=False, retries_count=3): + exception_to_throw = None + for _ in range(retries_count): + try: + response = get( + url, + params=params + ) + if response.status_code == 200: + return response.json() + print(f'Failed to request {url}, status_code={response.status_code}') + exception_to_throw = Exception('Not 200 status') + except Exception as exc: + print(exc) + exception_to_throw = exc + sleep(1) + print(f'Failed fetching with retries: {url}, {params}') + if with_exception: + raise exception_to_throw + + def fetch(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + self.experiment_storage = local_platform['experiments'] + self.staff_storage = { + key: set(value) + for key, value in local_platform['platform_staff'].items() + } + return + response_data = self.request_with_retries(self.fetch_url, { + 'project': self.app_name, + 'stage': self.stage, + }, with_exception) + self.config_storage = response_data['configs'] + self.experiment_storage = response_data['experiments'] + self.staff_storage = { + key: set(value) + for key, value in response_data['platform_staff'].items() + } + + def fetch_configs(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + return + for config in self.configs: + response_data = self.request_with_retries(self.configs_url, { + 'project': self.app_name, + 'stage': self.stage, + 'name': config + }, with_exception) + self.config_storage[config] = response_data + + def fetch_experiments(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.experiment_storage = local_platform['experiments'] + return + for experiment in self.experiments: + response_data = self.request_with_retries(self.experiments_url, { + 'project': self.app_name, + 'stage': self.stage, + 'name': experiment + }, with_exception) + self.experiment_storage[experiment] = response_data + + def is_staff(self, **kwargs): + for key, value in kwargs.items(): + if value in self.staff_storage[key]: + return True + return False + + def get_config(self, name): + return self.config_storage[name] + + def get_experiment(self, name): + return self.experiment_storage[name] diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..e8deabd --- /dev/null +++ b/worker.py @@ -0,0 +1,63 @@ +import datetime +import os +from random import randrange, choice +import base +import queues +from telebot.types import Message +import json + +from sprint_platform import PlatformClient + +security_token = os.getenv("PLATFORM_SECURITY_TOKEN") +stage = os.getenv("STAGE", 'local') + + +client = PlatformClient( + security_token, + 'law-exam-bot', + stage, + ['constants', 'answers', 'replies'], + [], + need_poll=True, +) + +store = {} + + +class Daemon(base.Daemon, queues.TasksHandlerMixin): + @property + def queue_name(self): + return 'law_exam_bot_worker' + + def execute(self): + self.poll() + + def send(self, text: str, chat_id: int): + queues.set_task( + 'botalka_mailbox', + { + 'project': 'law-exam-bot', + 'name': 'telegram-bot', + 'body': { + 'text': text, + 'chat_id': chat_id, + } + }, + 1, + ) + + def process(self, payload): + message: Message = Message.de_json(json.dumps(payload)) + questions = client.get_config('questions') + current_question = store.get(message.chat.id) + if current_question: + self.send("Вот ответ на вопрос", message.chat.id) + text_split = [current_question['answer'][i:i+1000] for i in range(0, len(current_question['answer']), 1000)] + for elem in text_split: + self.send(elem, message.chat.id) + selected = randrange(len(questions)) + store[message.chat.id] = selected + self.send("Следующий вопрос: " + questions[selected]['question'], message.chat.id) + + +Daemon().execute()