123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483 |
- # -*- encoding: utf-8 -*-
- """
- @File : AmazonS3Util.py
- @Time : 2022/8/11 14:00
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import time
- import traceback
- import redis
- import boto3
- import botocore
- from boto3.session import Session
- from botocore import client
- from botocore.exceptions import ClientError
- import jwt
- import datetime
- SERVER_HOST = 'redis' # Docker服务名
- OAUTH_ACCESS_TOKEN_SECRET = 'a+jbgnw%@1%zy^=@dn62%'
- OAUTH_REFRESH_TOKEN_SECRET = 'r+jbgnw%@1%zy^=@dn62%'
- # access_token超时
- OAUTH_ACCESS_TOKEN_TIME = datetime.timedelta(days=30)
- # refresh_token超时
- OAUTH_REFRESH_TOKEN_TIME = datetime.timedelta(days=30)
- class AmazonS3Util:
- def __init__(self):
- self.access_id = 'AKIA2E67UIMD45Y3HL53'
- self.access_secret = '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'
- self.region_name = 'us-east-1'
- session = Session(
- aws_access_key_id=self.access_id,
- aws_secret_access_key=self.access_secret,
- region_name=self.region_name
- )
- self.client_conn = boto3.client(
- 's3',
- aws_access_key_id=self.access_id,
- aws_secret_access_key=self.access_secret,
- config=botocore.client.Config(signature_version='s3v4'),
- region_name=self.region_name
- )
- self.session_conn = session.resource('s3')
- def upload_file_obj(self, bucket, file_key, file_obj, extra_args=None):
- """
- 对象上传至S3存储桶
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.upload_file
- @param bucket: 存储桶名称-必须
- @param file_key: 需要上传文件路径+文件名称
- @param file_obj: 文件对象
- @param extra_args: 额外参数 如ACL配置
- @return: 当上传成功时为True;否则,False
- """
- try:
- session = self.session_conn
- bucket = session.Bucket(bucket)
- obj = bucket.Object(file_key)
- obj.upload_fileobj(file_obj, ExtraArgs=extra_args)
- return True
- except Exception as e:
- print(e.args)
- ex = traceback.format_exc()
- print('具体错误{}'.format(ex))
- return False
- def generate_file_obj_url(self, bucket, file_key):
- """
- 生成对象URL
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.generate_presigned_url
- @param bucket: 存储桶名称
- @param file_key: 文件名称
- @return: url
- """
- response_url = self.client_conn.generate_presigned_url(
- ClientMethod='get_object',
- Params={
- 'Bucket': bucket,
- 'Key': file_key
- }
- )
- return response_url
- def delete_obj(self, bucket, file_key):
- """
- 删除对象
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Object.delete
- @param bucket: 存储桶
- @param file_key: 文件名称
- @return: 当删除成功时为True;否则,False
- """
- try:
- bucket = self.session_conn.Bucket(bucket)
- obj = bucket.Object(file_key)
- obj.delete()
- return True
- except Exception as e:
- print(e.args)
- ex = traceback.format_exc()
- print('具体错误{}'.format(ex))
- return False
- def bucket_exists(self, bucket_name):
- """
- 判断桶是否存在,是否有访问权限
- @return: 当bucket存在时为True;否则,假的
- """
- try:
- self.client_conn.head_bucket(Bucket=bucket_name)
- exists = True
- except ClientError:
- exists = False
- return exists
- def get_object(self, bucket, key):
- """
- 获取对象
- @param bucket: 存储桶
- @param key: 文件
- @return : boolean
- """
- try:
- self.client_conn.get_object(Bucket=bucket, Key=key)
- return True
- except self.client_conn.exceptions.NoSuchKey:
- return False
- def download_object(self, bucket, key, file_name):
- """
- 下载对象至本地
- @param file_name: 保存位置以及名称
- @param bucket: 存储桶
- @param key: 文件
- @return : boolean
- """
- try:
- self.client_conn.download_file(bucket, key, file_name)
- return True
- except Exception as e:
- return e.args
- def copy_obj(self, source_bucket, to_bucket, file_key):
- """
- 复制对象
- @param source_bucket: 原存储桶
- @param file_key: 文件名称
- @param to_bucket: 新存储桶
- """
- source_dict = {
- 'Bucket': source_bucket,
- 'Key': file_key
- }
- self.session_conn.meta.client.copy(source_dict, to_bucket, file_key)
- def copy_single_obj(self, source_bucket, source_object, target_bucket, target_object, StorageClass=None):
- """
- 单个对象复制
- @param source_bucket:源存储桶
- @param source_object:源对象
- @param target_bucket:目标存储桶
- @param target_object:目标对象
- @param StorageClass:存储类
- @return: None
- """
- s3 = self.session_conn
- copy_source = {
- 'Bucket': source_bucket,
- 'Key': source_object
- }
- target_object = s3.Object(target_bucket, target_object)
- if StorageClass:
- target_object.copy_from(CopySource=copy_source, StorageClass=StorageClass)
- else:
- target_object.copy_from(CopySource=copy_source)
- def generate_put_obj_url(self, bucket_name, obj_key, storage_class=None):
- """
- 生成预签名对象URL
- @param bucket_name: 存储桶名称
- @param obj_key: 对象key
- @param storage_class: 存储类 例
- @return: 对象URL
- """
- params = {
- 'Bucket': bucket_name,
- 'Key': obj_key,
- }
- if storage_class:
- params['StorageClass'] = storage_class
- return self.session_conn.meta.client.generate_presigned_url('put_object',
- Params=params,
- ExpiresIn=7200)
- def batch_copy_obj(self, source_bucket, target_bucket, prefix, target_prefix, storage_class=None):
- """
- 批量拷贝对象
- @param source_bucket: 源存储桶
- @param target_bucket: 目标存储桶
- @param prefix: 需要搜索的对象前缀 例:AUS000247LTCLY/vod1/1686043996
- @param target_prefix: 目标对象前缀 例:app/algorithm-shop/1686043996
- @param storage_class: 存储类
- @return: None
- """
- s3 = self.session_conn
- # 遍历源存储桶中指定前缀下的所有对象,依次进行复制操作
- for obj in s3.Bucket(source_bucket).objects.filter(Prefix=prefix):
- key = obj.key # 对象键名
- target_key = f'{target_prefix}/' + key.split('/')[-1] # 新的对象键名,此处为 "new_path/" + 原有文件名
- copy_source = {
- 'Bucket': source_bucket,
- 'Key': key
- }
- # 将对象复制到目标存储桶,并设置存储类型和新的对象键名
- if storage_class:
- s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source, StorageClass=storage_class)
- else:
- s3.Object(target_bucket, target_key).copy_from(CopySource=copy_source)
- def get_object_size(self, bucket_name, object_key):
- """
- 获取存储桶中指定对象的大小
- :param bucket_name: string,存储桶名称
- :param object_key: string,对象键名
- :return: int,指定对象的大小,单位为字节
- """
- s3 = self.session_conn
- obj = s3.Object(bucket_name, object_key)
- try:
- return obj.content_length
- except Exception as e:
- return 0
- def get_object_list(self, bucket_name, prefix):
- """
- 获取指定路径所有对象
- :param bucket_name: string,存储桶名称
- :param prefix: string,路径
- :return: int,指定对象的大小,单位为字节
- """
- try:
- s3 = self.client_conn
- obj = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
- return obj['Contents']
- except Exception as e:
- return []
- class RedisObject:
- def __init__(self, db=0):
- self.POOL = redis.ConnectionPool(host=SERVER_HOST, port=6379, db=db)
- self.CONN = redis.Redis(connection_pool=self.POOL)
- def set_data(self, key, val, expire=0):
- try:
- self.CONN.set(key, val)
- if expire > 0:
- self.CONN.expire(key, expire)
- except Exception as e:
- return False
- else:
- return True
- def get_data(self, key):
- try:
- val = self.CONN.get(key)
- except Exception as e:
- print(repr(e))
- return False
- else:
- if val:
- return val.decode('utf-8')
- else:
- return False
- def del_data(self, key):
- try:
- val = self.CONN.delete(key)
- except Exception as e:
- print(repr(e))
- return False
- else:
- return True
- def get_size(self):
- return self.CONN.dbsize()
- # 向列表插入数据
- def rpush(self, name, val):
- self.CONN.rpush(name, val)
- def lpop(self, name):
- val = self.CONN.lpop(name)
- if val:
- return val.decode('utf-8')
- else:
- return False
- # 获取列表长度
- def llen(self, name):
- return self.CONN.llen(name=name)
- # 获取列表所有数据
- def lrange(self, name, start, end):
- return self.CONN.lrange(name, start, end)
- # 删除列表指定数据
- def lrem(self, name, num, value):
- """
- num:列表方向,删除个数(0:所有)
- value:删除的值
- """
- return self.CONN.lrem(name, num, value)
- def get_ttl(self, key):
- ttl = self.CONN.ttl(key)
- if ttl:
- return ttl
- else:
- return 0
- def get_keys(self, key):
- keys = self.CONN.keys(key)
- if keys:
- return keys
- else:
- return False
- def set_ex_data(self, key, val, expire=0):
- try:
- self.CONN.setex(name=key, time=expire, value=val)
- except Exception as e:
- return False
- else:
- return True
- def set_hash_data(self, key, kwargs):
- self.CONN.hmset(key, kwargs)
- def get_hash_data(self, key, file):
- return self.CONN.hmget(key, file)
- def get_all_hash_data(self, key):
- return self.CONN.hgetall(key)
- class TokenObject:
- def __init__(self, token=None, returntpye='currency'):
- if token == 'local':
- token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTg0MzUxODk2MjgyMTM4MDAxMzgwMDAiLCJsYW5nIjoiZW4iLCJ1c2VyIjoiMTM2ODAzMTc1OTYiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1ODcyNzcwNjB9.c0LV_XyxwbzUlYqMJqx7vw9f19Jv-0kGnUHuu_go-mo'
- if token == 'test':
- token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJleHAiOjE1Njk5OTg4OTYsInVzZXJJRCI6IjE1MTU2NDI2MjMzNzkzOTUxMzgwMDEzODAwMSIsImxhbmciOiJlbiIsIm1fY29kZSI6IjEyMzQxMzI0MzIxNCJ9.VAQtT9AbCCfXcrNj9DL5cvVasMDoI7AP8ptgU1GoMu8'
- self.token = token
- self.lang = None
- self.userID = None
- self.user = ''
- self.code = 0
- self.valid()
- self.returntpye = returntpye
- def valid(self):
- if self.token is None:
- self.code = 309
- return
- try:
- self.token = self.token.replace("Bearer ", "")
- res = jwt.decode(self.token, OAUTH_ACCESS_TOKEN_SECRET, algorithms='HS256')
- self.userID = res.get('userID', None)
- self.lang = res.get('lang', None)
- self.user = res.get('user', '')
- # 刷新登录时间
- # if self.userID:
- # print(self.user)
- # redisObj = RedisObject(db=3)
- # redisObj.set_data(key=self.userID, val=self.user, expire=300)
- except jwt.ExpiredSignatureError as e:
- self.code = 309
- return
- except Exception as e:
- self.code = 309
- return
- else:
- if not self.userID:
- self.code = 309
- return
- else:
- if self.userID:
- self.code = 0
- return res
- else:
- self.code = 309
- return
- def generate(self, data=None):
- if data is None:
- data = {}
- try:
- access_expire = int(OAUTH_ACCESS_TOKEN_TIME.total_seconds())
- refresh_expire = int(OAUTH_REFRESH_TOKEN_TIME.total_seconds())
- now_stamp = int(time.time())
- access_data = data
- refresh_data = data
- access_data['exp'] = access_expire + now_stamp
- refresh_data['exp'] = refresh_expire + now_stamp
- access_token = jwt.encode(access_data,
- OAUTH_ACCESS_TOKEN_SECRET,
- algorithm='HS256')
- refresh_token = jwt.encode(
- refresh_data,
- OAUTH_REFRESH_TOKEN_SECRET,
- algorithm='HS256')
- res = {
- 'access_token': access_token,
- 'access_expire': access_expire,
- 'refresh_expire': refresh_expire,
- 'refresh_token': refresh_token,
- }
- if self.returntpye == 'pc':
- res = {
- 'token': access_token,
- 'access_expire': access_expire,
- 'refresh_expire': refresh_expire,
- 'refresh_token': refresh_token,
- }
- except Exception as e:
- self.code = 309
- print(repr(e))
- else:
- self.code = 0
- return res
- def encryption(self, data=None):
- if data is None:
- data = {}
- try:
- access_expire = int(OAUTH_ACCESS_TOKEN_TIME.total_seconds())
- refresh_expire = int(OAUTH_REFRESH_TOKEN_TIME.total_seconds())
- now_stamp = int(time.time())
- access_data = data
- refresh_data = data
- access_data['exp'] = access_expire + now_stamp
- refresh_data['exp'] = refresh_expire + now_stamp
- access_token = jwt.encode(access_data,
- OAUTH_ACCESS_TOKEN_SECRET,
- algorithm='HS256')
- return access_token
- except Exception as e:
- self.code = 309
- print(repr(e))
- def refresh(self):
- if not self.token:
- self.code = 309
- return
- try:
- res = jwt.decode(self.token, OAUTH_REFRESH_TOKEN_SECRET, algorithms='HS256')
- except jwt.ExpiredSignatureError as e:
- print('过期')
- print(repr(e))
- self.code = 309
- except Exception as e:
- self.code = 309
- print(repr(e))
- else:
- self.code = 0
- userID = res.get('userID', '')
- user = res.get('user', '')
- lang = self.lang
- self.userID = userID
- self.user = user
- refreshRes = self.generate(data={'userID': userID, 'lang': lang, 'user': user})
- return refreshRes
|