Merge pull request 'master' (#54) from master into prod

Reviewed-on: https://gitea.chocomarsh.com/self/botalka/pulls/54
This commit is contained in:
emmatveev 2025-06-15 03:04:00 +03:00
commit 944ac1301b
4 changed files with 35 additions and 1 deletions

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues-development - queues-development
- monitoring
environment: environment:
STAGE: "development" STAGE: "development"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues-development: queues-development:
external: true external: true
monitoring:
external: true

View File

@ -22,6 +22,7 @@ services:
networks: networks:
- configurator - configurator
- queues - queues
- monitoring
environment: environment:
STAGE: "production" STAGE: "production"
command: mailbox command: mailbox
@ -38,3 +39,5 @@ networks:
external: true external: true
queues: queues:
external: true external: true
monitoring:
external: true

View File

@ -1,6 +1,7 @@
import telebot import telebot
import multiprocessing import multiprocessing
import time import time
import json
from daemons import base from daemons import base
from utils import platform from utils import platform

View File

@ -1,4 +1,6 @@
import datetime
import os import os
from threading import Thread
import requests import requests
import time import time
@ -15,6 +17,27 @@ class QueuesException(Exception):
class TasksHandlerMixin: class TasksHandlerMixin:
def _send_metric(self, start, success, end):
try:
metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
'service': 'botalka',
'environment': stage,
'queue': self.queue_name,
'success': success,
'execution_time_ms': (end - start).microseconds // 1000,
})
if metric.status_code == 202:
print('metric ok')
else:
print(f'metric failed: {metric.status_code}')
except Exception as e:
print(f'metric failed: {e}')
def send_metric(self, start, success, end):
# Thread(target=self._send_metric, args=(start, success, end)).start()
self._send_metric(start, success, end)
def poll(self): def poll(self):
while True: while True:
try: try:
@ -27,17 +50,21 @@ class TasksHandlerMixin:
if not task: if not task:
time.sleep(0.2) time.sleep(0.2)
continue continue
start = datetime.datetime.now()
try: try:
self.process(task['payload']) self.process(task['payload'])
success = True
except Exception as exc: except Exception as exc:
print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}')
continue success = False
end = datetime.datetime.now()
try: try:
resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']})
if resp.status_code != 202: if resp.status_code != 202:
raise QueuesException raise QueuesException
except: except:
print(f'Failed to finish task id={task["id"]}') print(f'Failed to finish task id={task["id"]}')
self.send_metric(start, success, end)
@property @property
def queue_name(self): def queue_name(self):