configurator/app/storage/mongo/configs.py
emmatveev 8b871775e6
All checks were successful
Deploy Dev / Build (pull_request) Successful in 6s
Deploy Dev / Push (pull_request) Successful in 7s
Deploy Dev / Deploy dev (pull_request) Successful in 8s
fix
2024-11-24 13:04:18 +03:00

40 lines
1.0 KiB
Python

import bson
import pydantic
from app.storage.mongo import database
from bson import codec_options
collection = database.get_collection("configs", codec_options=codec_options.CodecOptions(tz_aware=True))
class Config(pydantic.BaseModel):
name: str
project: str
stage: str
value: dict
_id: bson.ObjectId|None = None
async def create(config: Config) -> str:
result = await collection.insert_one(config.model_dump())
return result.inserted_id
async def update_data(id: bson.ObjectId, value: dict) -> bool:
result = await collection.update_one({'_id': id}, {'$set': {'value': value}})
return result.modified_count != 0
async def delete(id: bson.ObjectId) -> bool:
result = await collection.delete_one({'_id': id})
return result.deleted_count != 0
async def get(project: str, stage: str) -> list[Config]:
result = []
async for item in collection.find({'stage': stage, 'project': project}):
result.append(Config.model_validate(item))
result[-1]._id = item['_id']
return result