123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- from Ansjer.config import *
- from boto3.session import Session
- import simplejson as json
- import boto3
- from botocore.client import Config
- class S3ClassObject:
- def __init__(self, *args, **kwargs):
- self.bucket = AWS_BUCKET
- self.access_id = AWS_ACCESS_ID
- self.access_secret = AWS_ACCESS_SECRET
- self.region_name = AWS_ACCESS_REGION
- session = Session(
- aws_access_key_id=AWS_ACCESS_ID,
- aws_secret_access_key=AWS_ACCESS_SECRET,
- region_name=AWS_ACCESS_REGION
- )
- self.conn = session.client('s3')
- def put_object(self, body, key, bucket=''):
- if bucket == '':
- bucket = self.bucket
- s3_client = self.conn
- response = s3_client.put_object(
- Body=body,
- Key=key,
- Bucket=bucket,
- # Expires=datetime()
- )
- return response
- def download_file(self, file_name):
- s3_client = self.conn
- try:
- s3_client.download_file(
- self.bucket,
- file_name,
- )
- return file_name
- except:
- print('Cannot download file', file_name)
- return
- def get_all_object(self, prefix):
- paginator = self.conn.get_paginator('list_objects')
- s3_results = paginator.paginate(
- Bucket=self.bucket,
- Prefix=prefix,
- PaginationConfig={'PageSize': 1000}
- )
- bucket_object_list = []
- for page in s3_results:
- if "Contents" in page:
- for key in page["Contents"]:
- s3_file_name = key['Key'].split('/')[-1]
- bucket_object_list.append(s3_file_name)
- return bucket_object_list
- def get_prefix_obj(self, prefix,bucket):
- paginator = self.conn.get_paginator('list_objects')
- s3_results = paginator.paginate(
- Bucket=bucket,
- Prefix=prefix,
- # PaginationConfig={'PageSize': 1000}
- )
- bucket_object_list = []
- for page in s3_results:
- if "Contents" in page:
- for key in page["Contents"]:
- s3_file_name = key['Key'].split('/')[-1]
- bucket_object_list.append(s3_file_name)
- return bucket_object_list
- def delete_object(self, key):
- response = self.conn.delete_object(
- Bucket=self.bucket,
- Key=key,
- )
- return response
- def get_object(self, key):
- response = self.conn.get_object(
- Bucket=self.bucket,
- Key=key,
- )
- return response
- def del_list_object(self, keydict):
- keylist = json.loads(keydict)['keylist']
- del_list = []
- for i in keylist:
- del_list.append({'Key': i})
- response = self.conn.delete_objects(
- Bucket=self.bucket,
- Delete={'Objects': del_list}
- )
- print(response)
- return response
- def del_object_list(self, keylist):
- del_list = []
- for i in keylist:
- del_list.append({'Key': i})
- response = self.conn.delete_objects(
- Bucket=self.bucket,
- Delete={'Objects': del_list}
- )
- print(response)
- return response
- def get_download_url(self, key):
- self.conn.generate_presigned_url(
- 'get_object',
- Params={
- 'Bucket': 'www.mybucket.com',
- 'Key': key
- },
- ExpiresIn=100)
- return
- # 推mp4到s3,且设置为html5播放
- def put_mp4_object(self, body, key):
- s3_client = self.conn
- response = s3_client.put_object(
- Body=body,
- Key=key,
- Bucket=self.bucket,
- ContentType='video/mp4'
- )
- return response
- def get_generate_vod_url(self, key):
- response_url = self.conn.generate_presigned_url(
- 'get_object',
- Params={
- 'Bucket': self.bucket,
- 'Key': key
- },
- ExpiresIn=3600
- )
- return response_url
- def sign_put_object(self, key,bucket_meal):
- s3_con = boto3.client('s3',
- aws_access_key_id=AWS_ACCESS_ID,
- aws_secret_access_key=AWS_ACCESS_SECRET,
- config=Config(signature_version='s3v4'),
- region_name=AWS_ACCESS_ID,
- )
- try:
- url = s3_con.generate_presigned_url(
- 'put_object',
- Params={
- 'Bucket':bucket_meal,
- 'Key': key,
- },
- ExpiresIn=60,
- HttpMethod='PUT'
- )
- except Exception as e:
- print(repr(e))
- return False
- else:
- return url
- def sign_post_object(self, key,bucket_meal):
- s3_con = boto3.client('s3',
- aws_access_key_id=AWS_ACCESS_ID,
- aws_secret_access_key=AWS_ACCESS_SECRET,
- config=Config(signature_version='s3v4'),
- region_name=AWS_ACCESS_REGION,
- )
- try:
- data = s3_con.generate_presigned_post(
- Key=key,
- ExpiresIn=60,
- Bucket=bucket_meal
- )
- except Exception as e:
- print(repr(e))
- return False
- else:
- return data
|