configurator/app/storage/mongo/experiments.py
emmatveev 8b871775e6
All checks were successful
Deploy Dev / Build (pull_request) Successful in 6s
Deploy Dev / Push (pull_request) Successful in 7s
Deploy Dev / Deploy dev (pull_request) Successful in 8s
fix
2024-11-24 13:04:18 +03:00

41 lines
1.1 KiB
Python

import bson
import pydantic
from app.storage.mongo import database
from bson import codec_options
collection = database.get_collection("experiments", codec_options=codec_options.CodecOptions(tz_aware=True))
class Experiment(pydantic.BaseModel):
name: str
enabled: bool
condition: str
project: str
stage: str
_id: bson.ObjectId|None = None
async def create(experiment: Experiment) -> str:
result = await collection.insert_one(experiment.model_dump())
return result.inserted_id
async def update(id: bson.ObjectId, enabled: bool, condition: str) -> bool:
result = await collection.update_one({'_id': id}, {'$set': {'enabled': enabled, 'condition': condition}})
return result.modified_count != 0
async def delete(id: bson.ObjectId) -> bool:
result = await collection.delete_one({'_id': id})
return result.deleted_count != 0
async def get(project: str, stage: str) -> list[Experiment]:
result = []
async for item in collection.find({'stage': stage, 'project': project}):
result.append(Experiment.model_validate(item))
result[-1]._id = item['_id']
return result