From 272233fc67963656249dd66b3463565e35fd15d2 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Sun, 17 Nov 2024 22:52:50 +0300 Subject: [PATCH] trial --- .deploy/deploy-dev.yaml | 42 ++++++++-- .deploy/deploy-prod.yaml | 46 ++++++++--- .gitea/workflows/deploy-dev.yaml | 1 + .gitea/workflows/deploy-prod.yaml | 1 + daemons/api.py | 132 +++++++++++++++--------------- daemons/base.py | 3 + daemons/bot.py | 20 ----- daemons/fetch.py | 29 ++++--- daemons/mailbox.py | 28 +++++++ daemons/notify.py | 67 ++++++--------- daemons/poll.py | 15 ++++ daemons/worker.py | 19 +++++ entrypoint.py | 24 +++--- helpers/answer.py | 9 +- utils/__init__.py | 0 utils/queues.py | 64 +++++++++++++++ 16 files changed, 332 insertions(+), 168 deletions(-) create mode 100644 daemons/base.py delete mode 100644 daemons/bot.py create mode 100644 daemons/mailbox.py create mode 100644 daemons/poll.py create mode 100644 daemons/worker.py create mode 100644 utils/__init__.py create mode 100644 utils/queues.py diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 1bcf3bf..5680343 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -3,15 +3,45 @@ version: "3.4" services: - bot: + poll: + image: mathwave/sprint-repo:ruz-bot + environment: + STAGE: "development" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV + QUEUES_TOKEN: $QUEUES_TOKEN_DEV + command: poll + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + + worker: image: mathwave/sprint-repo:ruz-bot environment: MONGO_HOST: "mongo.develop.sprinthub.ru" STAGE: "development" MONGO_PASSWORD: $MONGO_PASSWORD_DEV - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN - command: bot + QUEUES_TOKEN: $QUEUES_TOKEN_DEV + command: worker + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + + mailbox: + image: mathwave/sprint-repo:ruz-bot + environment: + STAGE: "development" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV + QUEUES_TOKEN: $QUEUES_TOKEN_DEV + command: mailbox deploy: mode: replicated restart_policy: @@ -26,8 +56,8 @@ services: MONGO_HOST: "mongo.develop.sprinthub.ru" STAGE: "development" MONGO_PASSWORD: $MONGO_PASSWORD_DEV - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN + QUEUES_TOKEN: $QUEUES_TOKEN_DEV command: fetch deploy: mode: replicated @@ -43,8 +73,8 @@ services: MONGO_HOST: "mongo.develop.sprinthub.ru" STAGE: "development" MONGO_PASSWORD: $MONGO_PASSWORD_DEV - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN + QUEUES_TOKEN: $QUEUES_TOKEN_DEV command: notify deploy: mode: replicated @@ -62,8 +92,8 @@ services: MONGO_HOST: "mongo.develop.sprinthub.ru" STAGE: "development" MONGO_PASSWORD: $MONGO_PASSWORD_DEV - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN + QUEUES_TOKEN: $QUEUES_TOKEN_DEV command: api deploy: mode: replicated diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 9c4678b..28831a1 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -3,24 +3,49 @@ version: "3.4" services: - bot: + poll: + image: mathwave/sprint-repo:ruz-bot + environment: + STAGE: "production" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD + QUEUES_TOKEN: $QUEUES_TOKEN_PROD + command: poll + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + + worker: image: mathwave/sprint-repo:ruz-bot environment: MONGO_HOST: "mongo.sprinthub.ru" STAGE: "production" MONGO_PASSWORD: $MONGO_PASSWORD_PROD - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN - DEBUG: "false" - command: bot + QUEUES_TOKEN: $QUEUES_TOKEN_PROD + command: worker + deploy: + mode: replicated + restart_policy: + condition: any + update_config: + parallelism: 1 + order: start-first + + mailbox: + image: mathwave/sprint-repo:ruz-bot + environment: + STAGE: "production" + TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD + QUEUES_TOKEN: $QUEUES_TOKEN_PROD + command: mailbox deploy: mode: replicated restart_policy: condition: any - placement: - constraints: - - node.role == worker - - node.labels.zone == ru update_config: parallelism: 1 order: start-first @@ -31,9 +56,9 @@ services: MONGO_HOST: "mongo.sprinthub.ru" STAGE: "production" MONGO_PASSWORD: $MONGO_PASSWORD_PROD - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN DEBUG: "false" + QUEUES_TOKEN: $QUEUES_TOKEN_PROD command: fetch deploy: mode: replicated @@ -53,9 +78,9 @@ services: MONGO_HOST: "mongo.sprinthub.ru" STAGE: "production" MONGO_PASSWORD: $MONGO_PASSWORD_PROD - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN DEBUG: "false" + QUEUES_TOKEN: $QUEUES_TOKEN_PROD command: notify deploy: mode: replicated @@ -77,7 +102,6 @@ services: MONGO_HOST: "mongo.sprinthub.ru" STAGE: "production" MONGO_PASSWORD: $MONGO_PASSWORD_PROD - TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN DEBUG: "false" command: api diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml index 9330d09..519080a 100644 --- a/.gitea/workflows/deploy-dev.yaml +++ b/.gitea/workflows/deploy-dev.yaml @@ -42,4 +42,5 @@ jobs: TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }} MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }} PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }} + QUEUES_TOKEN_DEV: ${{ secrets.QUEUES_TOKEN_DEV }} run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml index 9ba368a..2f5b793 100644 --- a/.gitea/workflows/deploy-prod.yaml +++ b/.gitea/workflows/deploy-prod.yaml @@ -42,4 +42,5 @@ jobs: TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }} MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }} PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }} + QUEUES_TOKEN_PROD: ${{ secrets.QUEUES_TOKEN_PROD }} run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml ruz-bot diff --git a/daemons/api.py b/daemons/api.py index 3a540ea..faf0877 100644 --- a/daemons/api.py +++ b/daemons/api.py @@ -3,73 +3,75 @@ from flask import Flask, request import settings from helpers.alice import Processor from helpers.mongo import mongo +from daemons import base -def api(): - app = Flask(__name__) +class Daemon(base.Daemon): + def execute(self): + app = Flask(__name__) - @app.route('/stats/json', methods=['GET']) - def stats_json(): - all_users = mongo.users_collection.count_documents({}) - teachers = mongo.users_collection.count_documents({"is_teacher": True}) - return { - "Всего пользователей": all_users, - "Пользователей прошедших регистрацию": mongo.users_collection.count_documents({'email': {'$ne': None}}), - "Преподавателей": teachers, - "Отписались от уведомлений": mongo.users_collection.count_documents({'notify_minutes': None}), - "Отправлено уведомлений за сегодня": mongo.lessons_collection.count_documents({'notified': True}), - "Проиндексировано занятий из РУЗа": mongo.lessons_collection.count_documents({}), - "Пользователей из Москвы": mongo.users_collection.count_documents( - {'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}}), - "Пользователей из Москвы (регистрация)": mongo.users_collection.count_documents( - {'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents( - {'campus': {'$exists': False}, 'email': {'$ne': None}}), - "Пользователей из Перми": mongo.users_collection.count_documents({'campus': 'Пермь'}), - "Пользователей из Перми (регистрация)": mongo.users_collection.count_documents( - {'campus': 'Пермь', 'email': {'$ne': None}}), - "Пользователей из Нижнего Новгорода": mongo.users_collection.count_documents({'campus': 'Нижний Новгород'}), - "Пользователей из Нижнего Новгорода (регистрация)": mongo.users_collection.count_documents( - {'campus': 'Нижний Новгород', 'email': {'$ne': None}}), - "Пользователей из Санкт-Петербурга": mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'}), - "Пользователей из Санкт-Петербурга (регистрация)": mongo.users_collection.count_documents( - {'campus': 'Санкт-Петербург', 'email': {'$ne': None}}) - } - - @app.route('/stats', methods=['GET']) - def stats(): - all_users = mongo.users_collection.count_documents({}) - teachers = mongo.users_collection.count_documents({"is_teacher": True}) - text = f"Всего пользователей: {all_users}
" \ - f"Пользователей прошедших регистрацию: {mongo.users_collection.count_documents({'email': {'$ne': None}})}
" \ - f"Студентов: {all_users - teachers}
" \ - f"Преподавателей: {teachers}
" \ - f"Отписались от уведомлений: {mongo.users_collection.count_documents({'notify_minutes': None})}
" \ - f"Отправлено уведомлений за сегодня: {mongo.lessons_collection.count_documents({'notified': True})}
" \ - f"Проиндексировано занятий из РУЗа: {mongo.lessons_collection.count_documents({})}
" \ - f"
" \ - f"
" \ - f"Пользователей из Москвы: {mongo.users_collection.count_documents({'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}})}
" \ - f"Пользователей из Москвы (регистрация): {mongo.users_collection.count_documents({'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents({'campus': {'$exists': False}, 'email': {'$ne': None}})}
" \ - f"Пользователей из Перми: {mongo.users_collection.count_documents({'campus': 'Пермь'})}
" \ - f"Пользователей из Перми (регистрация): {mongo.users_collection.count_documents({'campus': 'Пермь', 'email': {'$ne': None}})}
" \ - f"Пользователей из Нижнего Новгорода: {mongo.users_collection.count_documents({'campus': 'Нижний Новгород'})}
" \ - f"Пользователей из Нижнего Новгорода (регистрация): {mongo.users_collection.count_documents({'campus': 'Нижний Новгород', 'email': {'$ne': None}})}
" \ - f"Пользователей из Санкт-Петербурга: {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'})}
" \ - f"Пользователей из Санкт-Петербурга (регистрация): {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург', 'email': {'$ne': None}})}
" - return text - - @app.route('/alice', methods=['POST']) - def alice(): - req = request.json - processor = Processor(req) - response = { - "version": req['version'], - "session": req['session'], - "response": { - "end_session": False + @app.route('/stats/json', methods=['GET']) + def stats_json(): + all_users = mongo.users_collection.count_documents({}) + teachers = mongo.users_collection.count_documents({"is_teacher": True}) + return { + "Всего пользователей": all_users, + "Пользователей прошедших регистрацию": mongo.users_collection.count_documents({'email': {'$ne': None}}), + "Преподавателей": teachers, + "Отписались от уведомлений": mongo.users_collection.count_documents({'notify_minutes': None}), + "Отправлено уведомлений за сегодня": mongo.lessons_collection.count_documents({'notified': True}), + "Проиндексировано занятий из РУЗа": mongo.lessons_collection.count_documents({}), + "Пользователей из Москвы": mongo.users_collection.count_documents( + {'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}}), + "Пользователей из Москвы (регистрация)": mongo.users_collection.count_documents( + {'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents( + {'campus': {'$exists': False}, 'email': {'$ne': None}}), + "Пользователей из Перми": mongo.users_collection.count_documents({'campus': 'Пермь'}), + "Пользователей из Перми (регистрация)": mongo.users_collection.count_documents( + {'campus': 'Пермь', 'email': {'$ne': None}}), + "Пользователей из Нижнего Новгорода": mongo.users_collection.count_documents({'campus': 'Нижний Новгород'}), + "Пользователей из Нижнего Новгорода (регистрация)": mongo.users_collection.count_documents( + {'campus': 'Нижний Новгород', 'email': {'$ne': None}}), + "Пользователей из Санкт-Петербурга": mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'}), + "Пользователей из Санкт-Петербурга (регистрация)": mongo.users_collection.count_documents( + {'campus': 'Санкт-Петербург', 'email': {'$ne': None}}) } - } - response['response'].update(processor.process()) - return response - app.run(host="0.0.0.0", port=1238, debug=settings.DEBUG) + @app.route('/stats', methods=['GET']) + def stats(): + all_users = mongo.users_collection.count_documents({}) + teachers = mongo.users_collection.count_documents({"is_teacher": True}) + text = f"Всего пользователей: {all_users}
" \ + f"Пользователей прошедших регистрацию: {mongo.users_collection.count_documents({'email': {'$ne': None}})}
" \ + f"Студентов: {all_users - teachers}
" \ + f"Преподавателей: {teachers}
" \ + f"Отписались от уведомлений: {mongo.users_collection.count_documents({'notify_minutes': None})}
" \ + f"Отправлено уведомлений за сегодня: {mongo.lessons_collection.count_documents({'notified': True})}
" \ + f"Проиндексировано занятий из РУЗа: {mongo.lessons_collection.count_documents({})}
" \ + f"
" \ + f"
" \ + f"Пользователей из Москвы: {mongo.users_collection.count_documents({'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}})}
" \ + f"Пользователей из Москвы (регистрация): {mongo.users_collection.count_documents({'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents({'campus': {'$exists': False}, 'email': {'$ne': None}})}
" \ + f"Пользователей из Перми: {mongo.users_collection.count_documents({'campus': 'Пермь'})}
" \ + f"Пользователей из Перми (регистрация): {mongo.users_collection.count_documents({'campus': 'Пермь', 'email': {'$ne': None}})}
" \ + f"Пользователей из Нижнего Новгорода: {mongo.users_collection.count_documents({'campus': 'Нижний Новгород'})}
" \ + f"Пользователей из Нижнего Новгорода (регистрация): {mongo.users_collection.count_documents({'campus': 'Нижний Новгород', 'email': {'$ne': None}})}
" \ + f"Пользователей из Санкт-Петербурга: {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'})}
" \ + f"Пользователей из Санкт-Петербурга (регистрация): {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург', 'email': {'$ne': None}})}
" + return text + + @app.route('/alice', methods=['POST']) + def alice(): + req = request.json + processor = Processor(req) + response = { + "version": req['version'], + "session": req['session'], + "response": { + "end_session": False + } + } + response['response'].update(processor.process()) + return response + + app.run(host="0.0.0.0", port=1238, debug=settings.DEBUG) diff --git a/daemons/base.py b/daemons/base.py new file mode 100644 index 0000000..aca86ad --- /dev/null +++ b/daemons/base.py @@ -0,0 +1,3 @@ +class Daemon: + def execute(self): + raise NotImplementedError diff --git a/daemons/bot.py b/daemons/bot.py deleted file mode 100644 index 973d718..0000000 --- a/daemons/bot.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -import telebot -from telebot.types import Message - -from helpers.mongo import mongo - -bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN")) - - -@bot.message_handler(commands=['start']) -def on_start(message: Message): - mongo.users_collection.delete_many({"chat_id": message.chat.id}) - do_action(message) - - -@bot.message_handler() -def do_action(message: Message): - from helpers.answer import Answer - Answer(message).process() diff --git a/daemons/fetch.py b/daemons/fetch.py index 17c13ec..79d050b 100644 --- a/daemons/fetch.py +++ b/daemons/fetch.py @@ -6,6 +6,8 @@ from helpers import now, campus_timdelta from helpers.mongo import mongo from helpers.ruz import ruz +from daemons import base + def fetch_schedule_for_user(user: dict): today = now(user) @@ -75,16 +77,17 @@ def delete_old(): mongo.lessons_collection.delete_many({"end": {"$lte": datetime.datetime.now() - datetime.timedelta(days=1)}}) -def fetch(): - while True: - logging.info("fetch start") - begin = datetime.datetime.now() - if begin.hour > 22 or begin.hour < 7: - logging.info("Too late, sleeping") - sleep(30 * 60) - continue - process() - end = datetime.datetime.now() - logging.info('fetch finished') - logging.info("time elapsed %s", (end - begin).total_seconds()) - delete_old() +class Daemon(base.Daemon): + def execute(self): + while True: + logging.info("fetch start") + begin = datetime.datetime.now() + if begin.hour > 22 or begin.hour < 7: + logging.info("Too late, sleeping") + sleep(30 * 60) + continue + process() + end = datetime.datetime.now() + logging.info('fetch finished') + logging.info("time elapsed %s", (end - begin).total_seconds()) + delete_old() diff --git a/daemons/mailbox.py b/daemons/mailbox.py new file mode 100644 index 0000000..ce1fa9a --- /dev/null +++ b/daemons/mailbox.py @@ -0,0 +1,28 @@ +import telebot +import os + +from daemons import base +from utils import queues + + +class Daemon(base.BaseDaemon, queues.TasksHandlerMixin): + def __init__(self): + super().__init__() + self.bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN")) + + @property + def queue_name(self): + return 'pizda_bot_mailbox' + + def execute(self): + self.poll() + + def process(self, payload): + body = { + 'chat_id': payload['chat_id'], + 'text': payload['text'], + } + reply_markup = payload.get('reply_markup') + if reply_markup: + body['reply_markup'] = reply_markup + self.bot.send_message(**body, parse_mode='Markdown') diff --git a/daemons/notify.py b/daemons/notify.py index 68228da..5aefc34 100644 --- a/daemons/notify.py +++ b/daemons/notify.py @@ -2,12 +2,12 @@ import datetime import logging from time import sleep -from telebot.apihelper import ApiTelegramException - -from daemons.bot import bot from helpers import now from helpers.mongo import mongo from helpers.ruz import ruz +from daemons import base + +from utils import queues def process(): @@ -25,13 +25,7 @@ def process(): ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n" if lesson.get('link', None): ans += f"🔗 {lesson['link']}" - try: - bot.send_message( - user["chat_id"], - f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans - ) - except ApiTelegramException: - pass + queues.set_task('ruz_bot_mailbox', {'text': f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans, 'chat_id': user["chat_id"]}, 1) mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}}) time_now = datetime.datetime.now() for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}): @@ -46,11 +40,7 @@ def process(): else: text = ruz.schedule_builder(lessons) try: - bot.send_message( - user["chat_id"], - f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, - parse_mode='Markdown' - ) + queues.set_task('ruz_bot_mailbox', {'text': f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, 'chat_id': user["chat_id"]}, 1) except: pass mongo.users_collection.update_one( @@ -72,34 +62,29 @@ def process(): ans += f"🧑‍🏫 {(lesson['lecturer'] or 'Неизвестно')}\n" if lesson.get('link', None): ans += f"🔗 {lesson['link']}" - try: - mess = "Пары начутся через " - if user['first_lesson_notify'] == 30: - mess += "30 минут" - elif user['first_lesson_notify'] == 60: - mess += "1 час" - elif user['first_lesson_notify'] == 4 * 60: - mess += "4 часа" - else: - mess += "12 часов" - mess += "!\n\nТвоя первая пара:\n\n" + ans - bot.send_message( - user["chat_id"], - mess - ) - except ApiTelegramException: - pass + mess = "Пары начутся через " + if user['first_lesson_notify'] == 30: + mess += "30 минут" + elif user['first_lesson_notify'] == 60: + mess += "1 час" + elif user['first_lesson_notify'] == 4 * 60: + mess += "4 часа" + else: + mess += "12 часов" + mess += "!\n\nТвоя первая пара:\n\n" + ans + queues.set_task('ruz_bot_mailbox', {'text': mess, 'chat_id': user["chat_id"]}, 1) start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day) mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}}) break -def notify(): - while True: - logging.info("notify start") - begin = datetime.datetime.now() - process() - end = datetime.datetime.now() - logging.info('notify finished') - logging.info("time elapsed %s", (end - begin).total_seconds()) - sleep(63 - end.second) +class Daemon(base.Daemon): + def execute(self): + while True: + logging.info("notify start") + begin = datetime.datetime.now() + process() + end = datetime.datetime.now() + logging.info('notify finished') + logging.info("time elapsed %s", (end - begin).total_seconds()) + sleep(63 - end.second) diff --git a/daemons/poll.py b/daemons/poll.py new file mode 100644 index 0000000..374a730 --- /dev/null +++ b/daemons/poll.py @@ -0,0 +1,15 @@ +import os +import telebot + +from daemons import base +from telebot import types +from utils import queues + + +class Daemon(base.BaseDaemon): + def execute(self): + bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN")) + @bot.message_handler() + def do_action(message: types.Message): + queues.set_task('ruz_bot_worker', message.json, 1) + bot.polling() diff --git a/daemons/worker.py b/daemons/worker.py new file mode 100644 index 0000000..7b4d58d --- /dev/null +++ b/daemons/worker.py @@ -0,0 +1,19 @@ +from daemons import base +from utils import queues +import json + +from telebot.types import Message + + +class Daemon(base.BaseDaemon, queues.TasksHandlerMixin): + @property + def queue_name(self): + return 'ruz_bot_worker' + + def execute(self): + self.poll() + + def process(self, payload): + message: Message = Message.de_json(json.dumps(payload)) + from helpers.answer import Answer + Answer(message).process() diff --git a/entrypoint.py b/entrypoint.py index 3c0b441..6c35ccc 100644 --- a/entrypoint.py +++ b/entrypoint.py @@ -2,10 +2,6 @@ import logging.config import sys import settings -from daemons.api import api -from daemons.bot import bot -from daemons.fetch import fetch -from daemons.notify import notify import locale @@ -13,17 +9,25 @@ logging.config.dictConfig(settings.logging_config) locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8') arg = sys.argv[-1] -if arg == "bot": - logging.info("bot is starting") - bot.polling() +if arg == "poll": + logging.info("poll is starting") + from daemons.poll import Daemon +elif arg == 'worker': + logging.info("worker is starting") + from daemons.worker import Daemon +elif arg == 'mailbox': + logging.info("mailbox is starting") + from daemons.mailbox import Daemon elif arg == "fetch": logging.info("fetch is starting") - fetch() + from daemons.fetch import Daemon elif arg == "notify": logging.info("notify is starting") - notify() + from daemons.notify import Daemon elif arg == "api": logging.info("api is starting") - api() + from daemons.api import Daemon else: raise ValueError(f"Unknown param {arg}") + +Daemon().execute() diff --git a/helpers/answer.py b/helpers/answer.py index 277216b..f93814a 100644 --- a/helpers/answer.py +++ b/helpers/answer.py @@ -4,7 +4,6 @@ from random import choice from telebot.types import Message, ReplyKeyboardRemove -from daemons.bot import bot from daemons.fetch import fetch_schedule_for_user from helpers import get_next_daily_notify_time from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, again_keyboard, no_daily_notify, \ @@ -12,6 +11,7 @@ from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, a from helpers.mongo import mongo from helpers.sprint_platform import platform from helpers.ruz import ruz +from utils import queues class User: @@ -32,6 +32,8 @@ class Answer: def __init__(self, message: Message): self.message = message self.message_text = message.text or message.caption or "" + if self.message_text.startswith('/start'): + mongo.users_collection.delete_many({"chat_id": message.chat.id}) user = mongo.users_collection.find_one({"chat_id": message.chat.id}) if user is None: user = { @@ -72,7 +74,10 @@ class Answer: def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs): if reply_markup is None and remove_keyboard: reply_markup = ReplyKeyboardRemove() - bot.send_message(self.user['chat_id'], text, reply_markup=reply_markup, **kwargs) + body = {'text': text, 'chat_id': self.user['chat_id']} + if reply_markup: + body['reply_markup'] = reply_markup.to_json() + queues.set_task('ruz_bot_mailbox', body, 1) def set_state(self, state: str): self.user['state'] = state diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/queues.py b/utils/queues.py new file mode 100644 index 0000000..b4b071d --- /dev/null +++ b/utils/queues.py @@ -0,0 +1,64 @@ +import json +import os +import requests +import time + + +stage = os.getenv("STAGE", 'local') +if stage == 'development': + QUEUES_URL = 'https://queues.develop.sprinthub.ru' +elif stage == 'production': + QUEUES_URL = 'https://queues.sprinthub.ru' +else: + QUEUES_URL = None + +token = os.getenv('QUEUES_TOKEN') + + +class QueuesException(Exception): + ... + + +class TasksHandlerMixin: + def poll(self): + while True: + if QUEUES_URL is None: + data = {'payload': json.loads(input('Input message: '))} + else: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name, 'X-Queues-Token': token}) + if response.status_code == 404: + time.sleep(0.2) + continue + if response.status_code == 403: + raise NotImplemented('QUEUE_TOKEN is incorrect') + data = response.json() + try: + self.process(data['payload']) + except Exception as exc: + print(f'Error processing message id={data["id"]}, payload={data["payload"]}, exc={exc}') + continue + if QUEUES_URL is None: + continue + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': data['id']}, headers={'X-Queues-Token': token}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={data["id"]}') + + @property + def queue_name(self): + raise NotImplemented + + def process(self, payload): + raise NotImplemented + + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name, 'X-Queues-Token': token}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException