From 4af857e2f3890c8a37ba93822bf436dae309acc8 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 05:07:16 +0300 Subject: [PATCH 01/17] fix --- .deploy/deploy-dev.yaml | 3 +++ .deploy/deploy-prod.yaml | 3 +++ utils/queues.py | 46 ++++++++++++++++++++++++++++++++++------ 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 227877a..f59cd31 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues-development + - monitoring environment: STAGE: "development" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues-development: external: true + monitoring: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 0043ede..535826f 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,6 +22,7 @@ services: networks: - configurator - queues + - monitoring environment: STAGE: "production" command: mailbox @@ -38,3 +39,5 @@ networks: external: true queues: external: true + monitoring: + external: true diff --git a/utils/queues.py b/utils/queues.py index dc0f219..3ec1e8b 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,7 @@ +from concurrent.futures import ThreadPoolExecutor +import datetime import os +import zoneinfo import requests import time @@ -10,11 +13,35 @@ else: QUEUES_URL = 'http://queues:1239' +executor = ThreadPoolExecutor(max_workers=1) + + class QueuesException(Exception): ... class TasksHandlerMixin: + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + try: + resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + 'service': 'botalka', + 'queue': self.queue_name, + 'success': success, + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + "success": success, + "execution_time_ms": (end - start).microseconds // 1000, + "environment": stage, + }) + if resp.status_code == 202: + print("Metric ok") + else: + print(f'metric not ok: {resp.status_code}') + except Exception as e: + print(f"Error sending metric: {e}") + + def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + executor.submit(self._send_metric, start, end, success) + def poll(self): while True: try: @@ -27,17 +54,22 @@ class TasksHandlerMixin: if not task: time.sleep(0.2) continue + start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: self.process(task['payload']) + success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') - continue - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') + success = False + end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + except: + print(f'Failed to finish task id={task["id"]}') + self.send_metric(start, end, success) @property def queue_name(self): -- 2.45.2 From 2bff8983b54c129ebde158cb3405ed6ae0769fbf Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 05:11:23 +0300 Subject: [PATCH 02/17] fix --- utils/queues.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 3ec1e8b..a571358 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -40,7 +40,8 @@ class TasksHandlerMixin: print(f"Error sending metric: {e}") def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - executor.submit(self._send_metric, start, end, success) + # executor.submit(self._send_metric, start, end, success) + self._send_metric(start, end, success) def poll(self): while True: -- 2.45.2 From e0e7564fcc21c19654fbf48763ded277eb793214 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 05:27:31 +0300 Subject: [PATCH 03/17] fix --- utils/queues.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index a571358..a69857d 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -13,7 +13,7 @@ else: QUEUES_URL = 'http://queues:1239' -executor = ThreadPoolExecutor(max_workers=1) +executor = ThreadPoolExecutor(max_workers=4) class QueuesException(Exception): @@ -40,8 +40,8 @@ class TasksHandlerMixin: print(f"Error sending metric: {e}") def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - # executor.submit(self._send_metric, start, end, success) - self._send_metric(start, end, success) + executor.submit(self._send_metric, start, end, success) + # self._send_metric(start, end, success) def poll(self): while True: -- 2.45.2 From 72c5823ccd528d91c9159a4be7df375ffb3e5ffa Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 14:40:14 +0300 Subject: [PATCH 04/17] fix --- utils/queues.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index a69857d..035b884 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -13,14 +13,15 @@ else: QUEUES_URL = 'http://queues:1239' -executor = ThreadPoolExecutor(max_workers=4) - - class QueuesException(Exception): ... class TasksHandlerMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=4) + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): try: resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ @@ -39,10 +40,6 @@ class TasksHandlerMixin: except Exception as e: print(f"Error sending metric: {e}") - def send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - executor.submit(self._send_metric, start, end, success) - # self._send_metric(start, end, success) - def poll(self): while True: try: @@ -70,7 +67,7 @@ class TasksHandlerMixin: raise QueuesException except: print(f'Failed to finish task id={task["id"]}') - self.send_metric(start, end, success) + self.executor.submit(self._send_metric, start, end, success) @property def queue_name(self): -- 2.45.2 From 2d292dfc46f46f5827c03222cffa55c94dcf2e91 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 14:42:46 +0300 Subject: [PATCH 05/17] fix --- utils/queues.py | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/queues.py b/utils/queues.py index 035b884..9b77c8e 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -54,6 +54,7 @@ class TasksHandlerMixin: continue start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: + print(f'process task with id {task["id"]}') self.process(task['payload']) success = True except Exception as exc: -- 2.45.2 From 0cab3e52cb167f18cdf03cb1fd8a3b0fd6e3122b Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 14:45:14 +0300 Subject: [PATCH 06/17] fix --- utils/queues.py | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/queues.py b/utils/queues.py index 9b77c8e..12b38df 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -66,6 +66,7 @@ class TasksHandlerMixin: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: raise QueuesException + print(f'finish task with id {task["id"]}') except: print(f'Failed to finish task id={task["id"]}') self.executor.submit(self._send_metric, start, end, success) -- 2.45.2 From 4f0114e99ad812a95991103dba0f55bb7dacce26 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 14:53:13 +0300 Subject: [PATCH 07/17] fix --- utils/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 12b38df..79559e1 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -54,7 +54,7 @@ class TasksHandlerMixin: continue start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: - print(f'process task with id {task["id"]}') + print(f'process task with id {task["id"]}, attempt {task["attempt"]}') self.process(task['payload']) success = True except Exception as exc: -- 2.45.2 From d61c665b6c24509dbccdf302aaf8218ad359ef33 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:00:22 +0300 Subject: [PATCH 08/17] fix --- Dockerfile | 4 +++- daemons/metrics.py | 22 ++++++++++++++++++++++ main.py | 3 +++ run.sh | 13 +++++++++++++ utils/queues.py | 21 ++++++--------------- 5 files changed, 47 insertions(+), 16 deletions(-) create mode 100644 daemons/metrics.py create mode 100644 run.sh diff --git a/Dockerfile b/Dockerfile index 5375a3d..cd17104 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,4 +5,6 @@ COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . ENV PYTHONUNBUFFERED 1 -ENTRYPOINT ["python", "main.py"] +RUN mkdir /usr/src/metrics +RUN chmod 777 run.sh +ENTRYPOINT ["./run.sh"] diff --git a/daemons/metrics.py b/daemons/metrics.py new file mode 100644 index 0000000..465de1b --- /dev/null +++ b/daemons/metrics.py @@ -0,0 +1,22 @@ +import os + +from requests import post +import json + +from daemons import base + + +class Daemon(base.Daemon): + + def execute(self): + while True: + for file in os.listdir('/usr/src/metrics'): + data = open(f'/usr/src/metrics/{file}', 'r').read() + payload = json.loads(data) + resp = post('http://monitoring:1237/api/v1/metrics/task', json=payload) + if resp.status_code == 202: + print("Metric ok") + else: + print(f'metric not ok: {resp.status_code}') + os.remove(f'/usr/src/metrics/{file}') + break diff --git a/main.py b/main.py index b1f1648..b32aa51 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,9 @@ if __name__ == '__main__': elif arg == 'mailbox': print("mailbox is starting") from daemons.mailbox import Daemon + elif arg == 'metrics': + print("metrics is starting") + from daemons.metrics import Daemon else: raise ValueError(f"Unknown param {arg}") diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..1826c2e --- /dev/null +++ b/run.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Start the first process +python3 main.py $1 & + +# Start the second process +python3 main.py metrics & + +# Wait for any process to exit +wait -n + +# Exit with status of process that exited first +exit $? \ No newline at end of file diff --git a/utils/queues.py b/utils/queues.py index 79559e1..5adf3f4 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,7 @@ -from concurrent.futures import ThreadPoolExecutor import datetime +import json import os +import uuid import zoneinfo import requests import time @@ -18,13 +19,9 @@ class QueuesException(Exception): class TasksHandlerMixin: - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.executor = ThreadPoolExecutor(max_workers=4) - def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - try: - resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + with open(f'/usr/src/{uuid.uuid4()}.json', 'w') as fp: + fp.write(json.dumps({ 'service': 'botalka', 'queue': self.queue_name, 'success': success, @@ -32,13 +29,7 @@ class TasksHandlerMixin: "success": success, "execution_time_ms": (end - start).microseconds // 1000, "environment": stage, - }) - if resp.status_code == 202: - print("Metric ok") - else: - print(f'metric not ok: {resp.status_code}') - except Exception as e: - print(f"Error sending metric: {e}") + })) def poll(self): while True: @@ -69,7 +60,7 @@ class TasksHandlerMixin: print(f'finish task with id {task["id"]}') except: print(f'Failed to finish task id={task["id"]}') - self.executor.submit(self._send_metric, start, end, success) + self._send_metric(start, end, success) @property def queue_name(self): -- 2.45.2 From 24b1d39e10d1804f143f0fded1c02548cd351073 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:39:33 +0300 Subject: [PATCH 09/17] fix --- run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.sh b/run.sh index 1826c2e..c4967bc 100644 --- a/run.sh +++ b/run.sh @@ -4,7 +4,7 @@ python3 main.py $1 & # Start the second process -python3 main.py metrics & +# python3 main.py metrics & # Wait for any process to exit wait -n -- 2.45.2 From b575ba571717835fc16f13d67f2f431eff2361e4 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:40:18 +0300 Subject: [PATCH 10/17] fix --- run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.sh b/run.sh index c4967bc..1826c2e 100644 --- a/run.sh +++ b/run.sh @@ -4,7 +4,7 @@ python3 main.py $1 & # Start the second process -# python3 main.py metrics & +python3 main.py metrics & # Wait for any process to exit wait -n -- 2.45.2 From 6a8f18300b950fd7e91bf65d7db23c7289d10a78 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:40:44 +0300 Subject: [PATCH 11/17] fix --- run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.sh b/run.sh index 1826c2e..c4967bc 100644 --- a/run.sh +++ b/run.sh @@ -4,7 +4,7 @@ python3 main.py $1 & # Start the second process -python3 main.py metrics & +# python3 main.py metrics & # Wait for any process to exit wait -n -- 2.45.2 From df3354a536d779e508d36c22c43ef9d595972840 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:45:51 +0300 Subject: [PATCH 12/17] fix --- utils/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 5adf3f4..b62f807 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -60,7 +60,7 @@ class TasksHandlerMixin: print(f'finish task with id {task["id"]}') except: print(f'Failed to finish task id={task["id"]}') - self._send_metric(start, end, success) + # self._send_metric(start, end, success) @property def queue_name(self): -- 2.45.2 From a707a8b93eaacc563f887cc98dbe36d200c63a4d Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:49:40 +0300 Subject: [PATCH 13/17] fix --- .deploy/deploy-dev.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index f59cd31..1c9a06f 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -22,7 +22,6 @@ services: networks: - configurator - queues-development - - monitoring environment: STAGE: "development" command: mailbox -- 2.45.2 From 7b9d264ef98719331d972218e260405ed7e4aa2b Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 17:08:15 +0300 Subject: [PATCH 14/17] fix --- .deploy/deploy-dev.yaml | 2 -- daemons/metrics.py | 2 +- run.sh | 2 +- utils/queues.py | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 1c9a06f..227877a 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -38,5 +38,3 @@ networks: external: true queues-development: external: true - monitoring: - external: true diff --git a/daemons/metrics.py b/daemons/metrics.py index 465de1b..304d0d6 100644 --- a/daemons/metrics.py +++ b/daemons/metrics.py @@ -13,7 +13,7 @@ class Daemon(base.Daemon): for file in os.listdir('/usr/src/metrics'): data = open(f'/usr/src/metrics/{file}', 'r').read() payload = json.loads(data) - resp = post('http://monitoring:1237/api/v1/metrics/task', json=payload) + resp = post('http://queues:1239/api/v1/metric', json=payload) if resp.status_code == 202: print("Metric ok") else: diff --git a/run.sh b/run.sh index c4967bc..1826c2e 100644 --- a/run.sh +++ b/run.sh @@ -4,7 +4,7 @@ python3 main.py $1 & # Start the second process -# python3 main.py metrics & +python3 main.py metrics & # Wait for any process to exit wait -n diff --git a/utils/queues.py b/utils/queues.py index b62f807..5adf3f4 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -60,7 +60,7 @@ class TasksHandlerMixin: print(f'finish task with id {task["id"]}') except: print(f'Failed to finish task id={task["id"]}') - # self._send_metric(start, end, success) + self._send_metric(start, end, success) @property def queue_name(self): -- 2.45.2 From 33901b0a825b06834afe514f16248b855decd6c5 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 17:13:54 +0300 Subject: [PATCH 15/17] fix --- Dockerfile | 4 +--- daemons/metrics.py | 22 ---------------------- main.py | 3 --- run.sh | 13 ------------- utils/queues.py | 13 ++++++++++--- 5 files changed, 11 insertions(+), 44 deletions(-) delete mode 100644 daemons/metrics.py delete mode 100644 run.sh diff --git a/Dockerfile b/Dockerfile index cd17104..5375a3d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,4 @@ COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . ENV PYTHONUNBUFFERED 1 -RUN mkdir /usr/src/metrics -RUN chmod 777 run.sh -ENTRYPOINT ["./run.sh"] +ENTRYPOINT ["python", "main.py"] diff --git a/daemons/metrics.py b/daemons/metrics.py deleted file mode 100644 index 304d0d6..0000000 --- a/daemons/metrics.py +++ /dev/null @@ -1,22 +0,0 @@ -import os - -from requests import post -import json - -from daemons import base - - -class Daemon(base.Daemon): - - def execute(self): - while True: - for file in os.listdir('/usr/src/metrics'): - data = open(f'/usr/src/metrics/{file}', 'r').read() - payload = json.loads(data) - resp = post('http://queues:1239/api/v1/metric', json=payload) - if resp.status_code == 202: - print("Metric ok") - else: - print(f'metric not ok: {resp.status_code}') - os.remove(f'/usr/src/metrics/{file}') - break diff --git a/main.py b/main.py index b32aa51..b1f1648 100644 --- a/main.py +++ b/main.py @@ -11,9 +11,6 @@ if __name__ == '__main__': elif arg == 'mailbox': print("mailbox is starting") from daemons.mailbox import Daemon - elif arg == 'metrics': - print("metrics is starting") - from daemons.metrics import Daemon else: raise ValueError(f"Unknown param {arg}") diff --git a/run.sh b/run.sh deleted file mode 100644 index 1826c2e..0000000 --- a/run.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/bash - -# Start the first process -python3 main.py $1 & - -# Start the second process -python3 main.py metrics & - -# Wait for any process to exit -wait -n - -# Exit with status of process that exited first -exit $? \ No newline at end of file diff --git a/utils/queues.py b/utils/queues.py index 5adf3f4..fe64e36 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor import datetime import json import os @@ -19,9 +20,13 @@ class QueuesException(Exception): class TasksHandlerMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=1) + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - with open(f'/usr/src/{uuid.uuid4()}.json', 'w') as fp: - fp.write(json.dumps({ + def send(): + requests.post(f'{QUEUES_URL}/api/v1/metric', json={ 'service': 'botalka', 'queue': self.queue_name, 'success': success, @@ -29,7 +34,9 @@ class TasksHandlerMixin: "success": success, "execution_time_ms": (end - start).microseconds // 1000, "environment": stage, - })) + }) + + self.executor.submit(send) def poll(self): while True: -- 2.45.2 From 66b4348957d09aa59c13774bac2dd063fe28cc07 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 17:24:52 +0300 Subject: [PATCH 16/17] fix --- .deploy/deploy-prod.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 535826f..0043ede 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -22,7 +22,6 @@ services: networks: - configurator - queues - - monitoring environment: STAGE: "production" command: mailbox @@ -39,5 +38,3 @@ networks: external: true queues: external: true - monitoring: - external: true -- 2.45.2 From 5581786225c119753efb67e67cc995ed30056d44 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Thu, 17 Jul 2025 23:27:52 +0300 Subject: [PATCH 17/17] fix --- utils/queues.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/utils/queues.py b/utils/queues.py index fe64e36..fd185b7 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -2,6 +2,7 @@ from concurrent.futures import ThreadPoolExecutor import datetime import json import os +import traceback import uuid import zoneinfo import requests @@ -57,6 +58,7 @@ class TasksHandlerMixin: success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') + traceback.print_stack() success = False end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) if success: -- 2.45.2