From f036f521d86ae79903dedfc2f81bae36d788c2be Mon Sep 17 00:00:00 2001 From: emmatveev Date: Fri, 13 Dec 2024 02:08:36 +0300 Subject: [PATCH 1/2] fix --- utils/queues.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/utils/queues.py b/utils/queues.py index 31767e5..9701634 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -15,8 +15,7 @@ class TasksHandlerMixin: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) task = response.task if not task: - if response.retry_after: - time.sleep(response.retry_after) + time.sleep(0.2) continue try: payload = json_format.MessageToDict(task.payload) -- 2.45.2 From 14592c2b8a6e4450a0cc647fb7050f8325787912 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Fri, 27 Dec 2024 12:21:08 +0300 Subject: [PATCH 2/2] fix --- Makefile | 7 ------- daemons/base.py | 5 ----- daemons/worker.py | 2 -- utils/queues.py | 44 +++++++++++++++++++++++++------------------- 4 files changed, 25 insertions(+), 33 deletions(-) delete mode 100644 Makefile diff --git a/Makefile b/Makefile deleted file mode 100644 index f071d0d..0000000 --- a/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -gen: - pip install grpcio grpcio-tools - curl https://platform.sprinthub.ru/generator >> generator.py - python generator.py - rm generator.py -run: - python ./server.py \ No newline at end of file diff --git a/daemons/base.py b/daemons/base.py index 7f32f71..7e5cfd9 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,6 +1,4 @@ import os -import grpc -from queues import tasks_pb2_grpc stage = os.getenv("STAGE", 'local') @@ -11,8 +9,5 @@ else: class Daemon: - def __init__(self): - self.channel = grpc.insecure_channel(QUEUES_URL) - self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): raise NotImplemented diff --git a/daemons/worker.py b/daemons/worker.py index de09dca..4d5b7f7 100644 --- a/daemons/worker.py +++ b/daemons/worker.py @@ -49,7 +49,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def reply(self, text: str, chat_id: int, message_id: int): queues.set_task( - self.stub, 'botalka_mailbox', { 'project': 'pizda-bot', @@ -65,7 +64,6 @@ class Daemon(base.Daemon, queues.TasksHandlerMixin): def send(self, text: str, chat_id: int): queues.set_task( - self.stub, 'botalka_mailbox', { 'project': 'pizda-bot', diff --git a/utils/queues.py b/utils/queues.py index 9701634..c00a18c 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,8 +1,13 @@ +import os +import requests import time -from queues import tasks_pb2_grpc -from queues import tasks_pb2 -from google.protobuf import json_format + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' class QueuesException(Exception): @@ -12,21 +17,22 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) - task = response.task + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + task = response.get('task') if not task: time.sleep(0.2) continue try: - payload = json_format.MessageToDict(task.payload) - self.process(payload) + self.process(task['payload']) except Exception as exc: - print(f'Error processing message id={task.id}, payload={payload}, exc={exc}') + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException except: - print(f'Failed to finish task id={task.id}') + print(f'Failed to finish task id={task["id"]}') @property def queue_name(self): @@ -35,12 +41,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented -def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - stub.Put( - tasks_pb2.PutRequest( - queue=queue_name, - seconds_to_execute=seconds_to_execute, - delay=delay, - payload=payload - ) - ) \ No newline at end of file + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException -- 2.45.2