Merge pull request 'master' (#29) from master into prod

Reviewed-on: https://gitea.chocomarsh.com/self/ruz-bot/pulls/29
This commit is contained in:
emmatveev 2025-06-15 23:29:02 +03:00
commit 5392a9acc3
4 changed files with 47 additions and 14 deletions

View File

@ -60,14 +60,13 @@ services:
ruz-bot-nginx:
image: mathwave/sprint-repo:ruz-bot
networks:
- common-infra-nginx
- common-infra-nginx-development
- queues-development
environment:
MONGO_HOST: "mongo.develop.sprinthub.ru"
STAGE: "development"
MONGO_PASSWORD: $MONGO_PASSWORD_DEV
PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN
networks:
- queues-development
command: api
deploy:
mode: replicated

View File

@ -77,7 +77,7 @@ class Answer:
body = {'text': text, 'chat_id': self.user['chat_id'], 'parse_mode': 'Markdown'}
if reply_markup:
body['reply_markup'] = reply_markup.to_json()
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 1)
queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 5)
def set_state(self, state: str):
self.user['state'] = state

View File

@ -12,7 +12,8 @@ fields = [
'date_start',
'date_end',
'lecturer_profiles',
'stream_links'
'stream_links',
'type',
]

View File

@ -1,4 +1,7 @@
from concurrent.futures import ThreadPoolExecutor
import datetime
import os
import zoneinfo
import requests
import time
@ -15,24 +18,54 @@ class QueuesException(Exception):
class TasksHandlerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=1)
def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool):
def send():
requests.post(f'{QUEUES_URL}/api/v1/metric', json={
'service': 'botalka',
'queue': self.queue_name,
'success': success,
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
"success": success,
"execution_time_ms": (end - start).microseconds // 1000,
"environment": stage,
})
self.executor.submit(send)
def poll(self):
while True:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
try:
response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json()
except requests.JSONDecodeError:
print('Unable to decode json')
time.sleep(3)
continue
task = response.get('task')
if not task:
time.sleep(0.2)
continue
start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
try:
print(f'process task with id {task["id"]}, attempt {task["attempt"]}')
self.process(task['payload'])
success = True
except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
continue
try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202:
raise QueuesException
except:
print(f'Failed to finish task id={task["id"]}')
success = False
end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow"))
if success:
try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202:
raise QueuesException
print(f'finish task with id {task["id"]}')
except:
print(f'Failed to finish task id={task["id"]}')
self._send_metric(start, end, success)
@property
def queue_name(self):
@ -49,4 +82,4 @@ def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int
'delay': delay,
})
if resp.status_code != 202:
raise QueuesException
raise QueuesException