import asyncio
import datetime

import grpc
import bson

from queues import tasks_pb2
from queues import tasks_pb2_grpc
from utils import time
from storage.mongo import tasks


def get_feature(feature_db, point):
    """Return the first feature in ``feature_db`` located at ``point``.

    Args:
        feature_db: Iterable of objects exposing a ``location`` attribute.
        point: Value compared (with ``==``) against each ``location``.

    Returns:
        The first matching feature, or ``None`` when nothing matches.
    """
    return next(
        (feature for feature in feature_db if feature.location == point),
        None,
    )


class TasksServicer(tasks_pb2_grpc.TasksServicer):
    """Async gRPC servicer exposing Put / Take / Finish over the task storage."""

    async def Put(
        self,
        request: tasks_pb2.PutRequest,
        context: grpc.aio.ServicerContext,
    ) -> tasks_pb2.EmptyResponse:
        """Store a new task built from the request.

        The task becomes available ``request.delay`` seconds after now, or
        immediately when no delay is set. ``request.seconds_to_execute`` is
        forwarded to storage unchanged.
        """
        now = time.now()
        if request.delay:
            available_from = now + datetime.timedelta(seconds=request.delay)
        else:
            available_from = now

        await tasks.add_task(
            task=tasks.Task(
                queue=request.queue,
                payload=request.payload,
                put_at=now,
                available_from=available_from,
                seconds_to_execute=request.seconds_to_execute,
            ),
        )
        return tasks_pb2.EmptyResponse()

    async def Take(
        self,
        request: tasks_pb2.TakeRequest,
        context: grpc.aio.ServicerContext,
    ) -> tasks_pb2.TakeResponse:
        """Take a task from ``request.queue``.

        Returns:
            A response carrying the task's id (as a string), attempt counter
            and payload, or a response with ``task`` unset when storage
            returned nothing.
        """
        task = await tasks.take_task(request.queue)
        if not task:
            # Leaving the field unset is equivalent to passing task=None.
            return tasks_pb2.TakeResponse()

        return tasks_pb2.TakeResponse(
            task=tasks_pb2.Task(
                id=str(task._id),
                attempt=task.attempts,
                payload=task.payload,
            ),
        )

    async def Finish(
        self,
        request: tasks_pb2.FinishRequest,
        context: grpc.aio.ServicerContext,
    ) -> tasks_pb2.EmptyResponse:
        """Mark the task identified by ``request.id`` as finished.

        Always returns an ``EmptyResponse`` *instance*; failures are reported
        through the gRPC status set on ``context``:

        * ``INVALID_ARGUMENT`` when ``request.id`` is not a valid ObjectId.
        * ``NOT_FOUND`` when storage reports a falsy result for the id.
        """
        try:
            task_id = bson.ObjectId(request.id)
        except bson.errors.InvalidId:
            # A malformed id is a client error, not an internal failure.
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details('Invalid task id!')
            return tasks_pb2.EmptyResponse()

        if not await tasks.finish_task(task_id):
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details('Task not found!')

        # The handler must return a response message (not the message class
        # and not the context) so gRPC can serialize it.
        return tasks_pb2.EmptyResponse()


async def serve(address: str = "[::]:50051") -> None:
    """Start the gRPC server on ``address`` and block until it terminates.

    Args:
        address: Insecure listen address; defaults to all interfaces on
            port 50051.
    """
    server = grpc.aio.server()
    tasks_pb2_grpc.add_TasksServicer_to_server(TasksServicer(), server)
    server.add_insecure_port(address)
    await server.start()
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())