import time import tasks_pb2_grpc import tasks_pb2 from google.protobuf import json_format class QueuesException(Exception): ... class TasksHandlerMixin: def poll(self): while True: response: tasks_pb2.TakeResponse = self.stub.Take(tasks_pb2.TakeRequest(queue=self.queue_name)) task: tasks_pb2.Task = response.task if not task: time.sleep(0.2) continue try: self.process(json_format.MessageToDict(task.payload)) except Exception as exc: print(f'Error processing message id={task["id"]}, payload={task["payload"]}, exc={exc}') continue try: self.stub.Finish(tasks_pb2.FinishRequest(id=task.id)) except: print(f'Failed to finish task id={task.id}') @property def queue_name(self): raise NotImplemented def process(self, payload): raise NotImplemented def set_task(stub: tasks_pb2_grpc.TasksStub, queue_name: str, payload: dict, seconds_to_execute: int, delay: int|None = None): stub.Put( tasks_pb2.PutRequest( queue=queue_name, seconds_to_execute=seconds_to_execute, delay=delay, payload=payload ) )