# -*- encoding: utf-8 -*-
"""
@File    : OCIObjectStorage.py
@Time    : 2024/4/10 15:06
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import logging

import oci

from Ansjer.config import OCI_CONFIG, OCI_NAMESPACE_NAME

logger = logging.getLogger(__name__)


class OCIObjectStorage:
    """Best-effort wrapper around the OCI SDK ``ObjectStorageClient``.

    Every public method catches SDK errors, logs them and returns ``None``
    instead of raising, so callers must check the return value.

    For the SDK configuration format see
    https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
    """

    def __init__(self, region):
        """
        Build the service client for one region.

        @param region: key into ``OCI_CONFIG`` selecting the SDK config to use
        """
        self.object_storage_client = oci.object_storage.ObjectStorageClient(OCI_CONFIG[region])

    def _create_preauthenticated_request(self, bucket_name, **detail_fields):
        """
        Create a pre-authenticated request and return the response payload.

        Shared by the public pre-authenticated-request methods so that the
        call, the status check and the error handling live in one place.

        @param bucket_name: bucket name
        @param detail_fields: keyword arguments forwarded unchanged to
            ``oci.object_storage.models.CreatePreauthenticatedRequestDetails``
        @return: ``response.data`` on HTTP 200, otherwise None
        """
        try:
            # The details model is built inside the ``try`` on purpose: a
            # failure while constructing it is reported as ``None`` too,
            # exactly like a failed service call.
            details = oci.object_storage.models.CreatePreauthenticatedRequestDetails(**detail_fields)
            response = self.object_storage_client.create_preauthenticated_request(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                create_preauthenticated_request_details=details,
            )
        except Exception:
            # Deliberately broad: this wrapper is best-effort and must not raise.
            logger.exception(
                "OCI create_preauthenticated_request failed: bucket=%s name=%s",
                bucket_name, detail_fields.get("name"))
            return None

        # Explicit check instead of ``assert``: asserts are removed under
        # ``python -O``, which would silently disable the validation.
        if response.status != 200:
            logger.error(
                "OCI create_preauthenticated_request returned HTTP %s: bucket=%s name=%s",
                response.status, bucket_name, detail_fields.get("name"))
            return None
        return response.data

    def create_ereauthenticated_request(self, bucket_name, name, object_name, time_expires):
        """
        Create a pre-authenticated request with ``AnyObjectWrite`` access.

        The request is created with ``bucket_listing_action="Deny"``.
        (The misspelled method name is kept for backward compatibility.)

        api: https://docs.oracle.com/en-us/iaas/api/#/en/objectstorage/20160918/PreauthenticatedRequest/CreatePreauthenticatedRequest
        @param bucket_name: bucket name
        @param name: request name; a management label for the created
            pre-authenticated link that does not affect functionality. For
            example, with one link per bucket the name tells which bucket a
            link belongs to when listing or deleting links.
        @param object_name: object name
        @param time_expires: expiry time; must be a datetime,
            e.g. datetime.utcnow() + timedelta(minutes=30)
        @return: the ``data`` attribute of the SDK response (presumably the
            SDK's pre-authenticated request model that carries the access
            URI - confirm against the SDK docs), or None on failure
        """
        return self._create_preauthenticated_request(
            bucket_name,
            name=name,
            access_type="AnyObjectWrite",
            time_expires=time_expires,
            bucket_listing_action="Deny",
            object_name=object_name,
        )

    def get_preauthenticated_request_url(self, bucket_name, name, object_name, time_expires, access_type='ObjectRead'):
        """
        Create a pre-authenticated request for a specific object.

        @param bucket_name: bucket name
        @param name: request name; a management label for the created
            pre-authenticated link that does not affect functionality. For
            example, with one link per bucket the name tells which bucket a
            link belongs to when listing or deleting links.
        @param object_name: object name
        @param time_expires: expiry time; must be a datetime,
            e.g. datetime.utcnow() + timedelta(minutes=30)
        @param access_type: access type, see
            https://docs.oracle.com/en-us/iaas/tools/python/2.127.0/api/object_storage/models/oci.object_storage.models.CreatePreauthenticatedRequestDetails.html#oci.object_storage.models.CreatePreauthenticatedRequestDetails.access_type
        @return: the ``data`` attribute of the SDK response (presumably the
            SDK's pre-authenticated request model that carries the access
            URI - confirm against the SDK docs), or None on failure
        """
        return self._create_preauthenticated_request(
            bucket_name,
            name=name,
            object_name=object_name,
            access_type=access_type,
            time_expires=time_expires,
        )

    def put_object(self, bucket_name, object_name, obj, content_type=None):
        """
        Upload an object.

        @param bucket_name: bucket name
        @param object_name: object name
        @param obj: data content, passed to the SDK as ``put_object_body``
        @param content_type: content type of the object
        @return: True when the service answered HTTP 200, otherwise None
        """
        try:
            response = self.object_storage_client.put_object(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                object_name=object_name,
                put_object_body=obj,
                content_type=content_type,
            )
        except Exception:
            # Deliberately broad: this wrapper is best-effort and must not raise.
            logger.exception("OCI put_object failed: bucket=%s object=%s", bucket_name, object_name)
            return None

        # Explicit check instead of ``assert`` (asserts vanish under ``python -O``).
        if response.status != 200:
            logger.error(
                "OCI put_object returned HTTP %s: bucket=%s object=%s",
                response.status, bucket_name, object_name)
            return None
        return True

    def delete_object(self, bucket_name, object_name, content_type=None):
        """
        Delete an object.

        @param bucket_name: bucket name
        @param object_name: object name
        @param content_type: unused; accepted only for backward compatibility
        @return: True when the SDK call completed without raising, otherwise None
        """
        try:
            self.object_storage_client.delete_object(
                namespace_name=OCI_NAMESPACE_NAME,
                bucket_name=bucket_name,
                object_name=object_name,
            )
        except Exception:
            # Deliberately broad: this wrapper is best-effort and must not raise.
            logger.exception("OCI delete_object failed: bucket=%s object=%s", bucket_name, object_name)
            return None
        return True