From f4af02ec1d0fd35ca4fee9c02192e2818930bfad Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 14:56:49 +0300 Subject: [PATCH 1/4] fix --- helpers/answer.py | 2 +- helpers/ruz.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/helpers/answer.py b/helpers/answer.py index 7c711d2..e9893f3 100644 --- a/helpers/answer.py +++ b/helpers/answer.py @@ -77,7 +77,7 @@ class Answer: body = {'text': text, 'chat_id': self.user['chat_id'], 'parse_mode': 'Markdown'} if reply_markup: body['reply_markup'] = reply_markup.to_json() - queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 1) + queues.set_task('botalka_mailbox', {'project': 'ruz-bot', 'name': 'telegram-bot', 'body': body}, 5) def set_state(self, state: str): self.user['state'] = state diff --git a/helpers/ruz.py b/helpers/ruz.py index 27f6447..91922a6 100644 --- a/helpers/ruz.py +++ b/helpers/ruz.py @@ -12,7 +12,8 @@ fields = [ 'date_start', 'date_end', 'lecturer_profiles', - 'stream_links' + 'stream_links', + 'type', ] -- 2.45.2 From dbe5df157baf58ab88ab944852ee128133357dd9 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 15:01:22 +0300 Subject: [PATCH 2/4] fix --- .deploy/deploy-dev.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 3753443..ab8b0ac 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -61,13 +61,12 @@ services: image: mathwave/sprint-repo:ruz-bot networks: - common-infra-nginx + - queues-development environment: MONGO_HOST: "mongo.develop.sprinthub.ru" STAGE: "development" MONGO_PASSWORD: $MONGO_PASSWORD_DEV PLATFORM_SECURITY_TOKEN: $PLATFORM_SECURITY_TOKEN - networks: - - queues-development command: api deploy: mode: replicated -- 2.45.2 From e9ecb7d94fd289ba874edf5438733790b9bc683f Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 15:02:35 +0300 Subject: [PATCH 3/4] fix --- .deploy/deploy-dev.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index ab8b0ac..a76dbb0 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -60,7 +60,7 @@ services: ruz-bot-nginx: image: mathwave/sprint-repo:ruz-bot networks: - - common-infra-nginx + - common-infra-nginx-development - queues-development environment: MONGO_HOST: "mongo.develop.sprinthub.ru" -- 2.45.2 From 7fcbb316e641889b7ea0a0ebcdfa56ce36dc14dd Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 23:26:18 +0300 Subject: [PATCH 4/4] fix --- utils/queues.py | 51 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index c00a18c..c435c05 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,7 @@ +from concurrent.futures import ThreadPoolExecutor +import datetime import os +import zoneinfo import requests import time @@ -15,24 +18,54 @@ class QueuesException(Exception): class TasksHandlerMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=1) + + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + def send(): + requests.post(f'{QUEUES_URL}/api/v1/metric', json={ + 'service': 'botalka', + 'queue': self.queue_name, + 'success': success, + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + "success": success, + "execution_time_ms": (end - start).microseconds // 1000, + "environment": stage, + }) + + self.executor.submit(send) + def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + try: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + except requests.JSONDecodeError: + print('Unable to decode json') + time.sleep(3) + continue task = response.get('task') if not task: time.sleep(0.2) continue + start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: + print(f'process task with id {task["id"]}, attempt {task["attempt"]}') self.process(task['payload']) + success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') - continue - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') + success = False + end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + print(f'finish task with id {task["id"]}') + except: + print(f'Failed to finish task id={task["id"]}') + self._send_metric(start, end, success) @property def queue_name(self): @@ -49,4 +82,4 @@ def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int 'delay': delay, }) if resp.status_code != 202: - raise QueuesException + raise QueuesException \ No newline at end of file -- 2.45.2