From 4af857e2f3890c8a37ba93822bf436dae309acc8 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 05:07:16 +0300 Subject: [PATCH] fix --- .deploy/deploy-dev.yaml | 3 +++ .deploy/deploy-prod.yaml | 3 +++ utils/queues.py | 46 ++++++++++++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 227877a..f59cd31 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues-development + - monitoring environment: STAGE: "development" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues-development: external: true + monitoring: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 0043ede..535826f 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues + - monitoring environment: STAGE: "production" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues: external: true + monitoring: + external: true diff --git a/utils/queues.py b/utils/queues.py index dc0f219..3ec1e8b 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,7 @@ +from concurrent.futures import ThreadPoolExecutor +import datetime import os +import zoneinfo import requests import time @@ -10,11 +13,35 @@ else: QUEUES_URL = 'http://queues:1239' +executor = ThreadPoolExecutor(max_workers=1) + + class QueuesException(Exception): ... class TasksHandlerMixin: + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + try: + resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'service': 'botalka', + 'queue': self.queue_name, + 'success': success, + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + "success": success, + "execution_time_ms": (end - start).microseconds // 1000, + "environment": stage, + }) + if resp.status_code == 202: + print("Metric ok") + else: + print(f'metric not ok: {resp.status_code}') + except Exception as e: + print(f"Error sending metric: {e}") + + def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + executor.submit(self._send_metric, start, end, success) + def poll(self): while True: try: @@ -27,17 +54,22 @@ class TasksHandlerMixin: if not task: time.sleep(0.2) continue + start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: self.process(task['payload']) + success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') - continue - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') + success = False + end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') + self.send_metric(start, end, success) @property def queue_name(self):