diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 4b57d9d..61898b3 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -9,6 +9,7 @@ services: environment: MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV + STAGE: "development" deploy: mode: replicated restart_policy: @@ -20,3 +21,5 @@ services: networks: queues-development: external: true + configurator: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 4105a02..b6c3a20 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -9,6 +9,7 @@ services: environment: MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD + STAGE: "production" deploy: mode: replicated restart_policy: @@ -20,3 +21,5 @@ services: networks: queues: external: true + configurator: + external: true diff --git a/requirements.txt b/requirements.txt index 420da05..70d0fe0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,18 @@ annotated-types==0.7.0 +certifi==2024.8.30 +charset-normalizer==3.4.0 dnspython==2.7.0 grpcio==1.68.1 grpcio-tools==1.68.1 +idna==3.10 motor==3.6.0 protobuf==5.29.1 pydantic==2.10.3 pydantic_core==2.27.1 -pymongo==4.9.2 +pymongo==4.10.1 python-dateutil==2.9.0.post0 +requests==2.32.3 setuptools==75.6.0 six==1.17.0 typing_extensions==4.12.2 +urllib3==2.2.3 diff --git a/server.py b/server.py index 6291e8d..6155a3c 100644 --- a/server.py +++ b/server.py @@ -2,6 +2,7 @@ import asyncio import datetime import grpc import bson +import os from queues import tasks_pb2 from queues import tasks_pb2_grpc @@ -9,6 +10,18 @@ from queues import tasks_pb2_grpc from utils import time from storage.mongo import tasks +from utils import configurator + + +DEFAULT_RETRY_AFTER = 0.2 + + +client = configurator.Client( + 'queues', + os.environ['STAGE'], + need_poll=True, +) + def get_feature(feature_db, point): """Returns Feature at given location or None.""" @@ -36,7 +49,9 @@ class TasksServicer(tasks_pb2_grpc.TasksServicer): task = await tasks.take_task(request.queue) if not task: return tasks_pb2.TakeResponse(task=None) - return tasks_pb2.TakeResponse(task=tasks_pb2.Task(id=str(task._id), attempt=task.attempts, payload=task.payload)) + retry_after_settings = client.get_config('retry_after_settings') + retry_after = retry_after_settings.get(request.queue) or retry_after_settings.get('__default__') or DEFAULT_RETRY_AFTER + return tasks_pb2.TakeResponse(task=tasks_pb2.Task(id=str(task._id), attempt=task.attempts, payload=task.payload), retry_after=retry_after) async def Finish(self, request: tasks_pb2.FinishRequest, context) -> tasks_pb2.EmptyResponse: if await tasks.finish_task(bson.ObjectId(request.id)): diff --git a/utils/configurator.py b/utils/configurator.py new file mode 100644 index 0000000..b876a11 --- /dev/null +++ b/utils/configurator.py @@ -0,0 +1,84 @@ +import json +import urllib.parse +from threading import Thread +from time import sleep + +from requests import get + + +class Client: + def __init__(self, app_name: str, stage: str, need_poll: bool = True): + self.app_name = app_name + self.stage = stage + self.endpoint = 'http://configurator/' + self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') + self.config_storage = {} + self.experiment_storage = {} + self.staff_storage = {} + self.poll_data() + if need_poll: + self.poll_data_in_thread() + + def poll_data_in_thread(self): + def inner(): + while True: + sleep(30) + self.fetch() + + Thread(target=inner, daemon=True).start() + + def poll_data(self): + self.fetch(with_exception=True) + + def request_with_retries(self, url, params, with_exception=False, retries_count=3): + exception_to_throw = None + for _ in range(retries_count): + try: + response = get( + url, + params=params + ) + if response.status_code == 200: + return response.json() + print(f'Failed to request {url}, status_code={response.status_code}') + exception_to_throw = Exception('Not 200 status') + except Exception as exc: + print(exc) + exception_to_throw = exc + sleep(1) + print(f'Failed fetching with retries: {url}, {params}') + if with_exception: + raise exception_to_throw + + def fetch(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + self.experiment_storage = local_platform['experiments'] + self.staff_storage = { + key: set(value) + for key, value in local_platform['platform_staff'].items() + } + return + response_data = self.request_with_retries(self.fetch_url, { + 'project': self.app_name, + 'stage': self.stage, + }, with_exception) + self.config_storage = response_data['configs'] + self.experiment_storage = response_data['experiments'] + self.staff_storage = { + key: set(value) + for key, value in response_data['platform_staff'].items() + } + + def is_staff(self, **kwargs): + for key, value in kwargs.items(): + if value in self.staff_storage[key]: + return True + return False + + def get_config(self, name): + return self.config_storage[name] + + def get_experiment(self, name): + return self.experiment_storage[name]