diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml
index 1bcf3bf..5680343 100644
--- a/.deploy/deploy-dev.yaml
+++ b/.deploy/deploy-dev.yaml
@@ -3,15 +3,45 @@ version: "3.4"
services:
- bot:
+ poll:
+ image: mathwave/sprint-repo:ruz-bot
+ environment:
+ STAGE: "development"
+ TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
+ command: poll
+ deploy:
+ mode: replicated
+ restart_policy:
+ condition: any
+ update_config:
+ parallelism: 1
+ order: start-first
+
+ worker:
image: mathwave/sprint-repo:ruz-bot
environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- command: bot
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
+ command: worker
+ deploy:
+ mode: replicated
+ restart_policy:
+ condition: any
+ update_config:
+ parallelism: 1
+ order: start-first
+
+ mailbox:
+ image: mathwave/sprint-repo:ruz-bot
+ environment:
+ STAGE: "development"
+ TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
+ command: mailbox
deploy:
mode: replicated
restart_policy:
@@ -26,8 +56,8 @@ services:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: fetch
deploy:
mode: replicated
@@ -43,8 +73,8 @@ services:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: notify
deploy:
mode: replicated
@@ -62,8 +92,8 @@ services:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
+ QUEUES_TOKEN: $QUEUES_TOKEN_DEV
command: api
deploy:
mode: replicated
diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml
index 9c4678b..28831a1 100644
--- a/.deploy/deploy-prod.yaml
+++ b/.deploy/deploy-prod.yaml
@@ -3,24 +3,49 @@ version: "3.4"
services:
- bot:
+ poll:
+ image: mathwave/sprint-repo:ruz-bot
+ environment:
+ STAGE: "production"
+ TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
+ QUEUES_TOKEN: $QUEUES_TOKEN_PROD
+ command: poll
+ deploy:
+ mode: replicated
+ restart_policy:
+ condition: any
+ update_config:
+ parallelism: 1
+ order: start-first
+
+ worker:
image: mathwave/sprint-repo:ruz-bot
environment:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
- DEBUG: "false"
- command: bot
+ QUEUES_TOKEN: $QUEUES_TOKEN_PROD
+ command: worker
+ deploy:
+ mode: replicated
+ restart_policy:
+ condition: any
+ update_config:
+ parallelism: 1
+ order: start-first
+
+ mailbox:
+ image: mathwave/sprint-repo:ruz-bot
+ environment:
+ STAGE: "production"
+ TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
+ QUEUES_TOKEN: $QUEUES_TOKEN_PROD
+ command: mailbox
deploy:
mode: replicated
restart_policy:
condition: any
- placement:
- constraints:
- - node.role == worker
- - node.labels.zone == ru
update_config:
parallelism: 1
order: start-first
@@ -31,9 +56,9 @@ services:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false"
+ QUEUES_TOKEN: $QUEUES_TOKEN_PROD
command: fetch
deploy:
mode: replicated
@@ -53,9 +78,9 @@ services:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false"
+ QUEUES_TOKEN: $QUEUES_TOKEN_PROD
command: notify
deploy:
mode: replicated
@@ -77,7 +102,6 @@ services:
MONGO_HOST: "mongo.sprinthub.ru"
STAGE: "production"
MONGO_PASSWORD: $MONGO_PASSWORD_PROD
- TELEGRAM_TOKEN: $TELEGRAM_TOKEN_PROD
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
DEBUG: "false"
command: api
diff --git a/.gitea/workflows/deploy-dev.yaml b/.gitea/workflows/deploy-dev.yaml
index 9330d09..519080a 100644
--- a/.gitea/workflows/deploy-dev.yaml
+++ b/.gitea/workflows/deploy-dev.yaml
@@ -42,4 +42,5 @@ jobs:
TELEGRAM_TOKEN_DEV: ${{ secrets.TELEGRAM_TOKEN_DEV }}
MONGO_PASSWORD_DEV: ${{ secrets.MONGO_PASSWORD_DEV }}
PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
+ QUEUES_TOKEN_DEV: ${{ secrets.QUEUES_TOKEN_DEV }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-dev.yaml ruz-bot
diff --git a/.gitea/workflows/deploy-prod.yaml b/.gitea/workflows/deploy-prod.yaml
index 9ba368a..2f5b793 100644
--- a/.gitea/workflows/deploy-prod.yaml
+++ b/.gitea/workflows/deploy-prod.yaml
@@ -42,4 +42,5 @@ jobs:
TELEGRAM_TOKEN_PROD: ${{ secrets.TELEGRAM_TOKEN_PROD }}
MONGO_PASSWORD_PROD: ${{ secrets.MONGO_PASSWORD_PROD }}
PLATFORM_SECURITY_TOKEN: ${{ secrets.PLATFORM_SECURITY_TOKEN }}
+ QUEUES_TOKEN_PROD: ${{ secrets.QUEUES_TOKEN_PROD }}
run: docker stack deploy --with-registry-auth -c ./.deploy/deploy-prod.yaml ruz-bot
diff --git a/daemons/api.py b/daemons/api.py
index 3a540ea..faf0877 100644
--- a/daemons/api.py
+++ b/daemons/api.py
@@ -3,73 +3,75 @@ from flask import Flask, request
import settings
from helpers.alice import Processor
from helpers.mongo import mongo
+from daemons import base
-def api():
- app = Flask(__name__)
+class Daemon(base.Daemon):
+ def execute(self):
+ app = Flask(__name__)
- @app.route('/stats/json', methods=['GET'])
- def stats_json():
- all_users = mongo.users_collection.count_documents({})
- teachers = mongo.users_collection.count_documents({"is_teacher": True})
- return {
- "Всего пользователей": all_users,
- "Пользователей прошедших регистрацию": mongo.users_collection.count_documents({'email': {'$ne': None}}),
- "Преподавателей": teachers,
- "Отписались от уведомлений": mongo.users_collection.count_documents({'notify_minutes': None}),
- "Отправлено уведомлений за сегодня": mongo.lessons_collection.count_documents({'notified': True}),
- "Проиндексировано занятий из РУЗа": mongo.lessons_collection.count_documents({}),
- "Пользователей из Москвы": mongo.users_collection.count_documents(
- {'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}}),
- "Пользователей из Москвы (регистрация)": mongo.users_collection.count_documents(
- {'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents(
- {'campus': {'$exists': False}, 'email': {'$ne': None}}),
- "Пользователей из Перми": mongo.users_collection.count_documents({'campus': 'Пермь'}),
- "Пользователей из Перми (регистрация)": mongo.users_collection.count_documents(
- {'campus': 'Пермь', 'email': {'$ne': None}}),
- "Пользователей из Нижнего Новгорода": mongo.users_collection.count_documents({'campus': 'Нижний Новгород'}),
- "Пользователей из Нижнего Новгорода (регистрация)": mongo.users_collection.count_documents(
- {'campus': 'Нижний Новгород', 'email': {'$ne': None}}),
- "Пользователей из Санкт-Петербурга": mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'}),
- "Пользователей из Санкт-Петербурга (регистрация)": mongo.users_collection.count_documents(
- {'campus': 'Санкт-Петербург', 'email': {'$ne': None}})
- }
-
- @app.route('/stats', methods=['GET'])
- def stats():
- all_users = mongo.users_collection.count_documents({})
- teachers = mongo.users_collection.count_documents({"is_teacher": True})
- text = f"Всего пользователей: {all_users}
" \
- f"Пользователей прошедших регистрацию: {mongo.users_collection.count_documents({'email': {'$ne': None}})}
" \
- f"Студентов: {all_users - teachers}
" \
- f"Преподавателей: {teachers}
" \
- f"Отписались от уведомлений: {mongo.users_collection.count_documents({'notify_minutes': None})}
" \
- f"Отправлено уведомлений за сегодня: {mongo.lessons_collection.count_documents({'notified': True})}
" \
- f"Проиндексировано занятий из РУЗа: {mongo.lessons_collection.count_documents({})}
" \
- f"
" \
- f"
" \
- f"Пользователей из Москвы: {mongo.users_collection.count_documents({'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}})}
" \
- f"Пользователей из Москвы (регистрация): {mongo.users_collection.count_documents({'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents({'campus': {'$exists': False}, 'email': {'$ne': None}})}
" \
- f"Пользователей из Перми: {mongo.users_collection.count_documents({'campus': 'Пермь'})}
" \
- f"Пользователей из Перми (регистрация): {mongo.users_collection.count_documents({'campus': 'Пермь', 'email': {'$ne': None}})}
" \
- f"Пользователей из Нижнего Новгорода: {mongo.users_collection.count_documents({'campus': 'Нижний Новгород'})}
" \
- f"Пользователей из Нижнего Новгорода (регистрация): {mongo.users_collection.count_documents({'campus': 'Нижний Новгород', 'email': {'$ne': None}})}
" \
- f"Пользователей из Санкт-Петербурга: {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'})}
" \
- f"Пользователей из Санкт-Петербурга (регистрация): {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург', 'email': {'$ne': None}})}
"
- return text
-
- @app.route('/alice', methods=['POST'])
- def alice():
- req = request.json
- processor = Processor(req)
- response = {
- "version": req['version'],
- "session": req['session'],
- "response": {
- "end_session": False
+ @app.route('/stats/json', methods=['GET'])
+ def stats_json():
+ all_users = mongo.users_collection.count_documents({})
+ teachers = mongo.users_collection.count_documents({"is_teacher": True})
+ return {
+ "Всего пользователей": all_users,
+ "Пользователей прошедших регистрацию": mongo.users_collection.count_documents({'email': {'$ne': None}}),
+ "Преподавателей": teachers,
+ "Отписались от уведомлений": mongo.users_collection.count_documents({'notify_minutes': None}),
+ "Отправлено уведомлений за сегодня": mongo.lessons_collection.count_documents({'notified': True}),
+ "Проиндексировано занятий из РУЗа": mongo.lessons_collection.count_documents({}),
+ "Пользователей из Москвы": mongo.users_collection.count_documents(
+ {'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}}),
+ "Пользователей из Москвы (регистрация)": mongo.users_collection.count_documents(
+ {'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents(
+ {'campus': {'$exists': False}, 'email': {'$ne': None}}),
+ "Пользователей из Перми": mongo.users_collection.count_documents({'campus': 'Пермь'}),
+ "Пользователей из Перми (регистрация)": mongo.users_collection.count_documents(
+ {'campus': 'Пермь', 'email': {'$ne': None}}),
+ "Пользователей из Нижнего Новгорода": mongo.users_collection.count_documents({'campus': 'Нижний Новгород'}),
+ "Пользователей из Нижнего Новгорода (регистрация)": mongo.users_collection.count_documents(
+ {'campus': 'Нижний Новгород', 'email': {'$ne': None}}),
+ "Пользователей из Санкт-Петербурга": mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'}),
+ "Пользователей из Санкт-Петербурга (регистрация)": mongo.users_collection.count_documents(
+ {'campus': 'Санкт-Петербург', 'email': {'$ne': None}})
}
- }
- response['response'].update(processor.process())
- return response
- app.run(host="0.0.0.0", port=1238, debug=settings.DEBUG)
+ @app.route('/stats', methods=['GET'])
+ def stats():
+ all_users = mongo.users_collection.count_documents({})
+ teachers = mongo.users_collection.count_documents({"is_teacher": True})
+ text = f"Всего пользователей: {all_users}
" \
+ f"Пользователей прошедших регистрацию: {mongo.users_collection.count_documents({'email': {'$ne': None}})}
" \
+ f"Студентов: {all_users - teachers}
" \
+ f"Преподавателей: {teachers}
" \
+ f"Отписались от уведомлений: {mongo.users_collection.count_documents({'notify_minutes': None})}
" \
+ f"Отправлено уведомлений за сегодня: {mongo.lessons_collection.count_documents({'notified': True})}
" \
+ f"Проиндексировано занятий из РУЗа: {mongo.lessons_collection.count_documents({})}
" \
+ f"
" \
+ f"
" \
+ f"Пользователей из Москвы: {mongo.users_collection.count_documents({'campus': 'Москва'}) + mongo.users_collection.count_documents({'campus': {'$exists': False}})}
" \
+ f"Пользователей из Москвы (регистрация): {mongo.users_collection.count_documents({'campus': 'Москва', 'email': {'$ne': None}}) + mongo.users_collection.count_documents({'campus': {'$exists': False}, 'email': {'$ne': None}})}
" \
+ f"Пользователей из Перми: {mongo.users_collection.count_documents({'campus': 'Пермь'})}
" \
+ f"Пользователей из Перми (регистрация): {mongo.users_collection.count_documents({'campus': 'Пермь', 'email': {'$ne': None}})}
" \
+ f"Пользователей из Нижнего Новгорода: {mongo.users_collection.count_documents({'campus': 'Нижний Новгород'})}
" \
+ f"Пользователей из Нижнего Новгорода (регистрация): {mongo.users_collection.count_documents({'campus': 'Нижний Новгород', 'email': {'$ne': None}})}
" \
+ f"Пользователей из Санкт-Петербурга: {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург'})}
" \
+ f"Пользователей из Санкт-Петербурга (регистрация): {mongo.users_collection.count_documents({'campus': 'Санкт-Петербург', 'email': {'$ne': None}})}
"
+ return text
+
+ @app.route('/alice', methods=['POST'])
+ def alice():
+ req = request.json
+ processor = Processor(req)
+ response = {
+ "version": req['version'],
+ "session": req['session'],
+ "response": {
+ "end_session": False
+ }
+ }
+ response['response'].update(processor.process())
+ return response
+
+ app.run(host="0.0.0.0", port=1238, debug=settings.DEBUG)
diff --git a/daemons/base.py b/daemons/base.py
new file mode 100644
index 0000000..aca86ad
--- /dev/null
+++ b/daemons/base.py
@@ -0,0 +1,3 @@
+class Daemon:
+ def execute(self):
+ raise NotImplementedError
diff --git a/daemons/bot.py b/daemons/bot.py
deleted file mode 100644
index 973d718..0000000
--- a/daemons/bot.py
+++ /dev/null
@@ -1,20 +0,0 @@
-import os
-
-import telebot
-from telebot.types import Message
-
-from helpers.mongo import mongo
-
-bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
-
-
-@bot.message_handler(commands=['start'])
-def on_start(message: Message):
- mongo.users_collection.delete_many({"chat_id": message.chat.id})
- do_action(message)
-
-
-@bot.message_handler()
-def do_action(message: Message):
- from helpers.answer import Answer
- Answer(message).process()
diff --git a/daemons/fetch.py b/daemons/fetch.py
index 17c13ec..79d050b 100644
--- a/daemons/fetch.py
+++ b/daemons/fetch.py
@@ -6,6 +6,8 @@ from helpers import now, campus_timdelta
from helpers.mongo import mongo
from helpers.ruz import ruz
+from daemons import base
+
def fetch_schedule_for_user(user: dict):
today = now(user)
@@ -75,16 +77,17 @@ def delete_old():
mongo.lessons_collection.delete_many({"end": {"$lte": datetime.datetime.now() - datetime.timedelta(days=1)}})
-def fetch():
- while True:
- logging.info("fetch start")
- begin = datetime.datetime.now()
- if begin.hour > 22 or begin.hour < 7:
- logging.info("Too late, sleeping")
- sleep(30 * 60)
- continue
- process()
- end = datetime.datetime.now()
- logging.info('fetch finished')
- logging.info("time elapsed %s", (end - begin).total_seconds())
- delete_old()
+class Daemon(base.Daemon):
+ def execute(self):
+ while True:
+ logging.info("fetch start")
+ begin = datetime.datetime.now()
+ if begin.hour > 22 or begin.hour < 7:
+ logging.info("Too late, sleeping")
+ sleep(30 * 60)
+ continue
+ process()
+ end = datetime.datetime.now()
+ logging.info('fetch finished')
+ logging.info("time elapsed %s", (end - begin).total_seconds())
+ delete_old()
diff --git a/daemons/mailbox.py b/daemons/mailbox.py
new file mode 100644
index 0000000..ce1fa9a
--- /dev/null
+++ b/daemons/mailbox.py
@@ -0,0 +1,28 @@
+import telebot
+import os
+
+from daemons import base
+from utils import queues
+
+
+class Daemon(base.BaseDaemon, queues.TasksHandlerMixin):
+ def __init__(self):
+ super().__init__()
+ self.bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
+
+ @property
+ def queue_name(self):
+ return 'pizda_bot_mailbox'
+
+ def execute(self):
+ self.poll()
+
+ def process(self, payload):
+ body = {
+ 'chat_id': payload['chat_id'],
+ 'text': payload['text'],
+ }
+ reply_markup = payload.get('reply_markup')
+ if reply_markup:
+ body['reply_markup'] = reply_markup
+ self.bot.send_message(**body, parse_mode='Markdown')
diff --git a/daemons/notify.py b/daemons/notify.py
index 68228da..5aefc34 100644
--- a/daemons/notify.py
+++ b/daemons/notify.py
@@ -2,12 +2,12 @@ import datetime
import logging
from time import sleep
-from telebot.apihelper import ApiTelegramException
-
-from daemons.bot import bot
from helpers import now
from helpers.mongo import mongo
from helpers.ruz import ruz
+from daemons import base
+
+from utils import queues
def process():
@@ -25,13 +25,7 @@ def process():
ans += f"🧑🏫 {(lesson['lecturer'] or 'Неизвестно')}\n"
if lesson.get('link', None):
ans += f"🔗 {lesson['link']}"
- try:
- bot.send_message(
- user["chat_id"],
- f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans
- )
- except ApiTelegramException:
- pass
+ queues.set_task('ruz_bot_mailbox', {'text': f"Через {user['notify_minutes']} минут у тебя занятие!\n" + ans, 'chat_id': user["chat_id"]}, 1)
mongo.lessons_collection.update_one({"_id": lesson['_id']}, {"$set": {"notified": True}})
time_now = datetime.datetime.now()
for user in mongo.users_collection.find({"next_daily_notify_time": {"$lte": time_now}}):
@@ -46,11 +40,7 @@ def process():
else:
text = ruz.schedule_builder(lessons)
try:
- bot.send_message(
- user["chat_id"],
- f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text,
- parse_mode='Markdown'
- )
+ queues.set_task('ruz_bot_mailbox', {'text': f"Уведомляю о занятиях! Твое расписание на {'сегодня' if user.get('daily_notify_today', True) else 'завтра'}:\n" + text, 'chat_id': user["chat_id"]}, 1)
except:
pass
mongo.users_collection.update_one(
@@ -72,34 +62,29 @@ def process():
ans += f"🧑🏫 {(lesson['lecturer'] or 'Неизвестно')}\n"
if lesson.get('link', None):
ans += f"🔗 {lesson['link']}"
- try:
- mess = "Пары начутся через "
- if user['first_lesson_notify'] == 30:
- mess += "30 минут"
- elif user['first_lesson_notify'] == 60:
- mess += "1 час"
- elif user['first_lesson_notify'] == 4 * 60:
- mess += "4 часа"
- else:
- mess += "12 часов"
- mess += "!\n\nТвоя первая пара:\n\n" + ans
- bot.send_message(
- user["chat_id"],
- mess
- )
- except ApiTelegramException:
- pass
+ mess = "Пары начутся через "
+ if user['first_lesson_notify'] == 30:
+ mess += "30 минут"
+ elif user['first_lesson_notify'] == 60:
+ mess += "1 час"
+ elif user['first_lesson_notify'] == 4 * 60:
+ mess += "4 часа"
+ else:
+ mess += "12 часов"
+ mess += "!\n\nТвоя первая пара:\n\n" + ans
+ queues.set_task('ruz_bot_mailbox', {'text': mess, 'chat_id': user["chat_id"]}, 1)
start_of_day = datetime.datetime(year=time_now.year, month=time_now.month, day=time_now.day)
mongo.lessons_collection.update_many({"begin": {"$gte": start_of_day, "$lt": (start_of_day + datetime.timedelta(days=1))}, "user_email": user["email"]}, {"$set": {"notified_today": True}})
break
-def notify():
- while True:
- logging.info("notify start")
- begin = datetime.datetime.now()
- process()
- end = datetime.datetime.now()
- logging.info('notify finished')
- logging.info("time elapsed %s", (end - begin).total_seconds())
- sleep(63 - end.second)
+class Daemon(base.Daemon):
+ def execute(self):
+ while True:
+ logging.info("notify start")
+ begin = datetime.datetime.now()
+ process()
+ end = datetime.datetime.now()
+ logging.info('notify finished')
+ logging.info("time elapsed %s", (end - begin).total_seconds())
+ sleep(63 - end.second)
diff --git a/daemons/poll.py b/daemons/poll.py
new file mode 100644
index 0000000..374a730
--- /dev/null
+++ b/daemons/poll.py
@@ -0,0 +1,15 @@
+import os
+import telebot
+
+from daemons import base
+from telebot import types
+from utils import queues
+
+
+class Daemon(base.BaseDaemon):
+ def execute(self):
+ bot = telebot.TeleBot(os.getenv("TELEGRAM_TOKEN"))
+ @bot.message_handler()
+ def do_action(message: types.Message):
+ queues.set_task('ruz_bot_worker', message.json, 1)
+ bot.polling()
diff --git a/daemons/worker.py b/daemons/worker.py
new file mode 100644
index 0000000..7b4d58d
--- /dev/null
+++ b/daemons/worker.py
@@ -0,0 +1,19 @@
+from daemons import base
+from utils import queues
+import json
+
+from telebot.types import Message
+
+
+class Daemon(base.BaseDaemon, queues.TasksHandlerMixin):
+ @property
+ def queue_name(self):
+ return 'ruz_bot_worker'
+
+ def execute(self):
+ self.poll()
+
+ def process(self, payload):
+ message: Message = Message.de_json(json.dumps(payload))
+ from helpers.answer import Answer
+ Answer(message).process()
diff --git a/entrypoint.py b/entrypoint.py
index 3c0b441..6c35ccc 100644
--- a/entrypoint.py
+++ b/entrypoint.py
@@ -2,10 +2,6 @@ import logging.config
import sys
import settings
-from daemons.api import api
-from daemons.bot import bot
-from daemons.fetch import fetch
-from daemons.notify import notify
import locale
@@ -13,17 +9,25 @@ logging.config.dictConfig(settings.logging_config)
locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8')
arg = sys.argv[-1]
-if arg == "bot":
- logging.info("bot is starting")
- bot.polling()
+if arg == "poll":
+ logging.info("poll is starting")
+ from daemons.poll import Daemon
+elif arg == 'worker':
+ logging.info("worker is starting")
+ from daemons.worker import Daemon
+elif arg == 'mailbox':
+ logging.info("mailbox is starting")
+ from daemons.mailbox import Daemon
elif arg == "fetch":
logging.info("fetch is starting")
- fetch()
+ from daemons.fetch import Daemon
elif arg == "notify":
logging.info("notify is starting")
- notify()
+ from daemons.notify import Daemon
elif arg == "api":
logging.info("api is starting")
- api()
+ from daemons.api import Daemon
else:
raise ValueError(f"Unknown param {arg}")
+
+Daemon().execute()
diff --git a/helpers/answer.py b/helpers/answer.py
index 277216b..f93814a 100644
--- a/helpers/answer.py
+++ b/helpers/answer.py
@@ -4,7 +4,6 @@ from random import choice
from telebot.types import Message, ReplyKeyboardRemove
-from daemons.bot import bot
from daemons.fetch import fetch_schedule_for_user
from helpers import get_next_daily_notify_time
from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, again_keyboard, no_daily_notify, \
@@ -12,6 +11,7 @@ from helpers.keyboards import main_keyboard, notify_keyboard, yes_no_keyboard, a
from helpers.mongo import mongo
from helpers.sprint_platform import platform
from helpers.ruz import ruz
+from utils import queues
class User:
@@ -32,6 +32,8 @@ class Answer:
def __init__(self, message: Message):
self.message = message
self.message_text = message.text or message.caption or ""
+ if self.message_text.startswith('/start'):
+ mongo.users_collection.delete_many({"chat_id": message.chat.id})
user = mongo.users_collection.find_one({"chat_id": message.chat.id})
if user is None:
user = {
@@ -72,7 +74,10 @@ class Answer:
def send_message(self, text, reply_markup=None, remove_keyboard=True, **kwargs):
if reply_markup is None and remove_keyboard:
reply_markup = ReplyKeyboardRemove()
- bot.send_message(self.user['chat_id'], text, reply_markup=reply_markup, **kwargs)
+ body = {'text': text, 'chat_id': self.user['chat_id']}
+ if reply_markup:
+ body['reply_markup'] = reply_markup.to_json()
+ queues.set_task('ruz_bot_mailbox', body, 1)
def set_state(self, state: str):
self.user['state'] = state
diff --git a/utils/__init__.py b/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/utils/queues.py b/utils/queues.py
new file mode 100644
index 0000000..b4b071d
--- /dev/null
+++ b/utils/queues.py
@@ -0,0 +1,64 @@
+import json
+import os
+import requests
+import time
+
+
+stage = os.getenv("STAGE", 'local')
+if stage == 'development':
+ QUEUES_URL = 'https://queues.develop.sprinthub.ru'
+elif stage == 'production':
+ QUEUES_URL = 'https://queues.sprinthub.ru'
+else:
+ QUEUES_URL = None
+
+token = os.getenv('QUEUES_TOKEN')
+
+
+class QueuesException(Exception):
+ ...
+
+
+class TasksHandlerMixin:
+ def poll(self):
+ while True:
+ if QUEUES_URL is None:
+ data = {'payload': json.loads(input('Input message: '))}
+ else:
+ response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name, 'X-Queues-Token': token})
+ if response.status_code == 404:
+ time.sleep(0.2)
+ continue
+ if response.status_code == 403:
+ raise NotImplemented('QUEUE_TOKEN is incorrect')
+ data = response.json()
+ try:
+ self.process(data['payload'])
+ except Exception as exc:
+ print(f'Error processing message id={data["id"]}, payload={data["payload"]}, exc={exc}')
+ continue
+ if QUEUES_URL is None:
+ continue
+ try:
+ resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': data['id']}, headers={'X-Queues-Token': token})
+ if resp.status_code != 202:
+ raise QueuesException
+ except:
+ print(f'Failed to finish task id={data["id"]}')
+
+ @property
+ def queue_name(self):
+ raise NotImplemented
+
+ def process(self, payload):
+ raise NotImplemented
+
+
+def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None):
+ resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name, 'X-Queues-Token': token}, json={
+ 'payload': payload,
+ 'seconds_to_execute': seconds_to_execute,
+ 'delay': delay,
+ })
+ if resp.status_code != 202:
+ raise QueuesException