| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- import traceback
- import oss2
class AliOssUtil:
    """Convenience wrapper around the ``oss2`` SDK for bucket object operations.

    Each public method builds an ``oss2.Bucket`` from the stored auth object
    and endpoint, performs one SDK call, and converts any exception into a
    falsy return value after printing the error details to stdout.
    """

    def __init__(self, access_key_id, access_key_secret, endpoint):
        """
        Store the credentials and endpoint and build the oss2 auth object.
        @param access_key_id: access key id handed to oss2.Auth
        @param access_key_secret: access key secret handed to oss2.Auth
        @param endpoint: endpoint handed to oss2.Bucket on every operation
        """
        self.access_id = access_key_id
        self.access_secret = access_key_secret
        self.endpoint = endpoint
        self.auth = oss2.Auth(access_key_id, access_key_secret)

    def _bucket(self, bucket_name):
        """Build an oss2.Bucket for bucket_name from the stored auth and endpoint."""
        return oss2.Bucket(self.auth, self.endpoint, bucket_name)

    @staticmethod
    def _report_error(exc):
        """Print the exception args, then the active traceback, to stdout.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` still sees the exception being handled.
        """
        print(exc.args)
        print('具体错误{}'.format(traceback.format_exc()))

    def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
        """
        Upload an object to an OSS bucket.
        @param bucket_name: bucket name (required)
        @param file_key: object key, i.e. target path plus file name
        @param file_obj: the file object to upload
        @param extra_args: extra arguments, forwarded as ``headers`` to put_object
        @return: True when the upload succeeds; otherwise False
        """
        try:
            self._bucket(bucket_name).put_object(file_key, file_obj, headers=extra_args)
        except Exception as exc:
            self._report_error(exc)
            return False
        return True

    def generate_file_obj_url(self, bucket_name, file_key, expires=3600):
        """
        Generate a signed GET URL for an object.
        @param bucket_name: bucket name
        @param file_key: object key
        @param expires: URL lifetime in seconds; defaults to 3600 (one hour)
        @return: the signed URL, or an empty string when signing fails
        """
        try:
            return self._bucket(bucket_name).sign_url('GET', file_key, expires)
        except Exception as exc:
            self._report_error(exc)
            return ""

    def delete_obj(self, bucket_name, file_key):
        """
        Delete an object from a bucket.
        @param bucket_name: bucket name
        @param file_key: object key
        @return: True when the delete call succeeds; otherwise False
        """
        try:
            self._bucket(bucket_name).delete_object(file_key)
        except Exception as exc:
            self._report_error(exc)
            return False
        return True

    def bucket_exists(self, bucket_name):
        """
        Check whether a bucket exists and is accessible with these credentials.
        @param bucket_name: bucket name
        @return: True when get_bucket_info() returns a result; False when it
                 returns None or raises
        """
        try:
            return self._bucket(bucket_name).get_bucket_info() is not None
        except Exception as exc:
            self._report_error(exc)
            return False
|