From 9dfc115224c6aab18de32575c4513d0b047fb588 Mon Sep 17 00:00:00 2001 From: Egor Matveev Date: Sat, 28 Dec 2024 13:19:46 +0300 Subject: [PATCH] fix --- .DS_Store | Bin 0 -> 6148 bytes .deploy/deploy-dev.yaml | 4 ++ .deploy/deploy-prod.yaml | 4 ++ app/routers/take.py | 9 +++- app/utils/configurator.py | 88 ++++++++++++++++++++++++++++++++++++++ requirements.txt | 4 ++ 6 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 .DS_Store create mode 100644 app/utils/configurator.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..9895a61099506ffb6ebc6738c86bd90d5c280b5f GIT binary patch literal 6148 zcmeHKO^?$s5FNLHO}9eI0i?YkMdDg5<*Q=FC6sdDN)Q|Xg`^E^v}qigv;|cw>{ja7Jez#W}|P>^s&nE!-*8 z7-NcQOvjW`w3BTctO8bne@y}2yIt(oh)Vj1z58qbMo)4%PI8$bgO7)Y@WUU<5D~SJ zTT;1|_2vAmWLcGFtglI#XI0(re-o{?bLsM)>$+X{b$Be7VI9}=Y8cO6a_yOvd2$xV z$@45-M&8v&vZ&*%NN0+WO;V)1dXW_qxg5%QF{u&O@{?K>37llgt{6On&j`$#7vZePW$_LzoV`yYlF;r1 z=iZFoQbswYo4SbW$Xi!$kEYNsqk;}u`K@f8Ydzb;xGBA-IT##G;RV%HfnC!Fs<-&x zxA+b%6l1ZO*Wwkd&x5M93!{Mf(%tTbS+QNHRlq9nrwZ`-;KLaMgN;UYbf8dA0HA|r zWr+D_fjPdxz+j^hJusoEKus0qi6Jx{?Y_wi3^p1yorHOO2s5)VFBG9>$N0WVClP40 zrB%Qxu&F>>S3SP}AOHUSze%z+tAJJDzfwRrqi{6DlFZ$@vpBwMefS$V8|O6|6$OR4 hj#a={@c~>J`g{(6fx$*2T444^K*?YWtH57X;3t`(m;e9( literal 0 HcmV?d00001 diff --git a/.deploy/deploy-dev.yaml b/.deploy/deploy-dev.yaml index 80e0af2..ef6df87 100644 --- a/.deploy/deploy-dev.yaml +++ b/.deploy/deploy-dev.yaml @@ -6,9 +6,11 @@ services: image: mathwave/sprint-repo:queues networks: - queues-development + - configurator environment: MONGO_HOST: "mongo.develop.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_DEV + STAGE: "development" deploy: mode: replicated restart_policy: @@ -20,3 +22,5 @@ services: networks: queues-development: external: true + configurator: + external: true diff --git a/.deploy/deploy-prod.yaml b/.deploy/deploy-prod.yaml index 1e94afa..299038b 100644 --- a/.deploy/deploy-prod.yaml +++ b/.deploy/deploy-prod.yaml @@ -6,9 +6,11 @@ services: image: mathwave/sprint-repo:queues networks: - queues + - configurator environment: MONGO_HOST: "mongo.sprinthub.ru" MONGO_PASSWORD: $MONGO_PASSWORD_PROD + STAGE: "production" deploy: mode: replicated restart_policy: @@ -20,3 +22,5 @@ services: networks: queues: external: true + configurator: + external: true diff --git a/app/routers/take.py b/app/routers/take.py index d8fe38d..d4e3d4f 100644 --- a/app/routers/take.py +++ b/app/routers/take.py @@ -5,8 +5,10 @@ import pydantic import typing from app.storage.mongo import tasks +from app.utils.configurator import configurator +DEFAULT_RETRY_AFTER = 0.2 locks = collections.defaultdict(asyncio.Lock) router = fastapi.APIRouter() @@ -19,12 +21,15 @@ class Task(pydantic.BaseModel): class Response(pydantic.BaseModel): task: Task|None + retry_after: float -@router.get('/api/v1/take', responses={404: {'description': 'Not found'}}) +@router.get('/api/v1/take') async def execute(queue: typing.Annotated[str, fastapi.Header()]) -> Response: async with locks[queue]: task = await tasks.take_task(queue) if not task: return Response(task=None) - return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload)) + retry_after_config = configurator.get_config('retry_after') + retry_after = retry_after_config.get(queue) or retry_after_config.get('default') or DEFAULT_RETRY_AFTER + return Response(task=Task(id=str(task._id), attempt=task.attempts, payload=task.payload), retry_after=retry_after) diff --git a/app/utils/configurator.py b/app/utils/configurator.py new file mode 100644 index 0000000..062ff71 --- /dev/null +++ b/app/utils/configurator.py @@ -0,0 +1,88 @@ +import json +import urllib.parse +import os +from threading import Thread +from time import sleep + +from requests import get + + +class ConfiguratorClient: + def __init__(self, app_name: str, stage: str, need_poll: bool = True): + self.app_name = app_name + self.stage = stage + self.endpoint = 'http://configurator/' + self.fetch_url = urllib.parse.urljoin(self.endpoint, '/api/v1/fetch') + self.config_storage = {} + self.experiment_storage = {} + self.staff_storage = {} + self.poll_data() + if need_poll: + self.poll_data_in_thread() + + def poll_data_in_thread(self): + def inner(): + while True: + sleep(30) + self.fetch() + + Thread(target=inner, daemon=True).start() + + def poll_data(self): + self.fetch(with_exception=True) + + def request_with_retries(self, url, params, with_exception=False, retries_count=3): + exception_to_throw = None + for _ in range(retries_count): + try: + response = get( + url, + params=params + ) + if response.status_code == 200: + return response.json() + print(f'Failed to request {url}, status_code={response.status_code}') + exception_to_throw = Exception('Not 200 status') + except Exception as exc: + print(exc) + exception_to_throw = exc + sleep(1) + print(f'Failed fetching with retries: {url}, {params}') + if with_exception: + raise exception_to_throw + + def fetch(self, with_exception=False): + if self.stage == 'local': + local_platform = json.loads(open('local_platform.json', 'r').read()) + self.config_storage = local_platform['configs'] + self.experiment_storage = local_platform['experiments'] + self.staff_storage = { + key: set(value) + for key, value in local_platform['platform_staff'].items() + } + return + response_data = self.request_with_retries(self.fetch_url, { + 'project': self.app_name, + 'stage': self.stage, + }, with_exception) + self.config_storage = response_data['configs'] + self.experiment_storage = response_data['experiments'] + self.staff_storage = { + key: set(value) + for key, value in response_data['platform_staff'].items() + } + + def is_staff(self, **kwargs): + for key, value in kwargs.items(): + if value in self.staff_storage[key]: + return True + return False + + def get_config(self, name): + return self.config_storage[name] + + def get_experiment(self, name): + return self.experiment_storage[name] + + +configurator = ConfiguratorClient('queues', os.getenv('STAGE', 'local')) diff --git a/requirements.txt b/requirements.txt index 325371f..4281151 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ annotated-types==0.7.0 anyio==4.6.2.post1 APScheduler==3.10.4 +certifi==2024.12.14 +charset-normalizer==3.4.1 click==8.1.7 dnspython==2.7.0 fastapi==0.115.4 @@ -12,9 +14,11 @@ pydantic_core==2.23.4 pymongo==4.9.2 pytz==2024.2 redis==5.2.0 +requests==2.32.3 six==1.16.0 sniffio==1.3.1 starlette==0.41.2 typing_extensions==4.12.2 tzlocal==5.2 +urllib3==2.3.0 uvicorn==0.32.0