# AliOssUtil.py
import traceback
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Dict

import oss2

from Ansjer.config import LOGGER


class AliOssUtil:
    def __init__(self, access_key_id, access_key_secret, endpoint):
        self.access_id = access_key_id
        self.access_secret = access_key_secret
        self.endpoint = endpoint
        self.auth = oss2.Auth(access_key_id, access_key_secret)

    def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
        """
        Upload an object to an OSS bucket.
        @param bucket_name: bucket name (required)
        @param file_key: destination path + file name of the object
        @param file_obj: file object (or bytes/str) to upload
        @param extra_args: extra request headers
        @return: True if the upload succeeds; otherwise False
        """
        try:
            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
            bucket.put_object(file_key, file_obj, headers=extra_args)
            return True
        except Exception as e:
            print(e.args)
            ex = traceback.format_exc()
            print('Error details: {}'.format(ex))
            return False

    def generate_file_obj_url(self, bucket_name, file_key):
        """
        Generate a signed URL for an object.
        @param bucket_name: bucket name
        @param file_key: object key
        @return: signed URL, or an empty string on failure
        """
        try:
            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
            # The URL expires after 3600 seconds (1 hour)
            response_url = bucket.sign_url('GET', file_key, 3600)
            return response_url
        except Exception as e:
            print(e.args)
            ex = traceback.format_exc()
            print('Error details: {}'.format(ex))
            return ""

    def delete_obj(self, bucket_name, file_key):
        """
        Delete an object.
        @param bucket_name: bucket name
        @param file_key: object key
        @return: True if the deletion succeeds; otherwise False
        """
        try:
            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
            bucket.delete_object(file_key)
            return True
        except Exception as e:
            print(e.args)
            ex = traceback.format_exc()
            print('Error details: {}'.format(ex))
            return False

    def bucket_exists(self, bucket_name):
        """
        Check whether a bucket exists and is accessible with the current credentials.
        @param bucket_name: bucket name
        @return: True if the bucket exists and is accessible; otherwise False
        """
        try:
            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
            return bucket.get_bucket_info() is not None
        except Exception as e:
            print(e.args)
            ex = traceback.format_exc()
            print('Error details: {}'.format(ex))
            return False

    def list_objects(self, bucket_name: str, prefix: str = '', delimiter: str = '',
                     continuation_token: str = '', start_after: str = '',
                     fetch_owner: bool = False, encoding_type: str = 'url',
                     max_keys: int = 1000, headers: Optional[dict] = None) -> Optional[dict]:
        """
        List the objects in a bucket that match a prefix.

        Args:
            bucket_name (str): Bucket name.
            prefix (str): Only list objects whose keys start with this prefix.
            delimiter (str): Delimiter used to simulate directories.
            continuation_token (str): Pagination token. Pass an empty string on the first
                call and the returned next_continuation_token on subsequent calls.
            start_after (str): Start key; OSS returns objects that sort after it in
                lexicographic order.
            fetch_owner (bool): Whether to include owner information; not returned by default.
            encoding_type (str): Encoding type.
            max_keys (int): Maximum number of entries per page; the sum of objects and
                common prefixes cannot exceed this value.
            headers (Optional[dict]): HTTP headers.

        Returns:
            Optional[dict]: A dict containing the object list and pagination info.
                - files (list): Object entries returned by oss2 (each exposes key, size,
                  last_modified, etc.).
                - prefixes (List[str]): Common-prefix (simulated directory) list.
                - next_continuation_token (str): Pagination token for the next call.
                - is_truncated (bool): Whether more results remain.

        Raises:
            RuntimeError: If the listing request fails.
        """
        try:
            bucket = oss2.Bucket(self.auth, self.endpoint, bucket_name)
            result = bucket.list_objects_v2(
                prefix=prefix,
                delimiter=delimiter,
                continuation_token=continuation_token,
                start_after=start_after,
                fetch_owner=fetch_owner,
                encoding_type=encoding_type,
                max_keys=max_keys,
                headers=headers
            )
            return {
                'files': result.object_list,
                'prefixes': result.prefix_list,
                'next_continuation_token': result.next_continuation_token,
                'is_truncated': result.is_truncated
            }
        except Exception as e:
            LOGGER.error('Failed to list OSS objects, bucket: {}, errLine: {}, errMsg: {}'
                         .format(bucket_name, e.__traceback__.tb_lineno, repr(e)))
            raise RuntimeError(f"Failed to list objects in bucket {bucket_name}: {str(e)}")

    def list_all_objects(self, bucket_name: str, prefix: str = '', start_after: str = '',
                         max_keys: int = 1000, end_time: Optional[int] = None,
                         include_files: bool = True) -> Dict:
        """
        Fetch all pages of objects (wrapping the pagination logic behind a simple
        interface), with optional filtering by a 10-digit timestamp cut-off.

        Args:
            bucket_name (str): Bucket name.
            prefix (str): Object key prefix.
            start_after (str): Start key.
            max_keys (int): Maximum number of entries per page.
            end_time (int): Cut-off time (10-digit Unix timestamp); only objects with
                last_modified < end_time are counted.
            include_files (bool): Whether to include the object list in the result
                (recommended: False for very large listings).

        Returns:
            Dict:
                - total_size_bytes (int): Total size of the objects in bytes.
                - total_files (int): Total number of objects.
                - total_size_mb (float): Total size in MB, rounded to 2 decimal places.
                - files (list): All matching objects (only when include_files=True).
        """
        try:
            total_size = 0
            total_files = 0
            continuation_token = ''
            all_files = [] if include_files else None
            # Pre-computed constant
            MB_BYTES = 1024 * 1024
            while True:
                result = self.list_objects(
                    bucket_name=bucket_name,
                    prefix=prefix,
                    start_after=start_after,
                    continuation_token=continuation_token,
                    max_keys=max_keys
                )
                current_files = result.get('files', [])
                # Time-based filtering: stop at the first object whose last_modified
                # reaches the cut-off (assumes the listing order matches modification time)
                if end_time is not None:
                    should_break = False
                    filtered_files = []
                    for obj in current_files:
                        if obj.last_modified < end_time:
                            filtered_files.append(obj)
                        else:
                            # Object at or past the cut-off: mark for early termination
                            should_break = True
                            break
                    current_files = filtered_files
                    # If this page contained an object past the cut-off, stop early
                    if should_break:
                        # Accumulate the filtered objects from this page
                        total_size += sum(obj.size for obj in current_files)
                        total_files += len(current_files)
                        if include_files:
                            all_files.extend(current_files)
                        break
                # Accumulate this page's totals (generator expression keeps memory usage low)
                batch_size = sum(obj.size for obj in current_files)
                batch_files_count = len(current_files)
                total_size += batch_size
                total_files += batch_files_count
                if include_files:
                    all_files.extend(current_files)
                # Loop termination
                is_truncated = result.get('is_truncated', False)
                next_token = result.get('next_continuation_token', '')
                if not is_truncated or (end_time is not None and not next_token):
                    break
                continuation_token = next_token
            # Convert bytes to MB, rounded half-up to 2 decimal places
            total_size_mb = Decimal(total_size) / Decimal(MB_BYTES)
            total_size_mb = total_size_mb.quantize(Decimal('0.00'), rounding=ROUND_HALF_UP)
            # Build the result
            result_data = {
                'total_size_bytes': total_size,
                'total_files': total_files,
                'total_size_mb': float(total_size_mb)
            }
            if include_files:
                result_data['files'] = all_files
            return result_data
        except Exception as e:
            error_msg = str(e)
            LOGGER.error(f"{prefix} failed to list OSS objects, errMsg: {error_msg}")
            return {}
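

# Minimal usage sketch (illustrative, not part of the original module): it shows the
# typical call sequence under assumed placeholder credentials, endpoint, and bucket name.
# Replace the placeholder values before running against a real OSS account.
if __name__ == '__main__':
    # Placeholder configuration -- these values are assumptions for demonstration only.
    oss_util = AliOssUtil(
        access_key_id='<your-access-key-id>',
        access_key_secret='<your-access-key-secret>',
        endpoint='https://oss-cn-hangzhou.aliyuncs.com'
    )
    demo_bucket = '<your-bucket-name>'

    if oss_util.bucket_exists(demo_bucket):
        # Upload an in-memory object, then fetch a temporary signed URL for it.
        if oss_util.upload_file_obj(demo_bucket, 'demo/hello.txt', b'hello oss'):
            print(oss_util.generate_file_obj_url(demo_bucket, 'demo/hello.txt'))

        # Summarize everything under the 'demo/' prefix without keeping the full object list.
        summary = oss_util.list_all_objects(demo_bucket, prefix='demo/', include_files=False)
        print(summary)

        # Clean up the demo object.
        oss_util.delete_obj(demo_bucket, 'demo/hello.txt')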