# -*- encoding: utf-8 -*-
"""
@File : AmazonS3Util.py
@Time : 2022/8/11 14:00
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import logging
import traceback

import boto3
import botocore
from boto3.session import Session
from botocore import client
from botocore.exceptions import ClientError
# Module-level logger used by AmazonS3Util. The logger name 'info' is kept
# as-is; presumably it matches a logger configured by the host project's
# logging settings -- TODO confirm.
logger = logging.getLogger('info')
class AmazonS3Util:
    """Convenience wrapper around the boto3 S3 client and resource APIs.

    Two handles are built from the same credentials in ``__init__``:

    * ``client_conn``  - low-level client created with ``signature_version='s3v4'``.
    * ``session_conn`` - ``s3`` resource created from a ``Session``.
    """

    def __init__(self, aws_access_key_id, secret_access_key, region_name):
        """
        Create the S3 client and resource handles.
        @param aws_access_key_id: AWS access key id
        @param secret_access_key: AWS secret access key
        @param region_name: region name passed to both the client and the session
        """
        self.access_id = aws_access_key_id
        self.access_secret = secret_access_key
        self.region_name = region_name
        session = Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name,
        )
        self.client_conn = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            config=botocore.client.Config(signature_version='s3v4'),
            region_name=region_name,
        )
        self.session_conn = session.resource('s3')

    def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None):
        """
        Upload a file-like object to an S3 bucket.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file
        @param bucket: bucket name (required)
        @param file_key: destination key (path + file name)
        @param file_obj: file-like object to upload
        @param extra_args: extra arguments forwarded as ExtraArgs, e.g. ACL settings
        @return: True when the upload succeeds; otherwise False
        """
        try:
            target = self.session_conn.Bucket(bucket).Object(file_key)
            target.upload_fileobj(file_obj, ExtraArgs=extra_args)
            return True
        except Exception:
            # Best-effort contract: report through the module logger (with
            # traceback) and signal failure via the return value.
            logger.exception('S3 upload failed: bucket=%s key=%s', bucket, file_key)
            return False

    def generate_file_obj_url(self, bucket, file_key, expires_in=3600):
        """
        Generate a presigned GET URL for an object.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url
        @param bucket: bucket name
        @param file_key: object key
        @param expires_in: URL lifetime in seconds (3600 is the boto3 default)
        @return: url
        """
        return self.client_conn.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': bucket, 'Key': file_key},
            ExpiresIn=expires_in,
        )

    def delete_obj(self, bucket, file_key):
        """
        Delete an object.
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete
        @param bucket: bucket name
        @param file_key: object key
        @return: True when the delete call succeeds; otherwise False
        """
        try:
            self.session_conn.Bucket(bucket).Object(file_key).delete()
            return True
        except Exception:
            logger.exception('S3 delete failed: bucket=%s key=%s', bucket, file_key)
            return False

    def bucket_exists(self, bucket_name):
        """
        Check whether a bucket exists and is accessible (via head_bucket).
        @param bucket_name: bucket name
        @return: True when head_bucket succeeds; False when it raises ClientError
        """
        try:
            self.client_conn.head_bucket(Bucket=bucket_name)
        except ClientError:
            logger.warning(
                "存储桶 {} 不存在,或者你没有权限.".format(bucket_name))
            return False
        logger.info("存储桶 {} 存在.".format(bucket_name))
        return True

    def get_object(self, bucket, key):
        """
        Check whether an object exists by requesting it.
        @param bucket: bucket name
        @param key: object key
        @return: True when the object can be fetched; False on NoSuchKey.
                 Any other error propagates to the caller.
        """
        try:
            response = self.client_conn.get_object(Bucket=bucket, Key=key)
        except self.client_conn.exceptions.NoSuchKey:
            return False
        # Only existence matters here; close the streaming body so the
        # underlying HTTP connection is released instead of leaked.
        body = response.get('Body')
        if body is not None:
            body.close()
        return True

    def download_object(self, bucket, key, file_name):
        """
        Download an object to a local file.
        @param bucket: bucket name
        @param key: object key
        @param file_name: local destination path (including file name)
        @return: True on success; otherwise False
        """
        try:
            self.client_conn.download_file(bucket, key, file_name)
            return True
        except Exception:
            # Returning the exception args here would yield a truthy tuple and
            # make failures look like success in ``if download_object(...)``.
            logger.exception(
                'S3 download failed: bucket=%s key=%s file=%s', bucket, key, file_name)
            return False

    def copy_obj(self, source_bucket, to_bucket, file_key):
        """
        Copy an object to another bucket under the same key.
        @param source_bucket: source bucket name
        @param to_bucket: destination bucket name
        @param file_key: object key (used for both source and destination)
        """
        copy_source = {'Bucket': source_bucket, 'Key': file_key}
        self.session_conn.meta.client.copy(copy_source, to_bucket, file_key)

    def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):
        """
        Copy a single object, optionally setting the storage class.
        @param source_bucket: source bucket name
        @param source_object: source object key
        @param target_bucket: destination bucket name
        @param target_object: destination object key
        @param StorageClass: storage class for the copy; omitted from the request when falsy
        @return: None
        """
        copy_kwargs = {
            'CopySource': {'Bucket': source_bucket, 'Key': source_object},
        }
        if StorageClass:
            copy_kwargs['StorageClass'] = StorageClass
        self.session_conn.Object(target_bucket, target_object).copy_from(**copy_kwargs)

    def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None, expires_in=7200):
        """
        Generate a presigned PUT URL for an object.
        @param bucket_name: bucket name
        @param obj_key: object key
        @param storage_class: storage class to sign into the request; omitted when falsy
        @param expires_in: URL lifetime in seconds (defaults to 7200)
        @return: presigned URL
        """
        params = {
            'Bucket': bucket_name,
            'Key': obj_key,
        }
        if storage_class:
            params['StorageClass'] = storage_class
        return self.session_conn.meta.client.generate_presigned_url(
            'put_object',
            Params=params,
            ExpiresIn=expires_in,
        )

    def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):
        """
        Copy every object under a prefix into a target prefix.
        @param source_bucket: source bucket name
        @param target_bucket: destination bucket name
        @param prefix: source key prefix to search, e.g. AUS000247LTCLY/vod1/1686043996
        @param target_prefix: destination key prefix, e.g. app/algorithm-shop/1686043996
        @param storage_class: storage class for the copies; omitted from the request when falsy
        @return: None
        """
        s3 = self.session_conn
        extra = {'StorageClass': storage_class} if storage_class else {}
        for obj in s3.Bucket(source_bucket).objects.filter(Prefix=prefix):
            source_key = obj.key
            # Only the last path segment is kept, so the source hierarchy is
            # flattened: objects sharing a file name map to the same target key.
            target_key = f'{target_prefix}/' + source_key.split('/')[-1]
            copy_source = {'Bucket': source_bucket, 'Key': source_key}
            s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source, **extra)

    def get_object_size(self, bucket_name, object_key):
        """
        Get the size of an object.
        @param bucket_name: bucket name
        @param object_key: object key
        @return: object size in bytes; 0 when the size cannot be read
        """
        obj = self.session_conn.Object(bucket_name, object_key)
        try:
            return obj.content_length
        except Exception as e:
            logger.warning(
                'S3 object size lookup failed: bucket=%s key=%s error=%s',
                bucket_name, object_key, e)
            return 0

    def get_object_list(self, bucket_name, prefix):
        """
        List all objects under a prefix.
        @param bucket_name: bucket name
        @param prefix: key prefix
        @return: list of the ``Contents`` entries returned by list_objects_v2
                 across all pages; [] when nothing matches or on error
        """
        try:
            paginator = self.client_conn.get_paginator('list_objects_v2')
            contents = []
            # A single list_objects_v2 call returns at most one page of keys;
            # walk every page so large prefixes are not silently truncated.
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                # Pages with no matching keys carry no 'Contents' entry.
                contents.extend(page.get('Contents', []))
            return contents
        except Exception:
            logger.exception(
                'S3 list objects failed: bucket=%s prefix=%s', bucket_name, prefix)
            return []
|