"""gRPC service exposing Put/Take/Finish operations over stored tasks.

Tasks are persisted through ``storage.mongo.tasks``; runtime settings are
read from the ``queues`` configurator client.
"""
import asyncio
import datetime
import os

import bson
import grpc

from queues import tasks_pb2
from queues import tasks_pb2_grpc
from storage.mongo import tasks
from utils import configurator
from utils import time

# Fallback ``retry_after`` used when the config has neither a per-queue
# nor a ``__default__`` entry (presumably seconds -- TODO confirm unit).
DEFAULT_RETRY_AFTER = 0.2

# Created at import time; requires the STAGE environment variable.
client = configurator.Client(
    'queues',
    os.environ['STAGE'],
    need_poll=True,
)


def get_feature(feature_db, point):
    """Returns Feature at given location or None."""
    for feature in feature_db:
        if feature.location == point:
            return feature
    return None


class TasksServicer(tasks_pb2_grpc.TasksServicer):
    """Async implementation of the ``Tasks`` gRPC service."""

    async def Put(self, request: tasks_pb2.PutRequest, context) -> tasks_pb2.EmptyResponse:
        """Store a new task for ``request.queue``.

        The task becomes available at ``now + request.delay`` seconds when
        a non-zero delay is given, otherwise immediately.
        """
        now = time.now()
        await tasks.add_task(
            task=tasks.Task(
                queue=request.queue,
                payload=request.payload,
                put_at=now,
                available_from=(
                    (now + datetime.timedelta(seconds=request.delay))
                    if request.delay
                    else now
                ),
                seconds_to_execute=request.seconds_to_execute,
            ),
        )
        return tasks_pb2.EmptyResponse()

    async def Take(self, request: tasks_pb2.TakeRequest, context) -> tasks_pb2.TakeResponse:
        """Take one task from ``request.queue``.

        Returns a response without a task when ``tasks.take_task`` yields
        nothing. Otherwise the response carries the task and a
        ``retry_after`` value resolved from the ``retry_after_settings``
        config: per-queue entry, then ``__default__``, then
        ``DEFAULT_RETRY_AFTER``.
        """
        task = await tasks.take_task(request.queue)
        if not task:
            return tasks_pb2.TakeResponse(task=None)

        retry_after_settings = client.get_config('retry_after_settings')
        # ``or`` chaining: a falsy configured value (e.g. 0) falls through
        # to the next source.
        retry_after = (
            retry_after_settings.get(request.queue)
            or retry_after_settings.get('__default__')
            or DEFAULT_RETRY_AFTER
        )
        return tasks_pb2.TakeResponse(
            task=tasks_pb2.Task(
                id=str(task._id),
                attempt=task.attempts,
                payload=task.payload,
            ),
            retry_after=retry_after,
        )

    async def Finish(self, request: tasks_pb2.FinishRequest, context) -> tasks_pb2.EmptyResponse:
        """Mark the task with id ``request.id`` as finished.

        When ``tasks.finish_task`` reports failure, the RPC status is set
        to ``NOT_FOUND`` with details ``'Task not found!'``. A response
        *message instance* is returned in both cases: a handler must
        return a serializable message, not the message class or the
        servicer context.
        """
        # NOTE(review): a malformed ``request.id`` makes ``bson.ObjectId``
        # raise; confirm whether that should map to INVALID_ARGUMENT.
        if not await tasks.finish_task(bson.ObjectId(request.id)):
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('Task not found!')
        return tasks_pb2.EmptyResponse()


async def serve():
    """Start the gRPC server on ``[::]:50051`` and block until termination."""
    server = grpc.aio.server()
    tasks_pb2_grpc.add_TasksServicer_to_server(
        TasksServicer(), server
    )
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())