From d45e8c8a355e223d2cc9f21ebf7e099ef77a2aa0 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:14:18 +0300 Subject: [PATCH 1/3] fix --- utils/queues.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 8cf6aaa..320b55b 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,7 @@ import datetime import os from threading import Thread +import zoneinfo import requests import time @@ -50,14 +51,14 @@ class TasksHandlerMixin: if not task: time.sleep(0.2) continue - start = datetime.datetime.now() + start = datetime.datetime.now().astimezone(zoneinfo.ZoneInfo("Europe/Moscow")) try: self.process(task['payload']) success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') success = False - end = datetime.datetime.now() + end = datetime.datetime.now().astimezone(zoneinfo.ZoneInfo("Europe/Moscow")) try: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: From 357ab5576fc42d0fcdd618789b37f231907356fb Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:22:00 +0300 Subject: [PATCH 2/3] fix --- utils/queues.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 320b55b..583bcd1 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -37,7 +37,8 @@ class TasksHandlerMixin: def send_metric(self, start, success, end): # Thread(target=self._send_metric, args=(start, success, end)).start() - self._send_metric(start, success, end) + # self._send_metric(start, success, end) + ... def poll(self): while True: From 5c1418ddd8f5561586bd6029e05e41d6e7d17d25 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 03:30:04 +0300 Subject: [PATCH 3/3] fix --- utils/queues.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 583bcd1..edb5fa7 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,6 @@ +from concurrent.futures import ThreadPoolExecutor import datetime import os -from threading import Thread import zoneinfo import requests import time @@ -13,6 +13,9 @@ else: QUEUES_URL = 'http://queues:1239' +executor = ThreadPoolExecutor(max_workers=1) + + class QueuesException(Exception): ... @@ -36,9 +39,7 @@ class TasksHandlerMixin: print(f'metric failed: {e}') def send_metric(self, start, success, end): - # Thread(target=self._send_metric, args=(start, success, end)).start() - # self._send_metric(start, success, end) - ... + executor.submit(self._send_metric, start, success, end) def poll(self): while True: