| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- import traceback
- from decimal import Decimal, ROUND_HALF_UP
- from typing import Optional, Dict
- import oss2
- from Ansjer.config import LOGGER
class AliOssUtil:
    """Convenience wrapper around the ``oss2`` SDK for one set of credentials.

    Each public method builds an ``oss2.Bucket`` for the requested bucket
    name from the stored auth object and endpoint, so a single instance can
    be used against several buckets.
    """

    # Number of bytes per megabyte used when deriving ``total_size_mb``.
    _BYTES_PER_MB = 1024 * 1024

    def __init__(self, access_key_id, access_key_secret, endpoint):
        """Store the credentials and endpoint and build the ``oss2.Auth`` object.

        Args:
            access_key_id: Access key id passed to ``oss2.Auth``.
            access_key_secret: Access key secret passed to ``oss2.Auth``.
            endpoint: Endpoint passed to every ``oss2.Bucket`` created later.
        """
        self.access_id = access_key_id
        self.access_secret = access_key_secret
        self.endpoint = endpoint
        self.auth = oss2.Auth(access_key_id, access_key_secret)

    def _bucket(self, bucket_name):
        """Return an ``oss2.Bucket`` handle for *bucket_name*."""
        return oss2.Bucket(self.auth, self.endpoint, bucket_name)

    @staticmethod
    def _log_failure(exc):
        """Log *exc* together with the current traceback via ``LOGGER``.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` sees the active exception.
        """
        LOGGER.error('{}, 具体错误{}'.format(repr(exc), traceback.format_exc()))

    def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
        """Upload an object to a bucket.

        Args:
            bucket_name: Target bucket name (required).
            file_key: Object key, i.e. path plus file name inside the bucket.
            file_obj: Data handed to ``Bucket.put_object`` as the object body.
            extra_args: Optional value forwarded as the ``headers`` argument.

        Returns:
            True when the upload call succeeded, False when it raised.
        """
        try:
            self._bucket(bucket_name).put_object(file_key, file_obj, headers=extra_args)
            return True
        except Exception as e:
            # Best-effort API: failures are logged and reported as False.
            self._log_failure(e)
            return False

    def generate_file_obj_url(self, bucket_name, file_key, expires=3600):
        """Build a signed GET URL for an object.

        Args:
            bucket_name: Bucket that holds the object.
            file_key: Object key.
            expires: Lifetime of the signed URL in seconds. Defaults to 3600
                (one hour), the value that used to be hard-coded.

        Returns:
            The signed URL, or an empty string when signing raised.
        """
        try:
            return self._bucket(bucket_name).sign_url('GET', file_key, expires)
        except Exception as e:
            self._log_failure(e)
            return ""

    def delete_obj(self, bucket_name, file_key):
        """Delete an object from a bucket.

        Args:
            bucket_name: Bucket that holds the object.
            file_key: Object key.

        Returns:
            True when the delete call succeeded, False when it raised.
        """
        try:
            self._bucket(bucket_name).delete_object(file_key)
            return True
        except Exception as e:
            self._log_failure(e)
            return False

    def bucket_exists(self, bucket_name):
        """Check that a bucket can be queried with the stored credentials.

        Args:
            bucket_name: Bucket to probe.

        Returns:
            True when ``get_bucket_info()`` returns a non-None value; False
            when it returns None or raises (the failure is logged).
        """
        try:
            return self._bucket(bucket_name).get_bucket_info() is not None
        except Exception as e:
            self._log_failure(e)
            return False

    def list_objects(self, bucket_name: str, prefix: str = '', delimiter: str = '',
                     continuation_token: str = '', start_after: str = '',
                     fetch_owner: bool = False, encoding_type: str = 'url',
                     max_keys: int = 1000, headers: Optional[dict] = None) -> dict:
        """List one page of objects in a bucket via ``list_objects_v2``.

        Args:
            bucket_name: Bucket name.
            prefix: Only keys starting with this prefix are listed.
            delimiter: Delimiter, usable to emulate directories.
            continuation_token: Paging token. Pass an empty string on the
                first call and the returned ``next_continuation_token``
                afterwards.
            start_after: Listing starts after this key in lexicographic order.
            fetch_owner: Whether to request owner information.
            encoding_type: Encoding type forwarded to the SDK.
            max_keys: Maximum number of entries (files plus prefixes) returned.
            headers: Optional HTTP headers.

        Returns:
            A dict with the keys ``files`` (``result.object_list``),
            ``prefixes`` (``result.prefix_list``),
            ``next_continuation_token`` and ``is_truncated``.

        Raises:
            RuntimeError: If the listing fails for any reason. The original
                exception is logged and attached as ``__cause__``.
        """
        try:
            result = self._bucket(bucket_name).list_objects_v2(
                prefix=prefix,
                delimiter=delimiter,
                continuation_token=continuation_token,
                start_after=start_after,
                fetch_owner=fetch_owner,
                encoding_type=encoding_type,
                max_keys=max_keys,
                headers=headers,
            )
            return {
                'files': result.object_list,
                'prefixes': result.prefix_list,
                'next_continuation_token': result.next_continuation_token,
                'is_truncated': result.is_truncated,
            }
        except Exception as e:
            LOGGER.error('查询oss文件列表失败, :{},:errLine:{}, errMsg:{}'
                         .format(bucket_name, e.__traceback__.tb_lineno, repr(e)))
            raise RuntimeError(
                f"Failed to list objects in bucket {bucket_name}: {str(e)}"
            ) from e

    def list_all_objects(self, bucket_name: str, prefix: str = '', start_after: str = '',
                         max_keys: int = 1000, end_time: Optional[int] = None,
                         include_files: bool = True) -> Dict:
        """Walk every page of a listing and aggregate size and count.

        Pages are fetched iteratively through ``list_objects``. When
        ``end_time`` is given, only objects with ``last_modified < end_time``
        are counted and the walk stops at the first object that does not
        satisfy that condition.

        Args:
            bucket_name: Bucket name.
            prefix: Key prefix filter.
            start_after: Key after which the listing starts.
            max_keys: Page size.
            end_time: Optional cutoff as a 10-digit integer timestamp.
            include_files: Whether to collect and return the object entries
                (set to False for very large listings).

        Returns:
            On success a dict with ``total_size_bytes`` (int),
            ``total_files`` (int), ``total_size_mb`` (float, rounded half-up
            to 2 decimal places) and, only when ``include_files`` is True,
            ``files`` (list of the listed object entries). On any failure the
            error is logged and an empty dict is returned.
        """
        try:
            total_size = 0
            total_files = 0
            continuation_token = ''
            all_files = [] if include_files else None

            while True:
                page = self.list_objects(
                    bucket_name=bucket_name,
                    prefix=prefix,
                    start_after=start_after,
                    continuation_token=continuation_token,
                    max_keys=max_keys
                )
                page_files = page.get('files', [])

                reached_cutoff = False
                if end_time is not None:
                    # NOTE(review): the walk stops at the FIRST object at or
                    # past the cutoff. Listings are returned in lexicographic
                    # key order, so this is only complete when key order
                    # tracks modification time -- confirm with callers.
                    kept = []
                    for obj in page_files:
                        if obj.last_modified < end_time:
                            kept.append(obj)
                        else:
                            reached_cutoff = True
                            break
                    page_files = kept

                total_size += sum(obj.size for obj in page_files)
                total_files += len(page_files)
                if include_files:
                    all_files.extend(page_files)

                if reached_cutoff:
                    break

                next_token = page.get('next_continuation_token', '')
                # Stop when the listing is complete. Also stop when no token
                # is available: continuing would re-request the first page
                # forever.
                if not page.get('is_truncated', False) or not next_token:
                    break
                continuation_token = next_token

            total_size_mb = (Decimal(total_size) / Decimal(self._BYTES_PER_MB)).quantize(
                Decimal('0.00'), rounding=ROUND_HALF_UP
            )

            result_data = {
                'total_size_bytes': total_size,
                'total_files': total_files,
                'total_size_mb': float(total_size_mb)
            }
            if include_files:
                result_data['files'] = all_files
            return result_data
        except Exception as e:
            # Deliberate best-effort contract: log and hand back an empty dict.
            error_msg = str(e)
            LOGGER.error(f"{prefix} 查询oss文件列表失败, errMsg: {error_msg}")
            return {}
|