import bson import pydantic from app.storage.mongo import database from bson import codec_options collection = database.get_collection("configs", codec_options=codec_options.CodecOptions(tz_aware=True)) class Config(pydantic.BaseModel): name: str project: str stage: str value: dict _id: bson.ObjectId|None = None async def create(config: Config) -> str: result = await collection.insert_one(config.model_dump()) return result.inserted_id async def update_data(id: bson.ObjectId, value: dict) -> bool: result = await collection.update_one({'_id': id}, {'$set': {'value': value}}) return result.modified_count != 0 async def delete(id: bson.ObjectId) -> bool: result = await collection.delete_one({'_id': id}) return result.deleted_count != 0 async def get(project: str, stage: str) -> list[Config]: result = [] async for item in collection.find({'stage': stage, 'project': project}): result.append(Config.model_validate(item)) result[-1]._id = item['_id'] return result