import traceback
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Dict

import oss2

from Ansjer.config import LOGGER


class AliOssUtil:
    """Thin convenience wrapper around the ``oss2`` client for one endpoint.

    Every method builds an ``oss2.Bucket`` for the requested bucket name using
    the credentials and endpoint supplied at construction time.
    """

    # Number of bytes in one megabyte, used when reporting totals in MB.
    _BYTES_PER_MB = 1024 * 1024

    def __init__(self, access_key_id, access_key_secret, endpoint):
        """Store the credentials/endpoint and create the ``oss2.Auth`` object.

        Args:
            access_key_id: Access key id passed to ``oss2.Auth``.
            access_key_secret: Access key secret passed to ``oss2.Auth``.
            endpoint: Endpoint passed to every ``oss2.Bucket`` created later.
        """
        self.access_id = access_key_id
        self.access_secret = access_key_secret
        self.endpoint = endpoint
        self.auth = oss2.Auth(access_key_id, access_key_secret)

    def _bucket(self, bucket_name):
        """Return an ``oss2.Bucket`` bound to this instance's auth and endpoint."""
        return oss2.Bucket(self.auth, self.endpoint, bucket_name)

    @staticmethod
    def _log_exception(exc):
        """Log the exception args and the current traceback through LOGGER.

        Must be called from inside an ``except`` block so that
        ``traceback.format_exc()`` sees the active exception.
        """
        LOGGER.error(str(exc.args))
        LOGGER.error('具体错误{}'.format(traceback.format_exc()))

    def upload_file_obj(self, bucket_name, file_key, file_obj, extra_args=None):
        """Upload an object to a bucket.

        Args:
            bucket_name: Name of the target bucket (required).
            file_key: Object key (path + file name) to upload to.
            file_obj: Data handed to ``Bucket.put_object``.
            extra_args: Optional value forwarded as the ``headers`` argument
                of ``Bucket.put_object``.

        Returns:
            True when the upload call completed without raising, else False.
        """
        try:
            self._bucket(bucket_name).put_object(file_key, file_obj, headers=extra_args)
            return True
        except Exception as e:
            # Best-effort API: report the failure and signal it via the return value.
            self._log_exception(e)
            return False

    def generate_file_obj_url(self, bucket_name, file_key):
        """Generate a signed GET URL for an object.

        Args:
            bucket_name: Name of the bucket.
            file_key: Object key.

        Returns:
            The signed URL, or an empty string if signing raised.
        """
        try:
            # The signed URL expires after 3600 seconds (1 hour).
            return self._bucket(bucket_name).sign_url('GET', file_key, 3600)
        except Exception as e:
            self._log_exception(e)
            return ""

    def delete_obj(self, bucket_name, file_key):
        """Delete an object from a bucket.

        Args:
            bucket_name: Name of the bucket.
            file_key: Object key.

        Returns:
            True when the delete call completed without raising, else False.
        """
        try:
            self._bucket(bucket_name).delete_object(file_key)
            return True
        except Exception as e:
            self._log_exception(e)
            return False

    def bucket_exists(self, bucket_name):
        """Check whether a bucket can be queried with the current credentials.

        Args:
            bucket_name: Name of the bucket.

        Returns:
            True when ``get_bucket_info()`` returns a non-None value; False
            when it returns None or raises.
        """
        try:
            return self._bucket(bucket_name).get_bucket_info() is not None
        except Exception as e:
            self._log_exception(e)
            return False

    def list_objects(self, bucket_name: str, prefix: str = '', delimiter: str = '',
                     continuation_token: str = '', start_after: str = '',
                     fetch_owner: bool = False, encoding_type: str = 'url',
                     max_keys: int = 1000,
                     headers: Optional[dict] = None) -> Optional[dict]:
        """List one page of objects in a bucket, filtered by prefix.

        All arguments except ``bucket_name`` are forwarded unchanged to
        ``Bucket.list_objects_v2``.

        Args:
            bucket_name: Name of the bucket.
            prefix: Only list objects whose key starts with this prefix.
            delimiter: Delimiter, can be used to emulate directories.
            continuation_token: Paging token. Pass an empty string on the
                first call, then the ``next_continuation_token`` of the
                previous result.
            start_after: Key to start after; results are the keys that follow
                it in lexicographic order.
            fetch_owner: Whether to fetch owner information for each object.
            encoding_type: Encoding type.
            max_keys: Maximum number of entries returned in this page.
            headers: Optional HTTP headers.

        Returns:
            A dict with the keys:
                - files: ``result.object_list``
                - prefixes: ``result.prefix_list``
                - next_continuation_token: token for the next page
                - is_truncated: whether more results are available

        Raises:
            RuntimeError: If building the bucket or listing fails; the
                original exception is attached as ``__cause__``.
        """
        try:
            result = self._bucket(bucket_name).list_objects_v2(
                prefix=prefix,
                delimiter=delimiter,
                continuation_token=continuation_token,
                start_after=start_after,
                fetch_owner=fetch_owner,
                encoding_type=encoding_type,
                max_keys=max_keys,
                headers=headers,
            )
            return {
                'files': result.object_list,
                'prefixes': result.prefix_list,
                'next_continuation_token': result.next_continuation_token,
                'is_truncated': result.is_truncated,
            }
        except Exception as e:
            LOGGER.error('查询oss文件列表失败, :{},:errLine:{}, errMsg:{}'
                         .format(bucket_name, e.__traceback__.tb_lineno, repr(e)))
            # Chain the cause so the original traceback is not lost.
            raise RuntimeError(f"Failed to list objects in bucket {bucket_name}: {str(e)}") from e

    def list_all_objects(self, bucket_name: str, prefix: str = '', start_after: str = '',
                         max_keys: int = 1000, end_time: Optional[int] = None,
                         include_files: bool = True) -> Dict:
        """Walk every page of a listing and total the object sizes.

        Wraps the paging of ``list_objects`` and optionally stops at a cutoff
        timestamp.

        Args:
            bucket_name: Name of the bucket.
            prefix: Key prefix to list.
            start_after: Key to start after (sent with every page request).
            max_keys: Page size.
            end_time: Cutoff as a 10-digit integer timestamp. Objects are
                counted while ``last_modified < end_time``; the walk stops at
                the first object that does not satisfy this.
            include_files: Whether to collect and return the object list
                (consider False for very large listings).

        Returns:
            On success a dict with:
                - total_size_bytes (int): summed ``size`` of counted objects
                - total_files (int): number of counted objects
                - total_size_mb (float): total in MB, rounded half-up to
                  2 decimal places
                - files (list): counted objects, only when ``include_files``
            On any exception the error is logged and ``{}`` is returned.
        """
        try:
            total_size = 0
            total_files = 0
            continuation_token = ''
            all_files = [] if include_files else None

            while True:
                result = self.list_objects(
                    bucket_name=bucket_name,
                    prefix=prefix,
                    start_after=start_after,
                    continuation_token=continuation_token,
                    max_keys=max_keys,
                )
                current_files = result.get('files', [])

                # NOTE(review): the listing is ordered by key, so stopping at
                # the first object at/after the cutoff assumes key order
                # follows modification time — confirm against the key scheme.
                reached_cutoff = False
                if end_time is not None:
                    kept = []
                    for obj in current_files:
                        if obj.last_modified < end_time:
                            kept.append(obj)
                        else:
                            reached_cutoff = True
                            break
                    current_files = kept

                total_size += sum(obj.size for obj in current_files)
                total_files += len(current_files)
                if include_files:
                    all_files.extend(current_files)

                if reached_cutoff:
                    break

                next_token = result.get('next_continuation_token', '')
                # An empty token would restart the listing from the first
                # page, so stop on it even if the page claims to be truncated.
                if not result.get('is_truncated', False) or not next_token:
                    break
                continuation_token = next_token

            total_size_mb = Decimal(total_size) / Decimal(self._BYTES_PER_MB)
            total_size_mb = total_size_mb.quantize(Decimal('0.00'), rounding=ROUND_HALF_UP)

            result_data = {
                'total_size_bytes': total_size,
                'total_files': total_files,
                'total_size_mb': float(total_size_mb),
            }
            if include_files:
                result_data['files'] = all_files
            return result_data
        except Exception as e:
            error_msg = str(e)
            LOGGER.error(f"{prefix} 查询oss文件列表失败, errMsg: {error_msg}")
            return {}