From 4d03af75d8bccf7f01b4fc431f2105c3f57a3577 Mon Sep 17 00:00:00 2001 From: Administrator Date: Sun, 22 Dec 2024 22:20:02 +0300 Subject: [PATCH] fix --- Makefile | 6 ------ daemons/base.py | 15 --------------- daemons/poll.py | 3 +-- utils/queues.py | 44 +++++++++++++++++++++++++------------------- 4 files changed, 26 insertions(+), 42 deletions(-) delete mode 100644 Makefile diff --git a/Makefile b/Makefile deleted file mode 100644 index b971207..0000000 --- a/Makefile +++ /dev/null @@ -1,6 +0,0 @@ -gen: - curl https://platform.sprinthub.ru/generator >> generator.py - python generator.py - rm generator.py -run: - python ./server.py \ No newline at end of file diff --git a/daemons/base.py b/daemons/base.py index 7f32f71..bd6fd00 100644 --- a/daemons/base.py +++ b/daemons/base.py @@ -1,18 +1,3 @@ -import os -import grpc -from queues import tasks_pb2_grpc - - -stage = os.getenv("STAGE", 'local') -if stage == 'local': - QUEUES_URL = 'localhost:50051' -else: - QUEUES_URL = 'queues-grpc:50051' - - class Daemon: - def __init__(self): - self.channel = grpc.insecure_channel(QUEUES_URL) - self.stub = tasks_pb2_grpc.TasksStub(channel=self.channel) def execute(self): raise NotImplemented diff --git a/daemons/poll.py b/daemons/poll.py index 4f3b48a..d7169ee 100644 --- a/daemons/poll.py +++ b/daemons/poll.py @@ -9,7 +9,6 @@ from utils import queues class Daemon(base.Daemon): def __init__(self): - super().__init__() self.processes: dict[str, multiprocessing.Process|None] = {} def execute(self): @@ -42,5 +41,5 @@ class Daemon(base.Daemon): bot = telebot.TeleBot(token) @bot.message_handler(content_types=['audio', 'photo', 'voice', 'video', 'document', 'animation', 'text', 'location', 'contact', 'sticker', 'video_note']) def do_action(message: telebot.types.Message): - queues.set_task(self.stub, queue, message.json, 1) + queues.set_task(queue, message.json, 1) bot.polling() diff --git a/utils/queues.py b/utils/queues.py index d346ffb..c00a18c 100644 --- a/utils/queues.py +++ b/utils/queues.py @@ -1,8 +1,13 @@ +import os +import requests import time -from queues import tasks_pb2_grpc -from queues import tasks_pb2 -from google.protobuf import json_format + +stage = os.getenv("STAGE", 'local') +if stage == 'local': + QUEUES_URL = 'http://localhost:1239' +else: + QUEUES_URL = 'http://queues:1239' class QueuesException(Exception): @@ -12,21 +17,22 @@ class QueuesException(Exception): class TasksHandlerMixin: def poll(self): while True: - response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) - task = response.task + response = requests.get(f'{QUEUES_URL}/api/v1/take', headers={'queue': self.queue_name}).json() + task = response.get('task') if not task: time.sleep(0.2) continue try: - payload = json_format.MessageToDict(task.payload) - self.process(payload) + self.process(task['payload']) except Exception as exc: - print(f'Error processing message id={task.id}, payload={payload}, exc={exc}') + print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: - self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) + resp = requests.post(f'{QUEUES_URL}/api/v1/finish', json={'id': task['id']}) + if resp.status_code != 202: + raise QueuesException except: - print(f'Failed to finish task id={task.id}') + print(f'Failed to finish task id={task["id"]}') @property def queue_name(self): @@ -35,12 +41,12 @@ class TasksHandlerMixin: def process(self, payload): raise NotImplemented -def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): - stub.Put( - tasks_pb2.PutRequest( - queue=queue_name, - seconds_to_execute=seconds_to_execute, - delay=delay, - payload=payload - ) - ) + +def set_task(queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): + resp = requests.post(f'{QUEUES_URL}/api/v1/put', headers={'queue': queue_name}, json={ + 'payload': payload, + 'seconds_to_execute': seconds_to_execute, + 'delay': delay, + }) + if resp.status_code != 202: + raise QueuesException