From e6e3762768d09517fc7a38de826dbbc0196a4390 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Wed, 27 Nov 2024 02:17:26 +0300 Subject: [PATCH 1/4] fix --- utils/queues.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 923bc3b..dec794a 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -19,10 +19,11 @@ class TasksHandlerMixin: def poll(self): while True: response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}) - if response.status_code == 404: + task = response.get('task') + if not task: time.sleep(0.2) continue - data = response.json() + data = task.json() try: self.process(data['payload']) except Exception as exc: -- 2.45.2 From 2d051a1881125f21dba4c01090b61b3b74e175ff Mon Sep 17 00:00:00 2001 From: emmatveev Date: Wed, 27 Nov 2024 02:20:49 +0300 Subject: [PATCH 2/4] fix --- utils/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index dec794a..07f3c29 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -18,7 +18,7 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}) + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() task = response.get('task') if not task: time.sleep(0.2) -- 2.45.2 From dc7c222f508664ee5a0f142b7e3e27cf4b7482cd Mon Sep 17 00:00:00 2001 From: emmatveev Date: Wed, 27 Nov 2024 02:33:09 +0300 Subject: [PATCH 3/4] fix --- utils/queues.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 07f3c29..603e981 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -23,18 +23,17 @@ class TasksHandlerMixin: if not task: time.sleep(0.2) continue - data = task.json() try: - self.process(data['payload']) + self.process(task['payload']) except Exception as exc: - print(f'Error processing message id={data["id"]}, payload={data["payload"]}, exc={exc}') + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': data['id']}) if resp.status_code != 202: raise QueuesException except: - print(f'Failed to finish task id={data["id"]}') + print(f'Failed to finish task id={task["id"]}') @property def queue_name(self): -- 2.45.2 From e7109cc2549e11b6e7be29668934e7d899f3dcc7 Mon Sep 17 00:00:00 2001 From: emmatveev Date: Wed, 27 Nov 2024 02:36:13 +0300 Subject: [PATCH 4/4] fix --- utils/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/queues.py b/utils/queues.py index 603e981..030bf34 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -29,7 +29,7 @@ class TasksHandlerMixin: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': data['id']}) + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) if resp.status_code != 202: raise QueuesException except: -- 2.45.2