configurator/app/storage/mongo/experiments.py
emmatveev f5f55bc8cf
Some checks failed
Deploy Dev / Build (pull_request) Successful in 5s
Deploy Dev / Push (pull_request) Successful in 8s
Deploy Dev / Deploy dev (pull_request) Failing after 5s
fix
2024-11-24 11:00:54 +03:00

40 lines
1.1 KiB
Python

import bson
import pydantic
from app.storage.mongo import database
from bson import codec_options
# Handle to the "experiments" collection used by every function in this module.
# tz_aware=True asks the BSON codec to decode stored datetimes as
# timezone-aware values rather than naive ones.
collection = database.get_collection(
    "experiments",
    codec_options=codec_options.CodecOptions(tz_aware=True),
)
class Experiment(pydantic.BaseModel):
    """Shape of a document stored in the ``experiments`` collection."""

    name: str
    # `enabled` and `condition` are the two values `update` overwrites.
    enabled: bool
    # NOTE(review): format/semantics of the condition string are not visible
    # in this module — confirm against the code that evaluates it.
    condition: str
    # `project` and `stage` are the keys `get` filters on.
    project: str
    stage: str
    # NOTE(review): pydantic v2 treats underscore-prefixed names as private
    # attributes, so `_id` is not a model field: it is not read from the input
    # by model_validate() and does not appear in model_dump().
    _id: bson.ObjectId | None = None
async def create(experiment: Experiment) -> str:
    """Insert ``experiment`` as a new document and return its id.

    Args:
        experiment: Model whose ``model_dump()`` output is stored as the
            document body.

    Returns:
        The id of the inserted document, rendered as a string.
    """
    result = await collection.insert_one(experiment.model_dump())
    # The dumped model carries no `_id`, so the driver generates one and
    # reports it as a bson.ObjectId; convert it so the value matches the
    # declared `str` return type.
    return str(result.inserted_id)
async def update(id: bson.ObjectId, enabled: bool, condition: str) -> bool:
    """Overwrite ``enabled`` and ``condition`` on the experiment with this id.

    Args:
        id: ``_id`` of the document to change.
        enabled: New value for the ``enabled`` key.
        condition: New value for the ``condition`` key.

    Returns:
        True when the driver reports at least one modified document.
    """
    selector = {'_id': id}
    changes = {'$set': {'enabled': enabled, 'condition': condition}}
    outcome = await collection.update_one(selector, changes)
    # NOTE(review): modified_count can stay 0 when the stored values already
    # equal the new ones — confirm callers do not read False as "not found".
    modified = outcome.modified_count
    return modified != 0
async def delete(id: bson.ObjectId) -> bool:
    """Remove the experiment document whose ``_id`` equals ``id``.

    Args:
        id: ``_id`` of the document to remove.

    Returns:
        True when the driver reports that a document was deleted.
    """
    selector = {'_id': id}
    outcome = await collection.delete_one(selector)
    removed = outcome.deleted_count
    return removed != 0
async def get(project: str, stage: str) -> list[Experiment]:
    """Fetch every experiment stored for a project/stage pair.

    Args:
        project: Value the document's ``project`` key must equal.
        stage: Value the document's ``stage`` key must equal.

    Returns:
        One ``Experiment`` per matching document, in cursor order, each with
        its ``_id`` attribute set to the document's id. The list is empty
        when nothing matches.
    """
    experiments = []
    async for document in collection.find({'stage': stage, 'project': project}):
        experiment = Experiment.model_validate(document)
        # `_id` is underscore-prefixed, so pydantic keeps it as a private
        # attribute and model_validate() does not read it from the document.
        # Copy it over explicitly so the result can be handed to
        # update()/delete().
        experiment._id = document.get('_id')
        experiments.append(experiment)
    return experiments