| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 | # -*- encoding: utf-8 -*-"""@File    : AmazonS3Util.py@Time    : 2022/8/11 14:00@Author  : stephen@Email   : zhangdongming@asj6.wecom.work@Software: PyCharm"""import loggingimport tracebackimport boto3import botocorefrom boto3.session import Sessionfrom botocore import clientfrom botocore.exceptions import ClientErrorlogger = logging.getLogger('info')class AmazonS3Util:    def __init__(self, aws_access_key_id, secret_access_key, region_name):        self.access_id = aws_access_key_id        self.access_secret = secret_access_key        self.region_name = region_name        session = Session(            aws_access_key_id=aws_access_key_id,            aws_secret_access_key=secret_access_key,            region_name=region_name        )        self.client_conn = boto3.client(            's3',            aws_access_key_id=aws_access_key_id,            aws_secret_access_key=secret_access_key,            config=botocore.client.Config(signature_version='s3v4'),            region_name=region_name        )        self.session_conn = session.resource('s3')    def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None):        """        对象上传至S3存储桶        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file        @param bucket: 存储桶名称-必须        @param file_key: 需要上传文件路径+文件名称        @param file_obj: 文件对象        @param extra_args: 额外参数 如ACL配置        @return: 当上传成功时为True;否则,False        """        try:            session = self.session_conn            bucket = session.Bucket(bucket)            obj = bucket.Object(file_key)            obj.upload_fileobj(file_obj, ExtraArgs=extra_args)            return True        except Exception as e:            print(e.args)            ex = traceback.format_exc()            print('具体错误{}'.format(ex))            return False    def generate_file_obj_url(self, bucket, file_key):        """        生成对象URL        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url        @param bucket: 存储桶名称        @param file_key: 文件名称        @return: url        """        response_url = self.client_conn.generate_presigned_url(            ClientMethod='get_object',            Params={                'Bucket': bucket,                'Key': file_key            }        )        return response_url    def delete_obj(self, bucket, file_key):        """        删除对象        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete        @param bucket: 存储桶        @param file_key: 文件名称        @return: 当删除成功时为True;否则,False        """        try:            bucket = self.session_conn.Bucket(bucket)            obj = bucket.Object(file_key)            obj.delete()            return True        except Exception as e:            print(e.args)            ex = traceback.format_exc()            print('具体错误{}'.format(ex))            return False    def bucket_exists(self, bucket_name):        """        判断桶是否存在,是否有访问权限        @return: 当bucket存在时为True;否则,假的        """        try:            self.client_conn.head_bucket(Bucket=bucket_name)            logger.info("存储桶 {} 存在.".format(bucket_name))            exists = True        except ClientError:            logger.warning(                "存储桶 {} 不存在,或者你没有权限.".format(bucket_name))            exists = False        return exists    def get_object(self, bucket, key):        """        获取对象        @param bucket: 存储桶        @param key: 文件        @return : boolean        """        try:            self.client_conn.get_object(Bucket=bucket, Key=key)            return True        except self.client_conn.exceptions.NoSuchKey:            return False    def download_object(self, bucket, key, file_name):        """        下载对象至本地        @param file_name: 保存位置以及名称        @param bucket: 存储桶        @param key: 文件        @return : boolean        """        try:            self.client_conn.download_file(bucket, key, file_name)            return True        except Exception as e:            return e.args    def copy_obj(self, source_bucket, to_bucket, file_key):        """        复制对象        @param source_bucket: 原存储桶        @param file_key: 文件名称        @param to_bucket: 新存储桶        """        source_dict = {            'Bucket': source_bucket,            'Key': file_key        }        self.session_conn.meta.client.copy(source_dict, to_bucket, file_key)    def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):        """        单个对象复制        @param source_bucket:源存储桶        @param source_object:源对象        @param target_bucket:目标存储桶        @param target_object:目标对象        @param StorageClass:存储类        @return: None        """        s3 = self.session_conn        copy_source = {            'Bucket': source_bucket,            'Key': source_object        }        target_object = s3.Object(target_bucket, target_object)        if StorageClass:            target_object.copy_from(CopySource=copy_source, StorageClass=StorageClass)        else:            target_object.copy_from(CopySource=copy_source)    def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None):        """        生成预签名对象URL        @param bucket_name: 存储桶名称        @param obj_key: 对象key        @param storage_class: 存储类 例        @return: 对象URL        """        params = {            'Bucket': bucket_name,            'Key': obj_key,        }        if storage_class:            params['StorageClass'] = storage_class        return self.session_conn.meta.client.generate_presigned_url('put_object',                                                                    Params=params,                                                                    ExpiresIn=7200)    def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):        """        批量拷贝对象        @param source_bucket: 源存储桶        @param target_bucket: 目标存储桶        @param prefix: 需要搜索的对象前缀 例:AUS000247LTCLY/vod1/1686043996        @param target_prefix: 目标对象前缀 例:app/algorithm-shop/1686043996        @param storage_class: 存储类        @return: None        """        s3 = self.session_conn        # 遍历源存储桶中指定前缀下的所有对象,依次进行复制操作        for obj in s3.Bucket(source_bucket).objects.filter(Prefix=prefix):            key = obj.key  # 对象键名            target_key = f'{target_prefix}/' + key.split('/')[-1]  # 新的对象键名,此处为 "new_path/" + 原有文件名            copy_source = {                'Bucket': source_bucket,                'Key': key            }            # 将对象复制到目标存储桶,并设置存储类型和新的对象键名            if storage_class:                s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source, StorageClass=storage_class)            else:                s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source)    def get_object_size(self,bucket_name, object_key):        """        获取存储桶中指定对象的大小        :param bucket_name: string,存储桶名称        :param object_key: string,对象键名        :return: int,指定对象的大小,单位为字节        """        s3 = self.session_conn        obj = s3.Object(bucket_name, object_key)        try:            return obj.content_length        except Exception as e:            return 0    def get_object_list(self, bucket_name, prefix):        """        获取指定路径所有对象        :param bucket_name: string,存储桶名称        :param prefix: string,路径        :return: int,指定对象的大小,单位为字节        """        try:            s3 = self.client_conn            obj = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)            return obj['Contents']        except Exception as e:            return []
 |