# -*- encoding: utf-8 -*-
"""
@File : AmazonS3Util.py
@Time : 2022/8/11 14:00
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import logging
import traceback

import boto3
import botocore
from boto3.session import Session
from botocore import client
from botocore.exceptions import ClientError

logger = logging.getLogger('info')


class AmazonS3Util:
    """Thin convenience wrapper around the boto3 S3 client and resource APIs."""

    def __init__(self, aws_access_key_id, secret_access_key, region_name):
        """
        Create the S3 client and S3 resource used by every other method.

        @param aws_access_key_id: AWS access key id
        @param secret_access_key: AWS secret access key
        @param region_name: AWS region name
        """
        self.access_id = aws_access_key_id
        self.access_secret = secret_access_key
        self.region_name = region_name

        session = Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name
        )
        # Low-level client, explicitly configured for Signature Version 4.
        self.client_conn = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            config=botocore.client.Config(signature_version='s3v4'),
            region_name=region_name
        )
        # High-level resource; note it is created without the explicit
        # s3v4 config that is passed to the client above.
        self.session_conn = session.resource('s3')

    def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None):
        """
        Upload a file-like object to an S3 bucket.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file

        @param bucket: bucket name (required)
        @param file_key: destination key (path + file name) of the object
        @param file_obj: file-like object to upload
        @param extra_args: extra arguments forwarded as ExtraArgs, e.g. ACL settings
        @return: True when the upload succeeded; otherwise False
        """
        try:
            s3_object = self.session_conn.Bucket(bucket).Object(file_key)
            s3_object.upload_fileobj(file_obj, ExtraArgs=extra_args)
            return True
        except Exception:
            # Best-effort API: report the failure through the module logger
            # (instead of stdout) and signal it with the return value.
            logger.error('具体错误{}'.format(traceback.format_exc()))
            return False

    def generate_file_obj_url(self, bucket, file_key):
        """
        Generate a presigned GET URL for an object.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url

        No ExpiresIn is passed, so boto3's default expiry applies.

        @param bucket: bucket name
        @param file_key: object key
        @return: presigned url
        """
        return self.client_conn.generate_presigned_url(
            ClientMethod='get_object',
            Params={
                'Bucket': bucket,
                'Key': file_key
            }
        )

    def delete_obj(self, bucket, file_key):
        """
        Delete an object.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete

        @param bucket: bucket name
        @param file_key: object key
        @return: True when the delete call succeeded; otherwise False
        """
        try:
            self.session_conn.Bucket(bucket).Object(file_key).delete()
            return True
        except Exception:
            # Best-effort API: report the failure through the module logger
            # (instead of stdout) and signal it with the return value.
            logger.error('具体错误{}'.format(traceback.format_exc()))
            return False

    def bucket_exists(self, bucket_name):
        """
        Check whether a bucket exists and is accessible with these credentials.

        @param bucket_name: bucket name
        @return: True when head_bucket succeeds; False when it raises ClientError
        """
        try:
            self.client_conn.head_bucket(Bucket=bucket_name)
        except ClientError:
            logger.warning(
                "存储桶 {} 不存在,或者你没有权限.".format(bucket_name))
            return False
        logger.info("存储桶 {} 存在.".format(bucket_name))
        return True

    def get_object(self, bucket, key):
        """
        Check whether an object can be fetched.

        Only NoSuchKey is handled; any other error raised by the client
        propagates to the caller.

        @param bucket: bucket name
        @param key: object key
        @return: True when the object was fetched; False when the key does not exist
        """
        try:
            response = self.client_conn.get_object(Bucket=bucket, Key=key)
        except self.client_conn.exceptions.NoSuchKey:
            return False
        # The payload is not needed; close the streaming body so the
        # underlying HTTP connection is released immediately.
        body = response.get('Body')
        if body is not None:
            body.close()
        return True

    def download_object(self, bucket, key, file_name):
        """
        Download an object to the local file system.

        @param bucket: bucket name
        @param key: object key
        @param file_name: local destination path (including file name)
        @return: True when the download succeeded; otherwise False
        """
        try:
            self.client_conn.download_file(bucket, key, file_name)
            return True
        except Exception as e:
            # Return a real boolean: the exception's args tuple is usually
            # truthy and would make a failure look like success to callers.
            logger.error('download_object failed for %s/%s: %r', bucket, key, e)
            return False

    def copy_obj(self, source_bucket, to_bucket, file_key):
        """
        Copy an object to another bucket under the same key.

        @param source_bucket: source bucket name
        @param to_bucket: destination bucket name
        @param file_key: object key (used for both source and destination)
        @return: None
        """
        source_dict = {
            'Bucket': source_bucket,
            'Key': file_key
        }
        self.session_conn.meta.client.copy(source_dict, to_bucket, file_key)

    def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):
        """
        Copy a single object, optionally setting the destination storage class.

        @param source_bucket: source bucket name
        @param source_object: source object key
        @param target_bucket: destination bucket name
        @param target_object: destination object key
        @param StorageClass: storage class for the copy; omitted from the request when falsy
        @return: None
        """
        copy_kwargs = {
            'CopySource': {
                'Bucket': source_bucket,
                'Key': source_object
            }
        }
        if StorageClass:
            copy_kwargs['StorageClass'] = StorageClass
        self.session_conn.Object(target_bucket, target_object).copy_from(**copy_kwargs)

    def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None):
        """
        Generate a presigned PUT URL for an object, valid for 7200 seconds.

        @param bucket_name: bucket name
        @param obj_key: object key
        @param storage_class: storage class; omitted from the signed params when falsy
        @return: presigned url
        """
        params = {
            'Bucket': bucket_name,
            'Key': obj_key,
        }
        if storage_class:
            params['StorageClass'] = storage_class
        return self.session_conn.meta.client.generate_presigned_url(
            'put_object', Params=params, ExpiresIn=7200)

    def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):
        """
        Copy every object under a prefix into a flat target prefix.

        Each destination key is "<target_prefix>/<last path segment of the
        source key>". Consequences of that flattening: source objects that
        share a base name overwrite each other, and a source key ending in
        "/" is copied to "<target_prefix>/".

        @param source_bucket: source bucket name
        @param target_bucket: destination bucket name
        @param prefix: source key prefix to search, e.g. AUS000247LTCLY/vod1/1686043996
        @param target_prefix: destination key prefix, e.g. app/algorithm-shop/1686043996
        @param storage_class: storage class for the copies; omitted when falsy
        @return: None
        """
        source_objects = self.session_conn.Bucket(source_bucket).objects.filter(Prefix=prefix)
        for obj in source_objects:
            file_name = obj.key.rsplit('/', 1)[-1]
            target_key = f'{target_prefix}/{file_name}'
            self.copy_single_obj(source_bucket, obj.key, target_bucket, target_key, storage_class)

    def get_object_size(self, bucket_name, object_key):
        """
        Get the size of an object.

        @param bucket_name: bucket name
        @param object_key: object key
        @return: the object's content_length, or 0 when it cannot be read
        """
        s3_object = self.session_conn.Object(bucket_name, object_key)
        try:
            return s3_object.content_length
        except Exception as e:
            # Best-effort: keep returning 0, but leave a trace of the failure.
            logger.warning('get_object_size failed for %s/%s: %r', bucket_name, object_key, e)
            return 0

    def get_object_list(self, bucket_name, prefix):
        """
        List all objects under a prefix.

        Results are paginated, so listings are not truncated at the
        per-request limit of list_objects_v2.

        @param bucket_name: bucket name
        @param prefix: key prefix
        @return: list of the "Contents" entries of every page; an empty list
                 when nothing matches or when the listing fails
        """
        try:
            paginator = self.client_conn.get_paginator('list_objects_v2')
            objects = []
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                # Pages without matches carry no "Contents" key.
                objects.extend(page.get('Contents', []))
            return objects
        except Exception as e:
            # Best-effort: keep returning [], but leave a trace of the failure.
            logger.warning('get_object_list failed for %s/%s: %r', bucket_name, prefix, e)
            return []