diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index c019193..f84d163 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -5,43 +5,12 @@ services: bot: image: mathwave/sprint-repo:roulette-bot - command: bot environment: - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV - MINIO_HOST: "minio.develop.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV - PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN STAGE: "development" - REDIS_HOST: "redis.develop.sprinthub.ru" - REDIS_PASSWORD: $REDIS_PASSWORD_DEV networks: - - net - deploy: - mode: replicated - restart_policy: - condition: any - update_config: - parallelism: 1 - order: start-first - - roulette-nginx: - image: mathwave/sprint-repo:roulette-bot - command: api - environment: - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV - MONGO_HOST: "mongo.develop.sprinthub.ru" - MONGO_PASSWORD: $MONGO_PASSWORD_DEV - MINIO_HOST: "minio.develop.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_DEV - PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN - STAGE: "development" - REDIS_HOST: "redis.develop.sprinthub.ru" - REDIS_PASSWORD: $REDIS_PASSWORD_DEV - networks: - - net - - common-infra-nginx + - queues-development deploy: mode: replicated restart_policy: @@ -51,7 +20,5 @@ services: order: start-first networks: - net: - driver: overlay - common-infra-nginx: + queues-development: external: true \ No newline at end of file diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 864df61..2310bec 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -5,43 +5,12 @@ services: bot: image: mathwave/sprint-repo:roulette-bot - command: bot - networks: - - net environment: - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD - MINIO_HOST: "minio.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD - PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN STAGE: "production" - REDIS_HOST: "redis.sprinthub.ru" - REDIS_PASSWORD: $REDIS_PASSWORD_PROD - deploy: - mode: replicated - restart_policy: - condition: any - update_config: - parallelism: 1 - order: start-first - - roulette-nginx: - image: mathwave/sprint-repo:roulette-bot - command: api networks: - - net - - common-infra-nginx - environment: - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD - MONGO_HOST: "mongo.sprinthub.ru" - MONGO_PASSWORD: $MONGO_PASSWORD_PROD - MINIO_HOST: "minio.sprinthub.ru" - MINIO_SECRET_KEY: $MINIO_SECRET_KEY_PROD - PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN - STAGE: "production" - REDIS_HOST: "redis.sprinthub.ru" - REDIS_PASSWORD: $REDIS_PASSWORD_PROD + - queues deploy: mode: replicated restart_policy: @@ -51,7 +20,5 @@ services: order: start-first networks: - net: - driver: overlay - common-infra-nginx: + queues: external: true \ No newline at end of file diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml index 8d04465..698cd78 100644 --- a/.gitea/workflows/deploy-dev.yaml +++ b/.gitea/workflows/deploy-dev.yaml @@ -28,7 +28,7 @@ jobs: run: docker push mathwave/sprint-repo:roulette-bot deploy-dev: name: Deploy dev - runs-on: [dev] + runs-on: [prod] needs: push steps: - name: login @@ -39,9 +39,5 @@ jobs: ref: dev - name: deploy env: - TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }} MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} - PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }} - MINIO_SECRET_KEY_DEV: ${{ secrets.MINIO_SECRET_KEY_DEV }} - REDIS_PASSWORD_DEV: ${{ secrets.REDIS_PASSWORD_DEV }} - run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot + run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot-development diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml index 6d2f420..e268095 100644 --- a/.gitea/workflows/deploy-prod.yaml +++ b/.gitea/workflows/deploy-prod.yaml @@ -39,9 +39,5 @@ jobs: ref: prod - name: deploy env: - TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }} MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} - PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }} - MINIO_SECRET_KEY_PROD: ${{ secrets.MINIO_SECRET_KEY_PROD }} - REDIS_PASSWORD_PROD: ${{ secrets.REDIS_PASSWORD_PROD }} run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml roulette-bot diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml deleted file mode 100644 index 3affa89..0000000 --- a/.gitlab-ci.yml +++ /dev/null @@ -1,43 +0,0 @@ -stages: - - build - - deploy-dev - - deploy-prod - -build: - stage: build - tags: - - dev - before_script: - - docker login -u mathwave -p $DOCKERHUB_PASSWORD - script: - - docker build -t mathwave/sprint-repo:roulette-bot . - - docker push mathwave/sprint-repo:roulette-bot - -.deploy: - before_script: - - docker login -u mathwave -p $DOCKERHUB_PASSWORD - -deploy-dev: - extends: - - .deploy - stage: deploy-dev - tags: - - dev - rules: - - if: '$CI_COMMIT_BRANCH == "main"' - when: on_success - - when: manual - script: - - docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml roulette-bot - -deploy-prod: - extends: - - .deploy - stage: deploy-prod - tags: - - prod - only: - - main - when: manual - script: - - docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml roulette-bot diff --git a/Dockerfile b/Dockerfile index 3c5568e..1ee715e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,4 +5,4 @@ COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . ENV PYTHONUNBUFFERED 1 -ENTRYPOINT ["python", "main.py"] \ No newline at end of file +ENTRYPOINT ["python", "bot.py"] \ No newline at end of file diff --git a/api.py b/api.py deleted file mode 100644 index fb3257e..0000000 --- a/api.py +++ /dev/null @@ -1,24 +0,0 @@ -from bson import ObjectId -from flask import Flask, request - -from tools.mongo import mongo - -app = Flask("roulette") - - -@app.route('/dialogs') -def main(): - html = "" - for d in mongo.dialogs_collection.find({}).sort([('started_at', -1)]): - html += f'{d["_id"]}
' - html += "" - return html - - -@app.route('/dialog') -def dialog(): - html = "" - for message in mongo.messages_collection.find({"dialog_id": ObjectId(request.args.get('id'))}).sort([('sent_at', 1)]): - html += f'{message["sender"]}: {message["text"]}
' - html += "" - return html diff --git a/bot.py b/bot.py index fb3a140..20c1eab 100644 --- a/bot.py +++ b/bot.py @@ -1,20 +1,17 @@ -import os -import uuid - -import requests -import telebot +import json from telebot.types import Message, ReplyKeyboardRemove -from tools.minio import minio_client as minio from tools.mongo import mongo -from tools.sprint_platform import platform -from tools.redis import redis_client as redis - -bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN")) +from tools.queues import TasksHandlerMixin, set_task -class Core: - def __init__(self, message: Message): +class Core(TasksHandlerMixin): + @property + def queue_name(self): + return 'roulette_bot_worker' + + def process(self, payload): + message: Message = Message.de_json(json.dumps(payload)) self.message = message self.chat_id = message.chat.id self.message_text = message.text or message.caption or "" @@ -30,8 +27,6 @@ class Core: doc = user self.doc = doc self.state = doc['state'] - - def process(self): if self.message_text.startswith('/'): self.exec_command() return @@ -68,12 +63,42 @@ class Core: def handle_state_search(self): self.send_message('πŸ€– Поиски собСсСдника ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°ΡŽΡ‚ΡΡ') - def send_message(self, text, chat_id=None, reply_markup=None, remove_keyboard=True, **kwargs): + def send_message(self, text, chat_id=None, reply_markup=None, remove_keyboard=True): if not text: return if reply_markup is None and remove_keyboard: reply_markup = ReplyKeyboardRemove() - bot.send_message(chat_id or self.chat_id, text, reply_markup=reply_markup, **kwargs) + body = { + 'chat_id': chat_id or self.chat_id, + 'text': text, + } + if reply_markup: + body['reply_markup'] = reply_markup.to_json() + set_task( + 'botalka_mailbox', + { + 'project': 'roulette-bot', + 'name': 'telegram-bot', + 'method': 'send_message', + 'body': body, + }, + 1, + ) + + def send(self, chat_id, method, **kwargs): + set_task( + 'botalka_mailbox', + { + 'project': 'roulette-bot', + 'name': 'telegram-bot', + 'method': method, + 'body': { + 'chat_id': chat_id, + **kwargs, + }, + }, + 1, + ) def set_state(self, state, chat_ids=None): mongo.chats_collection.update_many({"chat_id": {"$in": chat_ids or [self.chat_id]}}, {"$set": {"state": state}}) @@ -87,36 +112,16 @@ class Core: def handle_state_dialog(self): current_dialog = mongo.get_current_dialog(self.chat_id) chat_to_send = current_dialog['chat_id_2'] if current_dialog['chat_id_1'] == self.chat_id else current_dialog['chat_id_1'] - saves = platform.get_config('save') - if saves['messages']: - res = mongo.create_message(self.message.content_type, self.message_text, current_dialog['_id'], self.chat_id).inserted_id - else: - res = uuid.uuid4() if self.message.photo: - if saves['photos']: - photo = requests.get(bot.get_file_url(self.message.photo[-1].file_id)).content - minio.put_object(f"photos/{res}", photo) - bot.send_photo(chat_to_send, self.message.photo[-1].file_id) + self.send(chat_to_send, 'send_photo', photo=self.message.photo[-1].file_id) if self.message.sticker: - if saves['stickers']: - sticker = requests.get(bot.get_file_url(self.message.sticker.file_id)).content - minio.put_object(f"stickers/{res}", sticker) - bot.send_sticker(chat_to_send, self.message.sticker.file_id) + self.send(chat_to_send, 'send_data', data=self.message.sticker.file_id, data_type='sticker') if self.message.voice: - if saves['voices']: - voice = requests.get(bot.get_file_url(self.message.voice.file_id)).content - minio.put_object(f"voices/{res}", voice) - bot.send_voice(chat_to_send, self.message.voice.file_id) + self.send(chat_to_send, 'send_voice', voice=self.message.voice.file_id) if self.message.video_note: - if saves['video_notes']: - video_note = requests.get(bot.get_file_url(self.message.video_note.file_id)).content - minio.put_object(f"video_notes/{res}", video_note) - bot.send_video_note(chat_to_send, self.message.video_note.file_id) + self.send(chat_to_send, 'send_video_note', data=self.message.video_note.file_id) if self.message.animation: - if saves['gifs']: - video_note = requests.get(bot.get_file_url(self.message.animation.file_id)).content - minio.put_object(f"gifs/{res}", video_note) - bot.send_animation(chat_to_send, self.message.animation.file_id) + self.send(chat_to_send, 'send_animation', data=self.message.animation.file_id) self.send_message(self.message_text, chat_to_send) def start_new_dialog(self, chat_ids): @@ -136,25 +141,5 @@ class Core: self.set_state('dialog', [chat, next_chat['chat_id']]) -def run_bot(): - @bot.message_handler(content_types=[ - # 'audio', - 'photo', - 'voice', - 'video_note', - # 'document', - 'text', - 'animation', - # 'location', - # 'contact', - 'sticker' - ] - ) - def do_action(message: Message): - try: - Core(message).process() - except Exception as e: - print(e) - - print('bot is starting') - bot.polling() +if __name__ == '__main__': + Core().poll() diff --git a/local_platform.json b/local_platform.json deleted file mode 100644 index 8845508..0000000 --- a/local_platform.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "configs": { - "save": { - "photos": false, - "voices": false, - "messages": false, - "stickers": false, - "video_notes": false, - "gifs": false - } - }, - "experiments": {}, - "platform_staff": {} -} \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index caedf8f..0000000 --- a/main.py +++ /dev/null @@ -1,11 +0,0 @@ -import sys - - -if sys.argv[-1] == "bot": - from bot import run_bot - run_bot() -elif sys.argv[-1] == "api": - from api import app - app.run(host="0.0.0.0", port=1238) -else: - raise NotImplementedError diff --git a/tools/minio.py b/tools/minio.py deleted file mode 100644 index 1498e29..0000000 --- a/tools/minio.py +++ /dev/null @@ -1,45 +0,0 @@ -import io - -from minio import Minio -from minio.error import MinioException - -import settings - - -class Client: - - def __init__(self, host: str, access_key: str, secret_key: str, bucket_name: str): - self.bucket_name = bucket_name - self.cli = Minio( - host, - access_key=access_key, - secret_key=secret_key, - secure=False - ) - try: - self.cli.make_bucket(bucket_name) - except MinioException: - pass - - def put_object(self, name: str, data: bytes): - self.cli.put_object(self.bucket_name, name, io.BytesIO(data), len(data)) - - def get_object(self, name: str) -> bytes: - try: - return self.cli.get_object(self.bucket_name, name).data - except MinioException: - return b"" - - def delete_object(self, name: str): - try: - self.cli.remove_object(self.bucket_name, name) - except MinioException: - pass - - -minio_client = Client( - settings.MINIO_HOST, - settings.MINIO_ACCESS_KEY, - settings.MINIO_SECRET_KEY, - settings.MINIO_BUCKET_NAME -) diff --git a/tools/queues.py b/tools/queues.py new file mode 100644 index 0000000..c00a18c --- /dev/null +++ b/tools/queues.py @@ -0,0 +1,52 @@ +import os +import requests +import time + + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' + + +class QueuesException(Exception): + ... + + +class TasksHandlerMixin: + def poll(self): + while True: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + task = response.get('task') + if not task: + time.sleep(0.2) + continue + try: + self.process(task['payload']) + except Exception as exc: + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + continue + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') + + @property + def queue_name(self): + raise NotImplemented + + def process(self, payload): + raise NotImplemented + + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException diff --git a/tools/redis.py b/tools/redis.py deleted file mode 100644 index c248fbf..0000000 --- a/tools/redis.py +++ /dev/null @@ -1,30 +0,0 @@ -import contextlib - -import redis - -import settings - - -class RedisClient: - - def __init__(self, host, password=None): - kwargs = { - "host": host, - } - if password: - kwargs['password'] = password - self.cli = redis.Redis(**kwargs) - - def get(self, key): - with self.cli as cli: - return cli.get(f"ruletka_{key}") - - def set(self, key, value): - with self.cli as cli: - cli.set(f"ruletka_{key}", value) - - -redis_client = RedisClient( - settings.REDIS_HOST, - settings.REDIS_PASSWORD -) diff --git a/tools/sprint_platform.py b/tools/sprint_platform.py deleted file mode 100644 index 7063a45..0000000 --- a/tools/sprint_platform.py +++ /dev/null @@ -1,98 +0,0 @@ -import json -import os -import typing -import urllib.parse -from threading import Thread -from time import sleep - -from requests import get - - -class PlatformClient: - def __init__(self, platform_security_token: str, app_name: str, stage: str, need_poll: bool = True): - self.platform_security_token = platform_security_token - self.app_name = app_name - self.stage = stage - self.endpoint = 'https://platform.sprinthub.ru/' - self.configs_url = urllib.parse.urljoin(self.endpoint, 'configs/get') - self.experiments_url = urllib.parse.urljoin(self.endpoint, 'experiments/get') - self.staff_url = urllib.parse.urljoin(self.endpoint, 'is_staff') - self.fetch_url = urllib.parse.urljoin(self.endpoint, 'fetch') - self.config_storage = {} - self.experiment_storage = {} - self.staff_storage = {} - self.poll_data() - if need_poll: - self.poll_data_in_thread() - - def poll_data_in_thread(self): - def inner(): - while True: - sleep(30) - self.fetch() - - Thread(target=inner, daemon=True).start() - - def poll_data(self): - self.fetch(with_exception=True) - - def request_with_retries(self, url, params, with_exception=False, retries_count=3): - exception_to_throw = None - for _ in range(retries_count): - try: - response = get( - url, - headers={'X-Security-Token': self.platform_security_token}, - params=params - ) - if response.status_code == 200: - return response.json() - print(f'Failed to request {url}, status_code={response.status_code}') - exception_to_throw = Exception('Not 200 status') - except Exception as exc: - print(exc) - exception_to_throw = exc - sleep(1) - print(f'Failed fetching with retries: {url}, {params}') - if with_exception: - raise exception_to_throw - - def fetch(self, with_exception=False): - if self.stage == 'local': - local_platform = json.loads(open('local_platform.json', 'r').read()) - self.config_storage = local_platform['configs'] - self.experiment_storage = local_platform['experiments'] - self.staff_storage = { - key: set(value) - for key, value in local_platform['platform_staff'].items() - } - return - response_data = self.request_with_retries(self.fetch_url, { - 'project': self.app_name, - 'stage': self.stage, - }, with_exception) - self.config_storage = response_data['configs'] - self.experiment_storage = response_data['experiments'] - self.staff_storage = { - key: set(value) - for key, value in response_data['platform_staff'].items() - } - - def is_staff(self, **kwargs): - for key, value in kwargs.items(): - if value in self.staff_storage[key]: - return True - return False - - def get_config(self, name): - return self.config_storage[name] - - def get_experiment(self, name): - return self.experiment_storage[name] - - -platform = PlatformClient( - os.getenv('PLATFORM_SECURITY_TOKEN'), - 'Ruletka', - os.getenv('STAGE', 'local') -) \ No newline at end of file