queues-py3-grpc/server.py
emmatveev cd23f8ba13
Some checks failed
Deploy Dev / Build (pull_request) Failing after 1m18s
Deploy Dev / Push (pull_request) Has been skipped
Deploy Dev / Deploy dev (pull_request) Has been skipped
fix
2024-12-12 22:29:28 +03:00

75 lines
2.3 KiB
Python

import asyncio
import datetime
import grpc
import bson
import os
from queues import tasks_pb2
from queues import tasks_pb2_grpc
from utils import time
from storage.mongo import tasks
from utils import configurator
DEFAULT_RETRY_AFTER = 0.2
client = configurator.Client(
'queues',
os.environ['STAGE'],
need_poll=True,
)
def get_feature(feature_db, point):
"""Returns Feature at given location or None."""
for feature in feature_db:
if feature.location == point:
return feature
return None
class TasksServicer(tasks_pb2_grpc.TasksServicer):
async def Put(self, request: tasks_pb2.PutRequest, context) -> tasks_pb2.EmptyResponse:
now = time.now()
await tasks.add_task(
task=tasks.Task(
queue=request.queue,
payload=request.payload,
put_at=now,
available_from=(now + datetime.timedelta(seconds=request.delay)) if request.delay else now,
seconds_to_execute=request.seconds_to_execute,
),
)
return tasks_pb2.EmptyResponse()
async def Take(self, request: tasks_pb2.TakeRequest, context) -> tasks_pb2.TakeResponse:
task = await tasks.take_task(request.queue)
if not task:
return tasks_pb2.TakeResponse(task=None)
retry_after_settings = client.get_config('retry_after_settings')
retry_after = retry_after_settings.get(request.queue) or retry_after_settings.get('__default__') or DEFAULT_RETRY_AFTER
return tasks_pb2.TakeResponse(task=tasks_pb2.Task(id=str(task._id), attempt=task.attempts, payload=task.payload), retry_after=retry_after)
async def Finish(self, request: tasks_pb2.FinishRequest, context) -> tasks_pb2.EmptyResponse:
if await tasks.finish_task(bson.ObjectId(request.id)):
return tasks_pb2.EmptyResponse
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details('Task not found!')
return context
async def serve():
server = grpc.aio.server()
tasks_pb2_grpc.add_TasksServicer_to_server(
TasksServicer(), server
)
server.add_insecure_port("[::]:50051")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
asyncio.run(serve())