diff --git a/utils/metrics.py b/utils/metrics.py new file mode 100644 index 0000000..6480ba9 --- /dev/null +++ b/utils/metrics.py @@ -0,0 +1,27 @@ +from queue import Queue +from threading import Thread + + +class MetricsSender(Thread): + queue: Queue = Queue() + + def put(self, metric): + self.queue.put(metric) + + def run(self): + while True: + metric = self.queue.get() + if not metric: + continue + + try: + metric = metric() + if metric.status_code == 202: + print('metric ok') + else: + print(f'metric failed: {metric.status_code}') + except Exception as e: + print(f'metric failed: {e}') + +metrics_sender = MetricsSender() +metrics_sender.start() diff --git a/utils/queues.py b/utils/queues.py index e829b1c..1999db1 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -2,6 +2,7 @@ import datetime import os import requests import time +from utils.metrics import metrics_sender stage = os.getenv("STAGE", 'local') @@ -42,21 +43,15 @@ class TasksHandlerMixin: raise QueuesException except: print(f'Failed to finish task id={task["id"]}') - try: - metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", - 'service': 'botalka', - 'environment': stage, - 'queue': self.queue_name, - 'success': success, - 'execution_time_ms': (end - start).microseconds // 1000, - }) - if metric.status_code == 202: - print('metric ok') - else: - print(f'metric failed: {metric.status_code}') - except Exception as e: - print(f'metric failed: {e}') + + metrics_sender.put(lambda: requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + 'service': 'botalka', + 'environment': stage, + 'queue': self.queue_name, + 'success': success, + 'execution_time_ms': (end - start).microseconds // 1000, + })) @property