AliOssUtil.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import traceback
  2. import oss2
  3. class AliOssUtil:
  4. def __init__(self, access_key_id, access_key_secret, endpoint):
  5. self.access_id = access_key_id
  6. self.access_secret = access_key_secret
  7. self.endpoint = endpoint
  8. self.auth = oss2.Auth(access_key_id, access_key_secret)
  9. def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
  10. """
  11. 对象上传至OSS存储桶
  12. @param bucket_name: 存储桶名称-必须
  13. @param file_key: 需要上传文件路径+文件名称
  14. @param file_obj: 文件对象
  15. @param extra_args: 额外参数
  16. @return: 当上传成功时为True;否则,False
  17. """
  18. try:
  19. bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
  20. bucket.put_object(file_key, file_obj, headers=extra_args)
  21. return True
  22. except Exception as e:
  23. print(e.args)
  24. ex = traceback.format_exc()
  25. print('具体错误{}'.format(ex))
  26. return False
  27. def generate_file_obj_url(self, bucket_name, file_key):
  28. """
  29. 生成对象URL
  30. @param bucket_name: 存储桶名称
  31. @param file_key: 文件名称
  32. @return: url
  33. """
  34. try:
  35. bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
  36. # 设置URL过期时间为3600秒(1小时)
  37. response_url = bucket.sign_url('GET', file_key, 3600)
  38. return response_url
  39. except Exception as e:
  40. print(e.args)
  41. ex = traceback.format_exc()
  42. print('具体错误{}'.format(ex))
  43. return ""
  44. def delete_obj(self, bucket_name, file_key):
  45. """
  46. 删除对象
  47. @param bucket_name: 存储桶
  48. @param file_key: 文件名称
  49. @return: 当删除成功时为True;否则,False
  50. """
  51. try:
  52. bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
  53. bucket.delete_object(file_key)
  54. return True
  55. except Exception as e:
  56. print(e.args)
  57. ex = traceback.format_exc()
  58. print('具体错误{}'.format(ex))
  59. return False
  60. def bucket_exists(self, bucket_name):
  61. """
  62. 判断桶是否存在,是否有访问权限
  63. @param bucket_name: 存储桶
  64. @return: 当桶存在且有权限时为True;否则,False
  65. """
  66. try:
  67. bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
  68. return bucket.get_bucket_info() is not None
  69. except Exception as e:
  70. print(e.args)
  71. ex = traceback.format_exc()
  72. print('具体错误{}'.format(ex))
  73. return False