From d61c665b6c24509dbccdf302aaf8218ad359ef33 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sun, 15 Jun 2025 16:00:22 +0300 Subject: [PATCH] fix --- Dockerfile | 4 +++- daemons/metrics.py | 22 ++++++++++++++++++++++ main.py | 3 +++ run.sh | 13 +++++++++++++ utils/queues.py | 21 ++++++--------------- 5 files changed, 47 insertions(+), 16 deletions(-) create mode 100644 daemons/metrics.py create mode 100644 run.sh diff --git a/Dockerfile b/Dockerfile index 5375a3d..cd17104 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,4 +5,6 @@ COPY requirements.txt requirements.txt RUN pip install -r requirements.txt COPY . . ENV PYTHONUNBUFFERED 1 -ENTRYPOINT ["python", "main.py"] +RUN mkdir /usr/src/metrics +RUN chmod 777 run.sh +ENTRYPOINT ["./run.sh"] diff --git a/daemons/metrics.py b/daemons/metrics.py new file mode 100644 index 0000000..465de1b --- /dev/null +++ b/daemons/metrics.py @@ -0,0 +1,22 @@ +import os + +from requests import post +import json + +from daemons import base + + +class Daemon(base.Daemon): + + def execute(self): + while True: + for file in os.listdir('/usr/src/metrics'): + data = open(f'/usr/src/metrics/{file}', 'r').read() + payload = json.loads(data) + resp = post('http://monitoring:1237/api/v1/metrics/task', json=payload) + if resp.status_code == 202: + print("Metric ok") + else: + print(f'metric not ok: {resp.status_code}') + os.remove(f'/usr/src/metrics/{file}') + break diff --git a/main.py b/main.py index b1f1648..b32aa51 100644 --- a/main.py +++ b/main.py @@ -11,6 +11,9 @@ if __name__ == '__main__': elif arg == 'mailbox': print("mailbox is starting") from daemons.mailbox import Daemon + elif arg == 'metrics': + print("metrics is starting") + from daemons.metrics import Daemon else: raise ValueError(f"Unknown param {arg}") diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..1826c2e --- /dev/null +++ b/run.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Start the first process +python3 main.py $1 & + +# Start the second process +python3 main.py metrics & + +# Wait for any process to exit +wait -n + +# Exit with status of process that exited first +exit $? \ No newline at end of file diff --git a/utils/queues.py b/utils/queues.py index 79559e1..5adf3f4 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,6 +1,7 @@ -from concurrent.futures import ThreadPoolExecutor import datetime +import json import os +import uuid import zoneinfo import requests import time @@ -18,13 +19,9 @@ class QueuesException(Exception): class TasksHandlerMixin: - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.executor = ThreadPoolExecutor(max_workers=4) - def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): - try: - resp = requests.post('http://monitoring:1237/api/v1/metrics/task', json={ + with open(f'/usr/src/{uuid.uuid4()}.json', 'w') as fp: + fp.write(json.dumps({ 'service': 'botalka', 'queue': self.queue_name, 'success': success, @@ -32,13 +29,7 @@ class TasksHandlerMixin: "success": success, "execution_time_ms": (end - start).microseconds // 1000, "environment": stage, - }) - if resp.status_code == 202: - print("Metric ok") - else: - print(f'metric not ok: {resp.status_code}') - except Exception as e: - print(f"Error sending metric: {e}") + })) def poll(self): while True: @@ -69,7 +60,7 @@ class TasksHandlerMixin: print(f'finish task with id {task["id"]}') except: print(f'Failed to finish task id={task["id"]}') - self.executor.submit(self._send_metric, start, end, success) + self._send_metric(start, end, success) @property def queue_name(self):