import traceback import oss2 class AliOssUtil: def __init__(self, access_key_id, access_key_secret, endpoint): self.access_id = access_key_id self.access_secret = access_key_secret self.endpoint = endpoint self.auth = oss2.Auth(access_key_id, access_key_secret) def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None): """ 对象上传至OSS存储桶 @param bucket_name: 存储桶名称-必须 @param file_key: 需要上传文件路径+文件名称 @param file_obj: 文件对象 @param extra_args: 额外参数 @return: 当上传成功时为True;否则,False """ try: bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name) bucket.put_object(file_key, file_obj, headers=extra_args) return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def generate_file_obj_url(self, bucket_name, file_key): """ 生成对象URL @param bucket_name: 存储桶名称 @param file_key: 文件名称 @return: url """ try: bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name) # 设置URL过期时间为3600秒(1小时) response_url = bucket.sign_url('GET', file_key, 3600) return response_url except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return "" def delete_obj(self, bucket_name, file_key): """ 删除对象 @param bucket_name: 存储桶 @param file_key: 文件名称 @return: 当删除成功时为True;否则,False """ try: bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name) bucket.delete_object(file_key) return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def bucket_exists(self, bucket_name): """ 判断桶是否存在,是否有访问权限 @param bucket_name: 存储桶 @return: 当桶存在且有权限时为True;否则,False """ try: bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name) return bucket.get_bucket_info() is not None except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False