All checks were successful
Deploy Dev / Build (pull_request) Successful in 9s
Deploy Dev / Push (pull_request) Successful in 18s
Deploy Dev / Deploy dev (pull_request) Successful in 8s
Deploy Prod / Build (pull_request) Successful in 8s
Deploy Prod / Push (pull_request) Successful in 14s
Deploy Prod / Deploy prod (pull_request) Successful in 9s
60 lines
1.9 KiB
Python
60 lines
1.9 KiB
Python
import asyncio
|
|
import datetime
|
|
import grpc
|
|
import bson
|
|
|
|
from queues import tasks_pb2
|
|
from queues import tasks_pb2_grpc
|
|
|
|
from utils import time
|
|
from storage.mongo import tasks
|
|
|
|
|
|
def get_feature(feature_db, point):
    """Look up the first feature whose ``location`` equals ``point``.

    Args:
        feature_db: Iterable of objects exposing a ``location`` attribute.
        point: Value compared (with ``==``) against each ``location``.

    Returns:
        The first matching feature in iteration order, or ``None`` when
        nothing matches (including when ``feature_db`` is empty).
    """
    matches = (candidate for candidate in feature_db if candidate.location == point)
    # next() with a default stops at the first hit and never raises StopIteration.
    return next(matches, None)
|
|
|
|
|
|
class TasksServicer(tasks_pb2_grpc.TasksServicer):
    """Async gRPC servicer for the Tasks service.

    Each RPC delegates to the ``tasks`` storage module (``add_task``,
    ``take_task``, ``finish_task``) and translates the result into the
    generated ``tasks_pb2`` messages.
    """

    async def Put(self, request: tasks_pb2.PutRequest, context) -> tasks_pb2.EmptyResponse:
        """Store a new task built from ``request``.

        The task is stamped with the current time as ``put_at``. When
        ``request.delay`` is truthy, ``available_from`` is pushed that many
        seconds past ``put_at``; otherwise it equals ``put_at``.

        Returns:
            An empty response once ``tasks.add_task`` has completed.
        """
        # NOTE(review): ``time`` is the project's ``utils.time`` module, not the
        # stdlib one; ``now()`` is assumed to return a ``datetime`` since a
        # ``timedelta`` is added to it below - confirm.
        now = time.now()
        await tasks.add_task(
            task=tasks.Task(
                queue=request.queue,
                payload=request.payload,
                put_at=now,
                available_from=(now + datetime.timedelta(seconds=request.delay)) if request.delay else now,
                seconds_to_execute=request.seconds_to_execute,
            ),
        )
        return tasks_pb2.EmptyResponse()

    async def Take(self, request: tasks_pb2.TakeRequest, context) -> tasks_pb2.TakeResponse:
        """Fetch a task from ``request.queue`` via ``tasks.take_task``.

        Returns:
            A ``TakeResponse`` whose ``task`` carries the stored task's id (as
            a string), attempt counter and payload, or a ``TakeResponse`` with
            ``task`` unset when storage returned nothing.
        """
        task = await tasks.take_task(request.queue)
        if not task:
            return tasks_pb2.TakeResponse(task=None)
        return tasks_pb2.TakeResponse(
            task=tasks_pb2.Task(id=str(task._id), attempt=task.attempts, payload=task.payload),
        )

    async def Finish(self, request: tasks_pb2.FinishRequest, context) -> tasks_pb2.EmptyResponse:
        """Mark the task identified by ``request.id`` as finished.

        Returns:
            An empty response in every case. When ``tasks.finish_task`` reports
            a falsy result, the RPC status is set to ``NOT_FOUND`` with the
            detail ``'Task not found!'`` before returning.
        """
        if await tasks.finish_task(bson.ObjectId(request.id)):
            # Return an *instance*: handing back the message class itself
            # cannot be serialized as the RPC response.
            return tasks_pb2.EmptyResponse()
        context.set_code(grpc.StatusCode.NOT_FOUND)
        context.set_details('Task not found!')
        # The handler must still return a response message (not the context);
        # the status code set above is what tells the client about the failure.
        return tasks_pb2.EmptyResponse()
|
|
|
|
|
|
async def serve():
    """Run the Tasks gRPC server until it terminates.

    Creates a ``grpc.aio`` server, registers a ``TasksServicer`` on it,
    binds an insecure (plaintext) port on ``[::]:50051``, starts the server
    and then waits for its termination.
    """
    grpc_server = grpc.aio.server()
    servicer = TasksServicer()
    tasks_pb2_grpc.add_TasksServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port("[::]:50051")
    await grpc_server.start()
    await grpc_server.wait_for_termination()
|
|
|
|
|
|
# Script entry point: only start the server when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(serve())