Merge pull request 'fx' (#52) from master into dev
Reviewed-on: https://gitea.chocomarsh.com/self/botalka/pulls/52
This commit is contained in:
commit
9b2e96d401
@ -1,27 +0,0 @@
|
|||||||
from queue import Queue
|
|
||||||
from threading import Thread
|
|
||||||
|
|
||||||
|
|
||||||
class MetricsSender(Thread):
|
|
||||||
queue: Queue = Queue()
|
|
||||||
|
|
||||||
def put(self, metric):
|
|
||||||
self.queue.put(metric)
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
while True:
|
|
||||||
metric = self.queue.get()
|
|
||||||
if not metric:
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
metric = metric()
|
|
||||||
if metric.status_code == 202:
|
|
||||||
print('metric ok')
|
|
||||||
else:
|
|
||||||
print(f'metric failed: {metric.status_code}')
|
|
||||||
except Exception as e:
|
|
||||||
print(f'metric failed: {e}')
|
|
||||||
|
|
||||||
metrics_sender = MetricsSender()
|
|
||||||
metrics_sender.start()
|
|
@ -1,8 +1,8 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import os
|
import os
|
||||||
|
from threading import Thread
|
||||||
import requests
|
import requests
|
||||||
import time
|
import time
|
||||||
from utils.metrics import metrics_sender
|
|
||||||
|
|
||||||
|
|
||||||
stage = os.getenv("STAGE", 'local')
|
stage = os.getenv("STAGE", 'local')
|
||||||
@ -17,6 +17,26 @@ class QueuesException(Exception):
|
|||||||
|
|
||||||
|
|
||||||
class TasksHandlerMixin:
|
class TasksHandlerMixin:
|
||||||
|
def _send_metric(self, start, success, end):
|
||||||
|
try:
|
||||||
|
metric = requests.post('http://monitoring:1237/api/v1/metrics/task', json={
|
||||||
|
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
|
||||||
|
'service': 'botalka',
|
||||||
|
'environment': stage,
|
||||||
|
'queue': self.queue_name,
|
||||||
|
'success': success,
|
||||||
|
'execution_time_ms': (end - start).microseconds // 1000,
|
||||||
|
})
|
||||||
|
if metric.status_code == 202:
|
||||||
|
print('metric ok')
|
||||||
|
else:
|
||||||
|
print(f'metric failed: {metric.status_code}')
|
||||||
|
except Exception as e:
|
||||||
|
print(f'metric failed: {e}')
|
||||||
|
|
||||||
|
def send_metric(self, start, success, end):
|
||||||
|
Thread(target=self._send_metric, args=(start, success, end)).start()
|
||||||
|
|
||||||
def poll(self):
|
def poll(self):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@ -43,16 +63,7 @@ class TasksHandlerMixin:
|
|||||||
raise QueuesException
|
raise QueuesException
|
||||||
except:
|
except:
|
||||||
print(f'Failed to finish task id={task["id"]}')
|
print(f'Failed to finish task id={task["id"]}')
|
||||||
|
self.send_metric(start, success, end)
|
||||||
metrics_sender.put(lambda: requests.post('http://monitoring:1237/api/v1/metrics/task', json={
|
|
||||||
'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z",
|
|
||||||
'service': 'botalka',
|
|
||||||
'environment': stage,
|
|
||||||
'queue': self.queue_name,
|
|
||||||
'success': success,
|
|
||||||
'execution_time_ms': (end - start).microseconds // 1000,
|
|
||||||
}))
|
|
||||||
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def queue_name(self):
|
def queue_name(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user