import boto3
import simplejson as json
from boto3.session import Session
from botocore.client import Config

# NOTE(review): the AWS_* names used below (AWS_BUCKET, AWS_ACCESS_ID,
# AWS_ACCESS_SECRET, AWS_ACCESS_REGION) are not defined in this file, so they
# presumably come from this star import.
from Ansjer.config import *


class S3ClassObject:
    """Convenience wrapper around a boto3 S3 client bound to a default bucket.

    Credentials, region and the default bucket are read from the module-level
    ``AWS_*`` configuration constants when the object is constructed.
    """

    def __init__(self, *args, **kwargs):
        """Create the underlying S3 client.

        Positional and keyword arguments are accepted and ignored.
        """
        self.bucket = AWS_BUCKET
        self.access_id = AWS_ACCESS_ID
        self.access_secret = AWS_ACCESS_SECRET
        self.region_name = AWS_ACCESS_REGION
        session = Session(
            aws_access_key_id=AWS_ACCESS_ID,
            aws_secret_access_key=AWS_ACCESS_SECRET,
            region_name=AWS_ACCESS_REGION,
        )
        self.conn = session.client('s3')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _list_file_names(self, bucket, prefix, pagination_config=None):
        """Return the last path segment of every key under ``prefix``."""
        paginate_kwargs = {'Bucket': bucket, 'Prefix': prefix}
        if pagination_config is not None:
            paginate_kwargs['PaginationConfig'] = pagination_config

        paginator = self.conn.get_paginator('list_objects')
        file_names = []
        for page in paginator.paginate(**paginate_kwargs):
            # Pages for an empty listing carry no "Contents" entry.
            for entry in page.get('Contents', []):
                file_names.append(entry['Key'].split('/')[-1])
        return file_names

    def _delete_keys(self, keys):
        """Delete ``keys`` from the default bucket in one batch request."""
        response = self.conn.delete_objects(
            Bucket=self.bucket,
            Delete={'Objects': [{'Key': key} for key in keys]},
        )
        print(response)
        return response

    @staticmethod
    def _sigv4_client():
        """Build a separate S3 client that signs with Signature Version 4."""
        return boto3.client(
            's3',
            aws_access_key_id=AWS_ACCESS_ID,
            aws_secret_access_key=AWS_ACCESS_SECRET,
            config=Config(signature_version='s3v4'),
            region_name=AWS_ACCESS_REGION,
        )

    # ------------------------------------------------------------------
    # Upload / download
    # ------------------------------------------------------------------

    def put_object(self, body, key, bucket=''):
        """Upload ``body`` under ``key``.

        Args:
            body: Object payload passed to S3 as ``Body``.
            key: Destination object key.
            bucket: Target bucket; an empty string selects the default bucket.

        Returns:
            The ``put_object`` response from the S3 client.
        """
        if bucket == '':
            bucket = self.bucket
        return self.conn.put_object(
            Body=body,
            Key=key,
            Bucket=bucket,
        )

    def put_mp4_object(self, body, key):
        """Upload an mp4 to the default bucket, set up for HTML5 playback.

        The object is stored with ``ContentType='video/mp4'``.

        Returns:
            The ``put_object`` response from the S3 client.
        """
        return self.conn.put_object(
            Body=body,
            Key=key,
            Bucket=self.bucket,
            ContentType='video/mp4',
        )

    def download_file(self, file_name):
        """Download the object keyed ``file_name`` to a local file of that name.

        Args:
            file_name: Used both as the S3 key and as the local destination
                path.

        Returns:
            ``file_name`` on success, ``None`` if the download failed.
        """
        try:
            # The client call needs (Bucket, Key, Filename). The local path was
            # previously omitted, so the call raised TypeError on every use.
            self.conn.download_file(self.bucket, file_name, file_name)
        except Exception:
            print('Cannot download file', file_name)
            return None
        return file_name

    def get_object(self, key):
        """Fetch ``key`` from the default bucket and return the raw response."""
        return self.conn.get_object(
            Bucket=self.bucket,
            Key=key,
        )

    # ------------------------------------------------------------------
    # Listing
    # ------------------------------------------------------------------

    def get_all_object(self, prefix):
        """List file names under ``prefix`` in the default bucket.

        Results are requested in pages of 1000 keys. Only the part of each key
        after the last ``/`` is returned.
        """
        return self._list_file_names(
            self.bucket,
            prefix,
            pagination_config={'PageSize': 1000},
        )

    def get_prefix_obj(self, prefix, bucket):
        """List file names under ``prefix`` in an explicitly given ``bucket``.

        Only the part of each key after the last ``/`` is returned.
        """
        return self._list_file_names(bucket, prefix)

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    def delete_object(self, key):
        """Delete ``key`` from the default bucket and return the response."""
        return self.conn.delete_object(
            Bucket=self.bucket,
            Key=key,
        )

    def del_list_object(self, keydict):
        """Delete the keys named in a JSON document.

        Args:
            keydict: JSON text containing a ``keylist`` array of object keys.

        Returns:
            The ``delete_objects`` response from the S3 client.
        """
        return self._delete_keys(json.loads(keydict)['keylist'])

    def del_object_list(self, keylist):
        """Delete every key in ``keylist`` from the default bucket.

        Returns:
            The ``delete_objects`` response from the S3 client.
        """
        return self._delete_keys(keylist)

    # ------------------------------------------------------------------
    # Presigned URLs
    # ------------------------------------------------------------------

    def get_download_url(self, key):
        """Return a presigned GET URL for ``key`` with ``ExpiresIn=100``.

        The URL is now returned (it was previously discarded) and is signed
        for the default bucket rather than a placeholder bucket name.
        """
        return self.conn.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': self.bucket,
                'Key': key,
            },
            ExpiresIn=100,
        )

    def get_generate_vod_url(self, key):
        """Return a presigned GET URL for ``key`` with ``ExpiresIn=3600``."""
        return self.conn.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': self.bucket,
                'Key': key,
            },
            ExpiresIn=3600,
        )

    def sign_put_object(self, key, bucket_meal):
        """Return a presigned PUT URL for uploading ``key`` to ``bucket_meal``.

        The URL is requested with ``ExpiresIn=60``.

        Returns:
            The URL string, or ``False`` if signing failed.
        """
        # The region must be AWS_ACCESS_REGION; the access key id was
        # previously passed as the region by mistake.
        s3_con = self._sigv4_client()
        try:
            url = s3_con.generate_presigned_url(
                'put_object',
                Params={
                    'Bucket': bucket_meal,
                    'Key': key,
                },
                ExpiresIn=60,
                HttpMethod='PUT',
            )
        except Exception as e:
            print(repr(e))
            return False
        return url

    def sign_post_object(self, key, bucket_meal):
        """Return presigned POST data for uploading ``key`` to ``bucket_meal``.

        The data is requested with ``ExpiresIn=60``.

        Returns:
            The ``generate_presigned_post`` result, or ``False`` if signing
            failed.
        """
        s3_con = self._sigv4_client()
        try:
            data = s3_con.generate_presigned_post(
                Key=key,
                ExpiresIn=60,
                Bucket=bucket_meal,
            )
        except Exception as e:
            print(repr(e))
            return False
        return data