#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: AnsjerFormal @software: PyCharm @DATE: 2020/3/9 11:47 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: s3boto3signpost.py @Contact: chanjunkai@163.com """ # from Ansjer.config import * from boto3.session import Session import simplejson as json import boto3 from botocore.client import Config import requests def upload(): data = {'url': 'https://azvod1.s3.amazonaws.com/', 'fields': {'key': 'putkey/${filename}', 'AWSAccessKeyId': 'AKIA2E67UIMDUR3B3OOO', 'policy': 'eyJleHBpcmF0aW9uIjogIjIwMjAtMDMtMTNUMDI6NTI6MTlaIiwgImNvbmRpdGlvbnMiOiBbeyJidWNrZXQiOiAiYXp2b2QxIn0sIFsic3RhcnRzLXdpdGgiLCAiJGtleSIsICJwdXRrZXkvIl1dfQ==', 'signature': 'ZDucTfYayCVxKQ2CuCLlUY861Nw='}} file_path = 'D:/devcode/AnsjerFormal/Ansjer/test/oss.py' # files = {'file': open(file_path,'rb')} files = {'file': ('wwww/report.xls', open(file_path, 'rb')),'filename':'xxxooo'} res = requests.post(url=data['url'],data=data['fields'],files=files) print(res.text) upload() exit() AWS_ACCESS_ID = 'AKIA2E67UIMDUR3B3OOO' AWS_ACCESS_SECRET = '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' AWS_ACCESS_REGION = 'ap-northeast-1' AWS_BUCKET = 'azvod1' session = Session( aws_access_key_id=AWS_ACCESS_ID, aws_secret_access_key=AWS_ACCESS_SECRET, region_name=AWS_ACCESS_REGION ) s3conn = session.client('s3') ss = s3conn.generate_presigned_post( Key='putkey/${filename}', ExpiresIn=3600, Bucket=AWS_BUCKET ) print(ss) exit() def put_object(self, body, key, bucket=''): if bucket == '': bucket = self.bucket s3_client = self.conn response = s3_client.put_object( Body=body, Key=key, Bucket=bucket, # Expires=datetime() ) return response def download_file(self, file_name): s3_client = self.conn try: s3_client.download_file( self.bucket, file_name, ) return file_name except: print('Cannot download file', file_name) return def get_all_object(self, prefix): paginator = self.conn.get_paginator('list_objects') s3_results = paginator.paginate( Bucket=self.bucket, Prefix=prefix, PaginationConfig={'PageSize': 1000} ) bucket_object_list = [] for page in s3_results: if "Contents" in page: for key in page["Contents"]: s3_file_name = key['Key'].split('/')[-1] bucket_object_list.append(s3_file_name) return bucket_object_list def get_prefix_obj(self, prefix, bucket): paginator = self.conn.get_paginator('list_objects') s3_results = paginator.paginate( Bucket=bucket, Prefix=prefix, # PaginationConfig={'PageSize': 1000} ) bucket_object_list = [] for page in s3_results: if "Contents" in page: for key in page["Contents"]: s3_file_name = key['Key'].split('/')[-1] bucket_object_list.append(s3_file_name) return bucket_object_list def delete_object(self, key): response = self.conn.delete_object( Bucket=self.bucket, Key=key, ) return response def get_object(self, key): response = self.conn.get_object( Bucket=self.bucket, Key=key, ) return response def del_list_object(self, keydict): keylist = json.loads(keydict)['keylist'] del_list = [] for i in keylist: del_list.append({'Key': i}) response = self.conn.delete_objects( Bucket=self.bucket, Delete={'Objects': del_list} ) print(response) return response def del_object_list(self, keylist): del_list = [] for i in keylist: del_list.append({'Key': i}) response = self.conn.delete_objects( Bucket=self.bucket, Delete={'Objects': del_list} ) print(response) return response def get_download_url(self, key): self.conn.generate_presigned_url( 'get_object', Params={ 'Bucket': 'www.mybucket.com', 'Key': key }, ExpiresIn=100) return # 推mp4到s3,且设置为html5播放 def put_mp4_object(self, body, key): s3_client = self.conn response = s3_client.put_object( Body=body, Key=key, Bucket=self.bucket, ContentType='video/mp4' ) return response def get_generate_vod_url(self, key): response_url = self.conn.generate_presigned_url( 'get_object', Params={ 'Bucket': self.bucket, 'Key': key }, ExpiresIn=3600 ) return response_url def sign_put_object(self, key, bucket_meal): s3_con = boto3.client('s3', aws_access_key_id=AWS_ACCESS_ID, aws_secret_access_key=AWS_ACCESS_SECRET, config=Config(signature_version='s3v4'), region_name=AWS_ACCESS_ID, ) try: url = s3_con.generate_presigned_url( 'put_object', Params={ 'Bucket': bucket_meal, 'Key': key, }, ExpiresIn=60, HttpMethod='PUT' ) except Exception as e: print(repr(e)) return False else: return url def sign_post_object(self, key, bucket_meal): s3_con = boto3.client('s3', aws_access_key_id=AWS_ACCESS_ID, aws_secret_access_key=AWS_ACCESS_SECRET, config=Config(signature_version='s3v4'), region_name=AWS_ACCESS_REGION, ) try: data = s3_con.generate_presigned_post( Key=key, ExpiresIn=60, Bucket=bucket_meal ) except Exception as e: print(repr(e)) return False else: return data