# -*- encoding: utf-8 -*- """ @File : AmazonS3Util.py @Time : 2022/8/11 14:00 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import time import traceback import redis import boto3 import botocore from boto3.session import Session from botocore import client from botocore.exceptions import ClientError import jwt import datetime SERVER_HOST = 'redis' # Docker服务名 OAUTH_ACCESS_TOKEN_SECRET = 'a+jbgnw%@1%zy^=@dn62%' OAUTH_REFRESH_TOKEN_SECRET = 'r+jbgnw%@1%zy^=@dn62%' # access_token超时 OAUTH_ACCESS_TOKEN_TIME = datetime.timedelta(days=30) # refresh_token超时 OAUTH_REFRESH_TOKEN_TIME = datetime.timedelta(days=30) class AmazonS3Util: def __init__(self): self.access_id = 'AKIA2E67UIMD45Y3HL53' self.access_secret = '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' self.region_name = 'us-east-1' session = Session( aws_access_key_id=self.access_id, aws_secret_access_key=self.access_secret, region_name=self.region_name ) self.client_conn = boto3.client( 's3', aws_access_key_id=self.access_id, aws_secret_access_key=self.access_secret, config=botocore.client.Config(signature_version='s3v4'), region_name=self.region_name ) self.session_conn = session.resource('s3') def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None): """ 对象上传至S3存储桶 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file @param bucket: 存储桶名称-必须 @param file_key: 需要上传文件路径+文件名称 @param file_obj: 文件对象 @param extra_args: 额外参数 如ACL配置 @return: 当上传成功时为True;否则,False """ try: session = self.session_conn bucket = session.Bucket(bucket) obj = bucket.Object(file_key) obj.upload_fileobj(file_obj, ExtraArgs=extra_args) return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def generate_file_obj_url(self, bucket, file_key): """ 生成对象URL https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url @param bucket: 存储桶名称 @param file_key: 文件名称 @return: url """ response_url = self.client_conn.generate_presigned_url( ClientMethod='get_object', Params={ 'Bucket': bucket, 'Key': file_key } ) return response_url def delete_obj(self, bucket, file_key): """ 删除对象 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete @param bucket: 存储桶 @param file_key: 文件名称 @return: 当删除成功时为True;否则,False """ try: bucket = self.session_conn.Bucket(bucket) obj = bucket.Object(file_key) obj.delete() return True except Exception as e: print(e.args) ex = traceback.format_exc() print('具体错误{}'.format(ex)) return False def bucket_exists(self, bucket_name): """ 判断桶是否存在,是否有访问权限 @return: 当bucket存在时为True;否则,假的 """ try: self.client_conn.head_bucket(Bucket=bucket_name) exists = True except ClientError: exists = False return exists def get_object(self, bucket, key): """ 获取对象 @param bucket: 存储桶 @param key: 文件 @return : boolean """ try: self.client_conn.get_object(Bucket=bucket, Key=key) return True except self.client_conn.exceptions.NoSuchKey: return False def download_object(self, bucket, key, file_name): """ 下载对象至本地 @param file_name: 保存位置以及名称 @param bucket: 存储桶 @param key: 文件 @return : boolean """ try: self.client_conn.download_file(bucket, key, file_name) return True except Exception as e: return e.args def copy_obj(self, source_bucket, to_bucket, file_key): """ 复制对象 @param source_bucket: 原存储桶 @param file_key: 文件名称 @param to_bucket: 新存储桶 """ source_dict = { 'Bucket': source_bucket, 'Key': file_key } self.session_conn.meta.client.copy(source_dict, to_bucket, file_key) def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None): """ 单个对象复制 @param source_bucket:源存储桶 @param source_object:源对象 @param target_bucket:目标存储桶 @param target_object:目标对象 @param StorageClass:存储类 @return: None """ s3 = self.session_conn copy_source = { 'Bucket': source_bucket, 'Key': source_object } target_object = s3.Object(target_bucket, target_object) if StorageClass: target_object.copy_from(CopySource=copy_source, StorageClass=StorageClass) else: target_object.copy_from(CopySource=copy_source) def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None): """ 生成预签名对象URL @param bucket_name: 存储桶名称 @param obj_key: 对象key @param storage_class: 存储类 例 @return: 对象URL """ params = { 'Bucket': bucket_name, 'Key': obj_key, } if storage_class: params['StorageClass'] = storage_class return self.session_conn.meta.client.generate_presigned_url('put_object', Params=params, ExpiresIn=7200) def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None): """ 批量拷贝对象 @param source_bucket: 源存储桶 @param target_bucket: 目标存储桶 @param prefix: 需要搜索的对象前缀 例:AUS000247LTCLY/vod1/1686043996 @param target_prefix: 目标对象前缀 例:app/algorithm-shop/1686043996 @param storage_class: 存储类 @return: None """ s3 = self.session_conn # 遍历源存储桶中指定前缀下的所有对象,依次进行复制操作 for obj in s3.Bucket(source_bucket).objects.filter(Prefix=prefix): key = obj.key # 对象键名 target_key = f'{target_prefix}/' + key.split('/')[-1] # 新的对象键名,此处为 "new_path/" + 原有文件名 copy_source = { 'Bucket': source_bucket, 'Key': key } # 将对象复制到目标存储桶,并设置存储类型和新的对象键名 if storage_class: s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source, StorageClass=storage_class) else: s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source) def get_object_size(self, bucket_name, object_key): """ 获取存储桶中指定对象的大小 :param bucket_name: string,存储桶名称 :param object_key: string,对象键名 :return: int,指定对象的大小,单位为字节 """ s3 = self.session_conn obj = s3.Object(bucket_name, object_key) try: return obj.content_length except Exception as e: return 0 def get_object_list(self, bucket_name, prefix): """ 获取指定路径所有对象 :param bucket_name: string,存储桶名称 :param prefix: string,路径 :return: int,指定对象的大小,单位为字节 """ try: s3 = self.client_conn obj = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) return obj['Contents'] except Exception as e: return [] class RedisObject: def __init__(self, db=0): self.POOL = redis.ConnectionPool(host=SERVER_HOST, port=6379, db=db) self.CONN = redis.Redis(connection_pool=self.POOL) def set_data(self, key, val, expire=0): try: self.CONN.set(key, val) if expire > 0: self.CONN.expire(key, expire) except Exception as e: return False else: return True def get_data(self, key): try: val = self.CONN.get(key) except Exception as e: print(repr(e)) return False else: if val: return val.decode('utf-8') else: return False def del_data(self, key): try: val = self.CONN.delete(key) except Exception as e: print(repr(e)) return False else: return True def get_size(self): return self.CONN.dbsize() # 向列表插入数据 def rpush(self, name, val): self.CONN.rpush(name, val) def lpop(self, name): val = self.CONN.lpop(name) if val: return val.decode('utf-8') else: return False # 获取列表长度 def llen(self, name): return self.CONN.llen(name=name) # 获取列表所有数据 def lrange(self, name, start, end): return self.CONN.lrange(name, start, end) # 删除列表指定数据 def lrem(self, name, num, value): """ num:列表方向,删除个数(0:所有) value:删除的值 """ return self.CONN.lrem(name, num, value) def get_ttl(self, key): ttl = self.CONN.ttl(key) if ttl: return ttl else: return 0 def get_keys(self, key): keys = self.CONN.keys(key) if keys: return keys else: return False def set_ex_data(self, key, val, expire=0): try: self.CONN.setex(name=key, time=expire, value=val) except Exception as e: return False else: return True def set_hash_data(self, key, kwargs): self.CONN.hmset(key, kwargs) def get_hash_data(self, key, file): return self.CONN.hmget(key, file) def get_all_hash_data(self, key): return self.CONN.hgetall(key) class TokenObject: def __init__(self, token=None, returntpye='currency'): if token == 'local': token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTg0MzUxODk2MjgyMTM4MDAxMzgwMDAiLCJsYW5nIjoiZW4iLCJ1c2VyIjoiMTM2ODAzMTc1OTYiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1ODcyNzcwNjB9.c0LV_XyxwbzUlYqMJqx7vw9f19Jv-0kGnUHuu_go-mo' if token == 'test': token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJleHAiOjE1Njk5OTg4OTYsInVzZXJJRCI6IjE1MTU2NDI2MjMzNzkzOTUxMzgwMDEzODAwMSIsImxhbmciOiJlbiIsIm1fY29kZSI6IjEyMzQxMzI0MzIxNCJ9.VAQtT9AbCCfXcrNj9DL5cvVasMDoI7AP8ptgU1GoMu8' self.token = token self.lang = None self.userID = None self.user = '' self.code = 0 self.valid() self.returntpye = returntpye def valid(self): if self.token is None: self.code = 309 return try: self.token = self.token.replace("Bearer ", "") res = jwt.decode(self.token, OAUTH_ACCESS_TOKEN_SECRET, algorithms='HS256') self.userID = res.get('userID', None) self.lang = res.get('lang', None) self.user = res.get('user', '') # 刷新登录时间 # if self.userID: # print(self.user) # redisObj = RedisObject(db=3) # redisObj.set_data(key=self.userID, val=self.user, expire=300) except jwt.ExpiredSignatureError as e: self.code = 309 return except Exception as e: self.code = 309 return else: if not self.userID: self.code = 309 return else: if self.userID: self.code = 0 return res else: self.code = 309 return def generate(self, data=None): if data is None: data = {} try: access_expire = int(OAUTH_ACCESS_TOKEN_TIME.total_seconds()) refresh_expire = int(OAUTH_REFRESH_TOKEN_TIME.total_seconds()) now_stamp = int(time.time()) access_data = data refresh_data = data access_data['exp'] = access_expire + now_stamp refresh_data['exp'] = refresh_expire + now_stamp access_token = jwt.encode(access_data, OAUTH_ACCESS_TOKEN_SECRET, algorithm='HS256') refresh_token = jwt.encode( refresh_data, OAUTH_REFRESH_TOKEN_SECRET, algorithm='HS256') res = { 'access_token': access_token, 'access_expire': access_expire, 'refresh_expire': refresh_expire, 'refresh_token': refresh_token, } if self.returntpye == 'pc': res = { 'token': access_token, 'access_expire': access_expire, 'refresh_expire': refresh_expire, 'refresh_token': refresh_token, } except Exception as e: self.code = 309 print(repr(e)) else: self.code = 0 return res def encryption(self, data=None): if data is None: data = {} try: access_expire = int(OAUTH_ACCESS_TOKEN_TIME.total_seconds()) refresh_expire = int(OAUTH_REFRESH_TOKEN_TIME.total_seconds()) now_stamp = int(time.time()) access_data = data refresh_data = data access_data['exp'] = access_expire + now_stamp refresh_data['exp'] = refresh_expire + now_stamp access_token = jwt.encode(access_data, OAUTH_ACCESS_TOKEN_SECRET, algorithm='HS256') return access_token except Exception as e: self.code = 309 print(repr(e)) def refresh(self): if not self.token: self.code = 309 return try: res = jwt.decode(self.token, OAUTH_REFRESH_TOKEN_SECRET, algorithms='HS256') except jwt.ExpiredSignatureError as e: print('过期') print(repr(e)) self.code = 309 except Exception as e: self.code = 309 print(repr(e)) else: self.code = 0 userID = res.get('userID', '') user = res.get('user', '') lang = self.lang self.userID = userID self.user = user refreshRes = self.generate(data={'userID': userID, 'lang': lang, 'user': user}) return refreshRes