# -*- encoding: utf-8 -*- """ @File : AmazonS3Util.py @Time : 2022/8/11 14:00 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import logging import traceback import boto3 import botocore from boto3.session import Session from botocore import client from botocore.exceptions import ClientError logger = logging.getLogger('info') class AmazonS3Util: def __init__(self, aws_access_key_id, secret_access_key, region_name): self.access_id = aws_access_key_id self.access_secret = secret_access_key self.region_name = region_name session = Session( aws_access_key_id=aws_access_key_id, aws_secret_access_key=secret_access_key, region_name=region_name ) self.client_conn = boto3.client( 's3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=secret_access_key, config=botocore.client.Config(signature_version='s3v4'), region_name=region_name ) self.session_conn = session.resource('s3') def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None): """ 对象上传至S3存储桶 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file @param bucket: 存储桶名称-必须 @param file_key: 需要上传文件路径+文件名称 @param file_obj: 文件对象 @param extra_args: 额外参数 如ACL配置 @return: 当上传成功时为True;否则,False """ try: session = self.session_conn bucket = session.Bucket(bucket) obj = bucket.Object(file_key) obj.upload_fileobj(file_obj, ExtraArgs=extra_args) return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def generate_file_obj_url(self, bucket, file_key): """ 生成对象URL https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url @param bucket: 存储桶名称 @param file_key: 文件名称 @return: url """ response_url = self.client_conn.generate_presigned_url( ClientMethod='get_object', Params={ 'Bucket': bucket, 'Key': file_key } ) return response_url def delete_obj(self, bucket, file_key): """ 删除对象 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete @param bucket: 存储桶 @param file_key: 文件名称 @return: 当删除成功时为True;否则,False """ try: bucket = self.session_conn.Bucket(bucket) obj = bucket.Object(file_key) obj.delete() return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def bucket_exists(self, bucket_name): """ 判断桶是否存在,是否有访问权限 @return: 当bucket存在时为True;否则,假的 """ try: self.client_conn.head_bucket(Bucket=bucket_name) logger.info("存储桶 {} 存在.".format(bucket_name)) exists = True except ClientError: logger.warning( "存储桶 {} 不存在,或者你没有权限.".format(bucket_name)) exists = False return exists def get_object(self, bucket, key): """ 获取对象 @param bucket: 存储桶 @param key: 文件 @return : boolean """ try: self.client_conn.get_object(Bucket=bucket, Key=key) return True except self.client_conn.exceptions.NoSuchKey: return False def download_object(self, bucket, key, file_name): """ 下载对象至本地 @param file_name: 保存位置以及名称 @param bucket: 存储桶 @param key: 文件 @return : boolean """ try: self.client_conn.download_file(bucket, key, file_name) return True except Exception as e: return e.args def copy_obj(self, source_bucket, to_bucket, file_key): """ 复制对象 @param source_bucket: 原存储桶 @param file_key: 文件名称 @param to_bucket: 新存储桶 """ source_dict = { 'Bucket': source_bucket, 'Key': file_key } self.session_conn.meta.client.copy(source_dict, to_bucket, file_key)