configurator/app/storage/mongo/configs.py
emmatveev f5f55bc8cf
Some checks failed
Deploy Dev / Build (pull_request) Successful in 5s
Deploy Dev / Push (pull_request) Successful in 8s
Deploy Dev / Deploy dev (pull_request) Failing after 5s
fix
2024-11-24 11:00:54 +03:00

39 lines
1018 B
Python

import bson
import pydantic
from app.storage.mongo import database
from bson import codec_options
# Module-wide handle to the "configs" collection used by every function below.
# It is opened with CodecOptions(tz_aware=True); per the bson documentation this
# should make datetimes decoded from this collection timezone-aware.
collection = database.get_collection("configs", codec_options=codec_options.CodecOptions(tz_aware=True))
class Config(pydantic.BaseModel):
    """A configuration record as stored in and read from the ``configs`` collection."""

    # Identifying fields; ``project`` and ``stage`` are the keys lookups filter on.
    name: str
    project: str
    stage: str
    # Arbitrary configuration payload.
    value: dict
    # NOTE(review): pydantic v2 treats underscore-prefixed names as private
    # attributes rather than fields, so this is presumably neither filled in by
    # model_validate() nor emitted by model_dump() — confirm that is intended.
    _id: bson.ObjectId|None = None
async def create(config: Config) -> bson.ObjectId:
    """Insert *config* into the ``configs`` collection.

    Args:
        config: The configuration to store. Its ``model_dump()`` output is
            used as the inserted document.

    Returns:
        The ``inserted_id`` reported by the driver for the new document. It is
        returned as-is (not converted to ``str``); because the dumped document
        carries no ``_id`` of its own, this is the id generated on insert.
    """
    result = await collection.insert_one(config.model_dump())
    return result.inserted_id
async def update_data(id: bson.ObjectId, value: dict) -> bool:
    """Overwrite the ``value`` field of the config whose ``_id`` equals *id*.

    Args:
        id: Identifier of the document to update.
        value: New payload to store under ``value``.

    Returns:
        ``True`` when the driver reports a non-zero ``modified_count``,
        ``False`` otherwise. The result reflects modification, not matching,
        so an update that changes nothing may also yield ``False``.
    """
    selector = {'_id': id}
    change = {'$set': {'value': value}}
    outcome = await collection.update_one(selector, change)
    modified = outcome.modified_count
    return modified != 0
async def delete(id: bson.ObjectId) -> bool:
    """Remove the config whose ``_id`` equals *id*.

    Args:
        id: Identifier of the document to delete.

    Returns:
        ``True`` when the driver reports a non-zero ``deleted_count``,
        ``False`` otherwise.
    """
    selector = {'_id': id}
    outcome = await collection.delete_one(selector)
    removed = outcome.deleted_count
    return removed != 0
async def get(project: str, stage: str) -> list[Config]:
    """Fetch every config stored for the given project and stage.

    Args:
        project: Value the document's ``project`` field must equal.
        stage: Value the document's ``stage`` field must equal.

    Returns:
        One validated ``Config`` per matching document, in the order the
        cursor yields them; an empty list when nothing matches. Each returned
        instance has its ``_id`` attribute set to the document's id.
    """
    configs = []
    async for item in collection.find({'stage': stage, 'project': project}):
        config = Config.model_validate(item)
        # ``_id`` is a pydantic private attribute, so validation leaves it at
        # its default of None. Copy the document id over explicitly so callers
        # can identify the record (e.g. to pass it to update_data or delete).
        config._id = item.get('_id')
        configs.append(config)
    return configs