123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
- @AUTHOR: ASJRD018
- @NAME: AnsjerFormal
- @software: PyCharm
- @DATE: 2020/3/9 11:47
- @Version: python3.6
- @MODIFY RECORD:ansjer dev
- @file: s3boto3signpost.py
- @Contact: chanjunkai@163.com
- """
- # from Ansjer.config import *
- from boto3.session import Session
- import simplejson as json
- import boto3
- from botocore.client import Config
- import requests
def upload(file_path='D:/devcode/AnsjerFormal/Ansjer/test/oss.py'):
    """POST a local file to S3 using pre-generated presigned-POST form data.

    The form fields below are a captured result of a presigned-POST request
    for the ``azvod1`` bucket. The base64 policy embeds its own expiration
    timestamp, so the fields have to be regenerated once that time has passed.

    Args:
        file_path: Path of the local file to send. Defaults to the path the
            script originally hard-coded.

    The body of the HTTP response is printed; nothing is returned.
    """
    data = {
        'url': 'https://azvod1.s3.amazonaws.com/',
        'fields': {
            'key': 'putkey/${filename}',
            'AWSAccessKeyId': 'AKIA2E67UIMDUR3B3OOO',
            'policy': 'eyJleHBpcmF0aW9uIjogIjIwMjAtMDMtMTNUMDI6NTI6MTlaIiwgImNvbmRpdGlvbnMiOiBbeyJidWNrZXQiOiAiYXp2b2QxIn0sIFsic3RhcnRzLXdpdGgiLCAiJGtleSIsICJwdXRrZXkvIl1dfQ==',
            'signature': 'ZDucTfYayCVxKQ2CuCLlUY861Nw=',
        },
    }
    # The context manager closes the file even when the request raises;
    # the original left the handle open.
    with open(file_path, 'rb') as fh:
        files = {'file': ('wwww/report.xls', fh), 'filename': 'xxxooo'}
        res = requests.post(url=data['url'], data=data['fields'], files=files)
    print(res.text)
import os

# Credentials are read from the environment instead of being committed to the
# repository. When a variable is unset the value is None, and boto3 then falls
# back to its own default credential lookup.
# NOTE: any key pair that was ever stored in this file should be rotated.
AWS_ACCESS_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_ACCESS_SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_ACCESS_REGION = 'ap-northeast-1'
AWS_BUCKET = 'azvod1'


def _print_presigned_post():
    """Generate presigned-POST form data for ``AWS_BUCKET`` and print it.

    Returns:
        The value returned by ``generate_presigned_post`` for the key
        template ``putkey/${filename}`` with a 3600 second lifetime.
    """
    session = Session(
        aws_access_key_id=AWS_ACCESS_ID,
        aws_secret_access_key=AWS_ACCESS_SECRET,
        region_name=AWS_ACCESS_REGION,
    )
    s3conn = session.client('s3')
    ss = s3conn.generate_presigned_post(
        Key='putkey/${filename}',
        ExpiresIn=3600,
        Bucket=AWS_BUCKET,
    )
    print(ss)
    return ss


if __name__ == '__main__':
    # Run the upload only when executed as a script. Previously the call and
    # a following exit() ran at import time, which made every constant and
    # definition below them unreachable.
    upload()
def put_object(self, body, key, bucket=''):
    """Store ``body`` under ``key`` through ``self.conn.put_object``.

    Args:
        body: Payload forwarded as ``Body``.
        key: Object key forwarded as ``Key``.
        bucket: Target bucket; an empty string selects ``self.bucket``.

    Returns:
        Whatever ``self.conn.put_object`` returns.
    """
    target_bucket = self.bucket if bucket == '' else bucket
    return self.conn.put_object(Body=body, Key=key, Bucket=target_bucket)
def download_file(self, file_name):
    """Download the object ``file_name`` from ``self.bucket`` to a local file.

    The object key is also used as the local destination path, so a key
    containing ``/`` needs the matching local directory to exist.

    Args:
        file_name: Object key in ``self.bucket`` and local path to write.

    Returns:
        ``file_name`` on success, or ``None`` when the download fails (a
        message is printed in that case).
    """
    s3_client = self.conn
    try:
        # Filename is required by the client call; the original omitted it,
        # so every call raised TypeError and fell into the except branch.
        s3_client.download_file(
            Bucket=self.bucket,
            Key=file_name,
            Filename=file_name,
        )
    except Exception:
        # Best-effort contract kept: report and return None, but no longer
        # swallow KeyboardInterrupt/SystemExit like the bare except did.
        print('Cannot download file', file_name)
        return None
    return file_name
def get_all_object(self, prefix):
    """List base names of the objects in ``self.bucket`` under ``prefix``.

    Pages through ``list_objects`` with a page size of 1000 and keeps, for
    every returned key, only the text after its last ``/``.

    Args:
        prefix: Key prefix forwarded as ``Prefix``.

    Returns:
        A list of base names in listing order; pages without a
        ``Contents`` entry contribute nothing.
    """
    pages = self.conn.get_paginator('list_objects').paginate(
        Bucket=self.bucket,
        Prefix=prefix,
        PaginationConfig={'PageSize': 1000},
    )
    names = []
    for page in pages:
        names.extend(
            entry['Key'].rsplit('/', 1)[-1]
            for entry in page.get('Contents', ())
        )
    return names
def get_prefix_obj(self, prefix, bucket):
    """List base names of the objects in ``bucket`` under ``prefix``.

    Pages through ``list_objects`` without an explicit pagination config and
    keeps, for every returned key, only the text after its last ``/``.

    Args:
        prefix: Key prefix forwarded as ``Prefix``.
        bucket: Bucket name forwarded as ``Bucket``.

    Returns:
        A list of base names in listing order; pages without a
        ``Contents`` entry contribute nothing.
    """
    lister = self.conn.get_paginator('list_objects')
    base_names = []
    for page in lister.paginate(Bucket=bucket, Prefix=prefix):
        if "Contents" not in page:
            continue
        base_names += [item['Key'].split('/')[-1] for item in page["Contents"]]
    return base_names
def delete_object(self, key):
    """Delete ``key`` from ``self.bucket``.

    Args:
        key: Object key forwarded as ``Key``.

    Returns:
        Whatever ``self.conn.delete_object`` returns.
    """
    return self.conn.delete_object(Bucket=self.bucket, Key=key)
def get_object(self, key):
    """Fetch ``key`` from ``self.bucket``.

    Args:
        key: Object key forwarded as ``Key``.

    Returns:
        Whatever ``self.conn.get_object`` returns.
    """
    return self.conn.get_object(Bucket=self.bucket, Key=key)
def del_list_object(self, keydict):
    """Delete the keys named in a JSON document from ``self.bucket``.

    Args:
        keydict: JSON text holding an object whose ``keylist`` member is the
            list of keys to delete.

    Returns:
        Whatever ``self.conn.delete_objects`` returns; the same value is
        also printed.
    """
    targets = [{'Key': name} for name in json.loads(keydict)['keylist']]
    result = self.conn.delete_objects(
        Bucket=self.bucket,
        Delete={'Objects': targets},
    )
    print(result)
    return result
def del_object_list(self, keylist):
    """Delete every key in ``keylist`` from ``self.bucket`` in one request.

    Args:
        keylist: Iterable of object keys.

    Returns:
        Whatever ``self.conn.delete_objects`` returns; the same value is
        also printed.
    """
    doomed = [{'Key': entry} for entry in keylist]
    outcome = self.conn.delete_objects(
        Bucket=self.bucket,
        Delete={'Objects': doomed},
    )
    print(outcome)
    return outcome
def get_download_url(self, key):
    """Create a presigned ``get_object`` URL for ``key`` in ``self.bucket``.

    The original generated the URL but discarded it and always returned
    ``None``; it also signed for the placeholder bucket
    ``'www.mybucket.com'`` instead of the bucket the other methods use.

    Args:
        key: Object key to sign.

    Returns:
        The value returned by ``self.conn.generate_presigned_url``, requested
        with ``ExpiresIn=100``.
    """
    return self.conn.generate_presigned_url(
        'get_object',
        Params={
            'Bucket': self.bucket,
            'Key': key,
        },
        ExpiresIn=100,
    )
# Push an mp4 to S3 and mark it so that it can be played as HTML5 video
def put_mp4_object(self, body, key):
    """Store ``body`` under ``key`` in ``self.bucket`` as ``video/mp4``.

    Args:
        body: Payload forwarded as ``Body``.
        key: Object key forwarded as ``Key``.

    Returns:
        Whatever ``self.conn.put_object`` returns.
    """
    request = {
        'Body': body,
        'Key': key,
        'Bucket': self.bucket,
        'ContentType': 'video/mp4',
    }
    return self.conn.put_object(**request)
def get_generate_vod_url(self, key):
    """Create a presigned ``get_object`` URL for ``key`` in ``self.bucket``.

    Args:
        key: Object key to sign.

    Returns:
        The value returned by ``self.conn.generate_presigned_url``, requested
        with ``ExpiresIn=3600``.
    """
    target = {'Bucket': self.bucket, 'Key': key}
    return self.conn.generate_presigned_url(
        'get_object',
        Params=target,
        ExpiresIn=3600,
    )
def sign_put_object(self, key, bucket_meal):
    """Create a presigned URL that permits an HTTP PUT of ``key``.

    A dedicated SigV4 client is built from the module-level credentials for
    each call; ``self.conn`` is not used.

    Args:
        key: Object key the URL will write to.
        bucket_meal: Name of the target bucket.

    Returns:
        The presigned URL, requested with ``ExpiresIn=60``, or ``False``
        when signing raises (the exception is printed).
    """
    s3_con = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_ID,
        aws_secret_access_key=AWS_ACCESS_SECRET,
        config=Config(signature_version='s3v4'),
        # Fixed: the access key id was being passed as the region name.
        region_name=AWS_ACCESS_REGION,
    )
    try:
        url = s3_con.generate_presigned_url(
            'put_object',
            Params={
                'Bucket': bucket_meal,
                'Key': key,
            },
            ExpiresIn=60,
            HttpMethod='PUT',
        )
    except Exception as e:
        print(repr(e))
        return False
    else:
        return url
def sign_post_object(self, key, bucket_meal):
    """Create presigned-POST form data for uploading ``key``.

    A dedicated SigV4 client is built from the module-level credentials for
    each call; ``self.conn`` is not used.

    Args:
        key: Object key forwarded as ``Key``.
        bucket_meal: Name of the target bucket.

    Returns:
        The value returned by ``generate_presigned_post``, requested with
        ``ExpiresIn=60``, or ``False`` when signing raises (the exception is
        printed).
    """
    signer = boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_ID,
        aws_secret_access_key=AWS_ACCESS_SECRET,
        config=Config(signature_version='s3v4'),
        region_name=AWS_ACCESS_REGION,
    )
    try:
        return signer.generate_presigned_post(
            Key=key,
            ExpiresIn=60,
            Bucket=bucket_meal,
        )
    except Exception as e:
        print(repr(e))
        return False
|