# -*- coding: utf-8 -*- import struct import boto3 from boto3.session import Session from datetime import datetime import time from requests_aws4auth import AWS4Auth import requests import chunk from botocore.client import Config print(datetime(2015, 1, 1)) print(time.time()) # exit() aws_key = 'AKIAJYSIOA24FQANOFTA' aws_secret = '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' # aws_key = 'AKIAIK7LP7TRWPFTRWVA' # aws_secret = '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' session = Session( aws_access_key_id=aws_key, aws_secret_access_key=aws_secret, region_name='us-east-1', ) ''' s3 ''' s3_con = boto3.client( 's3', aws_access_key_id=aws_key, aws_secret_access_key=aws_secret, config=Config(signature_version='s3v4'), region_name='us-east-1' ) url = s3_con.generate_presigned_post( Key='img.jpg', ExpiresIn=7200, Bucket='ansjertest' ) print(url) exit() url = s3_con.generate_presigned_url( 'put_object', Params={ 'Bucket': 'ansjertest', 'Key': 'img.jpg', # 'ContentType': 'image/jpg' }, ExpiresIn=3600, HttpMethod='PUT' ) print(url) exit() S3_client = session.client('s3') response = S3_client.generate_presigned_url( 'put_object', Params={ 'Bucket': 'ansjertest', 'Key': 'img.jpg', # 'ContentType': 'image/jpg' }, ExpiresIn=3600, # HttpMethod='PUT' ) print(response) exit() S3_client = session.client('s3') response = S3_client.generate_presigned_url( 'get_object', Params={ 'Bucket': 'ansjertest', 'Key': '2N1K3LE78TYJ38CE111A_3/2N1K3LE78TYJ38CE111A_3-1524557834.mp4' }, ExpiresIn=3600 ) print(response) exit() kinesis_client = session.client('kinesisvideo') response = kinesis_client.get_data_endpoint( StreamName='demo-stream', APIName='GET_MEDIA_FOR_FRAGMENT_LIST' ) endpoint = response['DataEndpoint'] + '/getMediaForFragmentList' print(endpoint) data = { "Fragments": ["91343852333181432407537343081996969589651220552"], "StreamName": "demo-stream" } auth = AWS4Auth(aws_key, aws_secret, 'us-east-1', 'kinesisvideo') headers = {'Content-type': 'application/json'} response = requests.post(endpoint, json=data, auth=auth, headers=headers) # test=chunk.Chunk(response.text.encode('utf8')) print(response.headers) exit() stream_bytes = response.text.encode('utf8') print(response.text.encode('utf8')) print('3333') exit() # url = "网络zip的地址" # path = "你本地的地址" # # req = urllib2.urlopen(url) # data = req.read() # with open(path, "wb") as zip: # zip.write(data) # req.close() # ##----------------------------------------------------------------------------------------- exit() response_describe_stream = kinesis_client.describe_stream( StreamName='demo-stream', ) print(response_describe_stream) # exit() response = kinesis_client.get_data_endpoint( StreamName='demo-stream', APIName='GET_MEDIA' ) print(response) # exit() print('-----------') print(response['DataEndpoint']) kinesis_video_media_client = session.client('kinesis-video-media') response_media = kinesis_video_media_client.get_media( StreamName='demo-stream', StartSelector={ 'StartSelectorType': 'EARLIEST ', } )