diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..1180528 Binary files /dev/null and b/.DS_Store differ diff --git a/utils/queues.py b/utils/queues.py index c00a18c..7aacd89 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,4 +1,7 @@ +from concurrent.futures import ThreadPoolExecutor +import datetime import os +import zoneinfo import requests import time @@ -15,24 +18,54 @@ class QueuesException(Exception): class TasksHandlerMixin: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.executor = ThreadPoolExecutor(max_workers=1) + + def _send_metric(self, start: datetime.datetime, end: datetime.datetime, success: bool): + def send(): + requests.post(f'{QUEUES_URL}/api/v1/metric', json={ + 'service': 'botalka', + 'queue': self.queue_name, + 'success': success, + 'timestamp': start.strftime("%Y-%m-%dT%H:%M:%S") + "Z", + "success": success, + "execution_time_ms": (end - start).microseconds // 1000, + "environment": stage, + }) + + self.executor.submit(send) + def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + try: + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + except requests.JSONDecodeError: + print('Unable to decode json') + time.sleep(3) + continue task = response.get('task') if not task: time.sleep(0.2) continue + start = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) try: + print(f'process task with id {task["id"]}, attempt {task["attempt"]}') self.process(task['payload']) + success = True except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') - continue - try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) - if resp.status_code != 202: - raise QueuesException - except: - print(f'Failed to finish task id={task["id"]}') + success = False + end = datetime.datetime.now(zoneinfo.ZoneInfo("Europe/Moscow")) + if success: + try: + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException + print(f'finish task with id {task["id"]}') + except: + print(f'Failed to finish task id={task["id"]}') + self._send_metric(start, end, success) @property def queue_name(self):