# NOTE: the star import must stay first. The explicit imports below then take
# precedence if the config module happens to export a clashing name
# (e.g. ``json`` or ``Config``).
from Ansjer.config import *
from boto3.session import Session
import simplejson as json
import boto3
from botocore.client import Config


class S3ClassObject:
    """Convenience wrapper around a boto3 S3 client.

    Credentials, region and the default bucket come from the ``AWS_*``
    constants exported by ``Ansjer.config``.
    """

    def __init__(self, *args, **kwargs):
        """Create the S3 client; positional/keyword arguments are ignored."""
        self.bucket = AWS_BUCKET
        self.access_id = AWS_ACCESS_ID
        self.access_secret = AWS_ACCESS_SECRET
        self.region_name = AWS_ACCESS_REGION
        session = Session(
            aws_access_key_id=self.access_id,
            aws_secret_access_key=self.access_secret,
            region_name=self.region_name,
        )
        self.conn = session.client('s3')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sigv4_client():
        """Return a fresh S3 client configured for Signature Version 4."""
        return boto3.client(
            's3',
            aws_access_key_id=AWS_ACCESS_ID,
            aws_secret_access_key=AWS_ACCESS_SECRET,
            config=Config(signature_version='s3v4'),
            region_name=AWS_ACCESS_REGION,
        )

    def _list_file_names(self, bucket, prefix, pagination_config=None):
        """Return the last path segment of every key under ``prefix``."""
        paginate_kwargs = {'Bucket': bucket, 'Prefix': prefix}
        if pagination_config is not None:
            paginate_kwargs['PaginationConfig'] = pagination_config
        pages = self.conn.get_paginator('list_objects').paginate(**paginate_kwargs)
        return [
            entry['Key'].split('/')[-1]
            for page in pages
            # Pages without any matching key carry no "Contents" entry.
            for entry in page.get('Contents', [])
        ]

    # ------------------------------------------------------------------
    # Upload / download
    # ------------------------------------------------------------------
    def put_object(self, body, key, bucket=''):
        """Upload ``body`` under ``key``.

        :param body: object payload passed through as ``Body``.
        :param key: destination object key.
        :param bucket: target bucket; an empty value selects the default bucket.
        :return: the raw ``put_object`` response.
        """
        return self.conn.put_object(
            Body=body,
            Key=key,
            Bucket=bucket or self.bucket,
        )

    def download_file(self, file_name, local_path=None):
        """Download object ``file_name`` from the default bucket to disk.

        :param file_name: key of the object to download.
        :param local_path: where to store the file; defaults to ``file_name``.
        :return: the local path on success, ``None`` if the download failed.
        """
        target = file_name if local_path is None else local_path
        try:
            # boto3 requires (Bucket, Key, Filename); omitting the local
            # filename raises TypeError before any request is sent.
            self.conn.download_file(self.bucket, file_name, target)
        except Exception:
            print('Cannot download file', file_name)
            return None
        return target

    # Push an mp4 to S3 with a content type that lets HTML5 players play it.
    def put_mp4_object(self, body, key):
        """Upload ``body`` under ``key`` in the default bucket as ``video/mp4``.

        :return: the raw ``put_object`` response.
        """
        return self.conn.put_object(
            Body=body,
            Key=key,
            Bucket=self.bucket,
            ContentType='video/mp4',
        )

    def get_object(self, key):
        """Fetch object ``key`` from the default bucket; return the raw response."""
        return self.conn.get_object(Bucket=self.bucket, Key=key)

    # ------------------------------------------------------------------
    # Listing
    # ------------------------------------------------------------------
    def get_all_object(self, prefix):
        """List file names (last key segment) under ``prefix`` in the default bucket.

        Results are requested in pages of 1000 entries.
        """
        return self._list_file_names(
            self.bucket,
            prefix,
            pagination_config={'PageSize': 1000},
        )

    def get_prefix_obj(self, prefix, bucket):
        """List file names (last key segment) under ``prefix`` in ``bucket``."""
        return self._list_file_names(bucket, prefix)

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------
    def delete_object(self, key):
        """Delete object ``key`` from the default bucket; return the raw response."""
        return self.conn.delete_object(Bucket=self.bucket, Key=key)

    def del_list_object(self, keydict):
        """Delete the keys listed in a JSON document.

        :param keydict: JSON text of the form ``{"keylist": [key, ...]}``.
        :return: the raw ``delete_objects`` response.
        """
        return self.del_object_list(json.loads(keydict)['keylist'])

    def del_object_list(self, keylist):
        """Delete every key in ``keylist`` from the default bucket.

        :param keylist: iterable of object keys.
        :return: the raw ``delete_objects`` response (also printed).
        """
        response = self.conn.delete_objects(
            Bucket=self.bucket,
            Delete={'Objects': [{'Key': key} for key in keylist]},
        )
        print(response)
        return response

    # ------------------------------------------------------------------
    # Presigned URLs
    # ------------------------------------------------------------------
    def get_download_url(self, key):
        """Return a presigned GET URL for ``key``, valid for 100 seconds.

        The URL is signed for the default bucket.
        """
        return self.conn.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': self.bucket,
                'Key': key,
            },
            ExpiresIn=100,
        )

    def get_generate_vod_url(self, key):
        """Return a presigned GET URL for ``key``, valid for one hour."""
        return self.conn.generate_presigned_url(
            'get_object',
            Params={
                'Bucket': self.bucket,
                'Key': key,
            },
            ExpiresIn=3600,
        )

    def sign_put_object(self, key, bucket_meal):
        """Return a presigned PUT URL for uploading ``key`` into ``bucket_meal``.

        The URL is valid for 60 seconds.

        :return: the URL, or ``False`` if signing failed (the error is printed).
        """
        s3_con = self._sigv4_client()
        try:
            url = s3_con.generate_presigned_url(
                'put_object',
                Params={
                    'Bucket': bucket_meal,
                    'Key': key,
                },
                ExpiresIn=60,
                HttpMethod='PUT',
            )
        except Exception as e:
            print(repr(e))
            return False
        return url

    def sign_post_object(self, key, bucket_meal):
        """Return presigned POST data for uploading ``key`` into ``bucket_meal``.

        The signature is valid for 60 seconds.

        :return: the value produced by ``generate_presigned_post``, or
            ``False`` if signing failed (the error is printed).
        """
        s3_con = self._sigv4_client()
        try:
            data = s3_con.generate_presigned_post(
                Key=key,
                ExpiresIn=60,
                Bucket=bucket_meal,
            )
        except Exception as e:
            print(repr(e))
            return False
        return data