import io from minio import Minio from minio.error import MinioException from Platform import settings class Client: def __init__(self, host: str, access_key: str, secret_key: str, bucket_name: str): self.bucket_name = bucket_name self.cli = Minio( host, access_key=access_key, secret_key=secret_key, secure=False ) try: self.cli.make_bucket(bucket_name) except MinioException: pass def put_object(self, data: bytes, name: str): self.cli.put_object(self.bucket_name, name, io.BytesIO(data), len(data)) def get_object(self, name: str) -> bytes: try: return self.cli.get_object(self.bucket_name, name).data except MinioException: return b"" def delete_object(self, name: str): try: self.cli.remove_object(self.bucket_name, name) except MinioException: pass minio_client = Client( settings.MINIO_HOST, settings.MINIO_ACCESS_KEY, settings.MINIO_SECRET_KEY, settings.MINIO_BUCKET_NAME )