configurator/app/routers/experiments.py
emmatveev fdb4cc98e8
All checks were successful
Deploy Dev / Build (pull_request) Successful in 6s
Deploy Dev / Push (pull_request) Successful in 8s
Deploy Dev / Deploy dev (pull_request) Successful in 7s
fix
2024-11-24 23:10:05 +03:00

64 lines
1.7 KiB
Python

import bson
import fastapi
import pydantic
from app.storage.mongo import experiments
class RequestPostBody(pydantic.BaseModel):
name: str
stage: str
project: str
class RequestPutBody(pydantic.BaseModel):
id: str
enabled: bool
condition: str
class RequestDeleteBody(pydantic.BaseModel):
id: str
class Experiment(pydantic.BaseModel):
id: str
name: str
enabled: bool
condition: str
router = fastapi.APIRouter()
@router.post('/api/v1/experiments', status_code=fastapi.status.HTTP_202_ACCEPTED)
async def post(body: RequestPostBody):
await experiments.create(experiments.Experiment(name=body.name, project=body.project, stage=body.stage, enabled=False, condition='False'))
@router.put('/api/v1/experiments', status_code=fastapi.status.HTTP_202_ACCEPTED, responses={404: {'description': 'Not found'}})
async def put(body: RequestPutBody):
changed = await experiments.update(id=bson.ObjectId(body.id), enabled=body.enabled, condition=body.condition)
if not changed:
raise fastapi.HTTPException(404)
@router.delete('/api/v1/experiments', status_code=fastapi.status.HTTP_202_ACCEPTED, responses={404: {'description': 'Not found'}})
async def delete(body: RequestDeleteBody):
changed = await experiments.delete(id=bson.ObjectId(body.id))
if not changed:
raise fastapi.HTTPException(404)
@router.get('/api/v1/experiments')
async def get(stage: str, project: str) -> list[Experiment]:
return [
Experiment(
id=str(experiment._id),
name=experiment.name,
enabled=experiment.enabled,
condition=experiment.condition,
)
for experiment in await experiments.get(project=project, stage=stage)
]