From 5c1418ddd8f5561586bd6029e05e41d6e7d17d25 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:30:04 +0300 Subject: [PATCH 1/5] fix --- utils/queues.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 583bcd1..edb5fa7 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,6 @@ +from concurrent.futures import ThreadPoolExecutor import datetime import os -from threading import Thread import zoneinfo import requests import time @@ -13,6 +13,9 @@ else: QUEUES_URL = 'http://queues:1239' +executor = ThreadPoolExecutor(max_workers=1) + + class QueuesException(Exception): ... @@ -36,9 +39,7 @@ class TasksHandlerMixin: print(f'metric failed: {e}') def send_metric(self, start, success, end): - # Thread(target=self._send_metric, args=(start, success, end)).start() - # self._send_metric(start, success, end) - ... + executor.submit(self._send_metric, start, success, end) def poll(self): while True: From 88914893ce6c88ff5ac1f50fd449f6a518427b4b Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:32:44 +0300 Subject: [PATCH 2/5] fix --- utils/queues.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index edb5fa7..f96b78f 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -39,7 +39,8 @@ class TasksHandlerMixin: print(f'metric failed: {e}') def send_metric(self, start, success, end): - executor.submit(self._send_metric, start, success, end) + # executor.submit(self._send_metric, start, success, end) + ... def poll(self): while True: From ea3e4837f2b6db233d05bdfc4ff4e0ec5603f629 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:45:21 +0300 Subject: [PATCH 3/5] fx --- utils/queues.py | 34 +++++++--------------------------- 1 file changed, 7 insertions(+), 27 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index f96b78f..23d3535 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -21,26 +21,6 @@ class QueuesException(Exception): class TasksHandlerMixin: - def _send_metric(self, start, success, end): - try: - metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", - 'service': 'botalka', - 'environment': stage, - 'queue': self.queue_name, - 'success': success, - 'execution_time_ms': (end - start).microseconds // 1000, - }) - if metric.status_code == 202: - print('metric ok') - else: - print(f'metric failed: {metric.status_code}') - except Exception as e: - print(f'metric failed: {e}') - - def send_metric(self, start, success, end): - # executor.submit(self._send_metric, start, success, end) - ... def poll(self): while True: @@ -62,13 +42,13 @@ class TasksHandlerMixin: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') success = False end = datetime.datetime.now().astimezone(zoneinfo.ZoneInfo("Europe/Moscow")) - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') - self.send_metric(start, success, end) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') @property def queue_name(self): From 82b5d571e55673238cb2b8742a6bf845cdf52395 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:45:58 +0300 Subject: [PATCH 4/5] fx --- utils/queues.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 23d3535..d19f196 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,3 @@ -from concurrent.futures import ThreadPoolExecutor import datetime import os import zoneinfo @@ -13,9 +12,6 @@ else: QUEUES_URL = 'http://queues:1239' -executor = ThreadPoolExecutor(max_workers=1) - - class QueuesException(Exception): ... From a8592c70008774e77ad4c47664042299b2de913c Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:50:36 +0300 Subject: [PATCH 5/5] fx --- daemons/mailbox.py | 1 + 1 file changed, 1 insertion(+) diff --git a/daemons/mailbox.py b/daemons/mailbox.py index 2fc236e..f5dc3f9 100644 --- a/daemons/mailbox.py +++ b/daemons/mailbox.py @@ -23,6 +23,7 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): return 'botalka_mailbox' def process(self, payload: dict): + print(f"Get message {payload}") message = Message.model_validate(payload) bot = platform.platform_client.get_config('bots')[message.project][message.name] if not bot['mailbox_enabled']: