diff --git a/utils/queues.py b/utils/queues.py index f96b78f..d19f196 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,3 @@ -from concurrent.futures import ThreadPoolExecutor import datetime import os import zoneinfo @@ -13,34 +12,11 @@ else: QUEUES_URL = 'http://queues:1239' -executor = ThreadPoolExecutor(max_workers=1) - - class QueuesException(Exception): ... class TasksHandlerMixin: - def _send_metric(self, start, success, end): - try: - metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ - 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", - 'service': 'botalka', - 'environment': stage, - 'queue': self.queue_name, - 'success': success, - 'execution_time_ms': (end - start).microseconds // 1000, - }) - if metric.status_code == 202: - print('metric ok') - else: - print(f'metric failed: {metric.status_code}') - except Exception as e: - print(f'metric failed: {e}') - - def send_metric(self, start, success, end): - # executor.submit(self._send_metric, start, success, end) - ... def poll(self): while True: @@ -62,13 +38,13 @@ class TasksHandlerMixin: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') success = False end = datetime.datetime.now().astimezone(zoneinfo.ZoneInfo("Europe/Moscow")) - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') - self.send_metric(start, success, end) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') @property def queue_name(self):