From 8828bfd05b14c91b239e510c35872ac6ab73f5ab Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Tue, 31 Dec 2024 02:49:03 +0300 Subject: [PATCH 1/7] fix --- daemons/poll.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/daemons/poll.py b/daemons/poll.py index d7169ee..d4fe4a4 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -1,6 +1,7 @@ import telebot import multiprocessing import time +import json from daemons import base from utils import platform @@ -41,5 +42,5 @@ class Daemon(base.Daemon): bot = telebot.TeleBot(token) @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(queue, message.json, 1) + queues.set_task(queue, json.loads(message.json), 1) bot.polling() From ef1d60e3683674e93aa13f9d41798879d85c5ef7 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Tue, 31 Dec 2024 02:52:49 +0300 Subject: [PATCH 2/7] fix --- daemons/poll.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemons/poll.py b/daemons/poll.py index d4fe4a4..d3ad377 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -42,5 +42,5 @@ class Daemon(base.Daemon): bot = telebot.TeleBot(token) @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(queue, json.loads(message.json), 1) + queues.set_task(queue, message.json, 1) bot.polling() From 46031265e07998f7d5f98d90f1e769b9ffe783df Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 00:09:33 +0300 Subject: [PATCH 3/7] fix --- .deploy/deploy-dev.yaml | 3 +++ .deploy/deploy-prod.yaml | 3 +++ utils/queues.py | 22 +++++++++++++++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 227877a..f59cd31 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues-development + - monitoring environment: STAGE: "development" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues-development: external: true + monitoring: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 0043ede..535826f 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues + - monitoring environment: STAGE: "production" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues: external: true + monitoring: + external: true diff --git a/utils/queues.py b/utils/queues.py index dc0f219..076dbfa 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,3 +1,4 @@ +import datetime import os import requests import time @@ -27,17 +28,36 @@ class TasksHandlerMixin: if not task: time.sleep(0.2) continue + start = datetime.datetime.now() try: self.process(task['payload']) + success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') - continue + success = False + end = datetime.datetime.now() try: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: raise QueuesException except: print(f'Failed to finish task id={task["id"]}') + try: + metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'timestamp': start.isoformat(), + 'service': 'botalka', + 'environment': stage, + 'queue': self.queue_name, + 'success': success, + 'execution_time_ms': (end - start).microseconds // 1000, + }) + if metric.status_code == 202: + print('metric ok') + else: + print(f'metric failed: {metric.status_code}') + except Exception as e: + print(f'metric failed: {e}') + @property def queue_name(self): From d91ae82f6eabf786d9941e61e116c20ed9b21c5c Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 00:17:35 +0300 Subject: [PATCH 4/7] fix --- utils/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 076dbfa..e829b1c 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -44,7 +44,7 @@ class TasksHandlerMixin: print(f'Failed to finish task id={task["id"]}') try: metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.isoformat(), + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", 'service': 'botalka', 'environment': stage, 'queue': self.queue_name, From 151327ae0aca5f1e9c44e3a048b3e09c7c6de011 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 02:32:47 +0300 Subject: [PATCH 5/7] fix --- utils/metrics.py | 27 +++++++++++++++++++++++++++ utils/queues.py | 25 ++++++++++--------------- 2 files changed, 37 insertions(+), 15 deletions(-) create mode 100644 utils/metrics.py diff --git a/utils/metrics.py b/utils/metrics.py new file mode 100644 index 0000000..6480ba9 --- /dev/null +++ b/utils/metrics.py @@ -0,0 +1,27 @@ +from queue import Queue +from threading import Thread + + +class MetricsSender(Thread): + queue: Queue = Queue() + + def put(self, metric): + self.queue.put(metric) + + def run(self): + while True: + metric = self.queue.get() + if not metric: + continue + + try: + metric = metric() + if metric.status_code == 202: + print('metric ok') + else: + print(f'metric failed: {metric.status_code}') + except Exception as e: + print(f'metric failed: {e}') + +metrics_sender = MetricsSender() +metrics_sender.start() diff --git a/utils/queues.py b/utils/queues.py index e829b1c..1999db1 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -2,6 +2,7 @@ import datetime import os import requests import time +from utils.metrics import metrics_sender stage = os.getenv("STAGE", 'local') @@ -42,21 +43,15 @@ class TasksHandlerMixin: raise QueuesException except: print(f'Failed to finish task id={task["id"]}') - try: - metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", - 'service': 'botalka', - 'environment': stage, - 'queue': self.queue_name, - 'success': success, - 'execution_time_ms': (end - start).microseconds // 1000, - }) - if metric.status_code == 202: - print('metric ok') - else: - print(f'metric failed: {metric.status_code}') - except Exception as e: - print(f'metric failed: {e}') + + metrics_sender.put(lambda: requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + 'service': 'botalka', + 'environment': stage, + 'queue': self.queue_name, + 'success': success, + 'execution_time_ms': (end - start).microseconds // 1000, + })) @property From c20c5546fe09fe81de69d268b2cf5063618ae717 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 02:42:23 +0300 Subject: [PATCH 6/7] fx --- utils/metrics.py | 27 --------------------------- utils/queues.py | 33 ++++++++++++++++++++++----------- 2 files changed, 22 insertions(+), 38 deletions(-) delete mode 100644 utils/metrics.py diff --git a/utils/metrics.py b/utils/metrics.py deleted file mode 100644 index 6480ba9..0000000 --- a/utils/metrics.py +++ /dev/null @@ -1,27 +0,0 @@ -from queue import Queue -from threading import Thread - - -class MetricsSender(Thread): - queue: Queue = Queue() - - def put(self, metric): - self.queue.put(metric) - - def run(self): - while True: - metric = self.queue.get() - if not metric: - continue - - try: - metric = metric() - if metric.status_code == 202: - print('metric ok') - else: - print(f'metric failed: {metric.status_code}') - except Exception as e: - print(f'metric failed: {e}') - -metrics_sender = MetricsSender() -metrics_sender.start() diff --git a/utils/queues.py b/utils/queues.py index 1999db1..b4c71e4 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,8 +1,8 @@ import datetime import os +from threading import Thread import requests import time -from utils.metrics import metrics_sender stage = os.getenv("STAGE", 'local') @@ -17,6 +17,26 @@ class QueuesException(Exception): class TasksHandlerMixin: + def _send_metric(self, start, success, end): + try: + metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + 'service': 'botalka', + 'environment': stage, + 'queue': self.queue_name, + 'success': success, + 'execution_time_ms': (end - start).microseconds // 1000, + }) + if metric.status_code == 202: + print('metric ok') + else: + print(f'metric failed: {metric.status_code}') + except Exception as e: + print(f'metric failed: {e}') + + def send_metric(self, start, success, end): + Thread(target=self._send_metric, args=(start, success, end)).start() + def poll(self): while True: try: @@ -43,16 +63,7 @@ class TasksHandlerMixin: raise QueuesException except: print(f'Failed to finish task id={task["id"]}') - - metrics_sender.put(lambda: requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", - 'service': 'botalka', - 'environment': stage, - 'queue': self.queue_name, - 'success': success, - 'execution_time_ms': (end - start).microseconds // 1000, - })) - + self.send_metric(start, success, end) @property def queue_name(self): From 73c8466a50985c12a49c7fccbadfa8ceba6f5cbf Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 02:45:33 +0300 Subject: [PATCH 7/7] fix --- utils/queues.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index b4c71e4..8cf6aaa 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -35,7 +35,8 @@ class TasksHandlerMixin: print(f'metric failed: {e}') def send_metric(self, start, success, end): - Thread(target=self._send_metric, args=(start, success, end)).start() + # Thread(target=self._send_metric, args=(start, success, end)).start() + self._send_metric(start, success, end) def poll(self): while True: