# -*- coding: utf-8 -*-
"""
@Author : Rocky
@Time : 2022/10/18 17:13
@File :AmazonKinesisVideoUtil.py
"""
import logging
from typing import Any, Dict, Optional, Union

import boto3

logger = logging.getLogger(__name__)


class AmazonKinesisVideoObject:
    """
    Thin wrapper around the boto3 Amazon Kinesis Video Streams client.

    API documentation:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html
    """

    # Tags applied by create_stream() when the caller does not supply any.
    DEFAULT_TAGS = {'project_kvs': 'kvs'}

    def __init__(self, aws_access_key_id: str, secret_access_key: str, region_name: str) -> None:
        """
        Store the credentials and build the ``kinesisvideo`` client.

        @param aws_access_key_id: AWS access key id
        @param secret_access_key: AWS secret access key
        @param region_name: AWS region the client talks to
        """
        self.access_id = aws_access_key_id
        self.access_secret = secret_access_key
        self.region_name = region_name
        self.client_conn = boto3.client(
            'kinesisvideo',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name,
        )

    def create_stream(self, stream_name: str, tags: Optional[Dict[str, str]] = None) -> Union[str, bool]:
        """
        Create a video stream.

        @param stream_name: name of the video stream
        @param tags: tags to attach to the stream; when omitted, DEFAULT_TAGS
                     ({'project_kvs': 'kvs'}) is used
        @return: the stream ARN on success, False on any failure
        """
        # Copy the default so the class-level dict is never handed to the client.
        stream_tags = dict(self.DEFAULT_TAGS) if tags is None else tags
        try:
            response = self.client_conn.create_stream(StreamName=stream_name, Tags=stream_tags)
            return response['StreamARN']
        except Exception:
            # Callers rely on the False return, so the error is logged
            # (with traceback) rather than propagated.
            logger.exception("Failed to create Kinesis video stream %r", stream_name)
            return False

    def update_data_retention(self, stream_name: str, operation: str,
                              data_retention_change_in_hours: int) -> bool:
        """
        Change the data retention period of a video stream.

        @param stream_name: name of the video stream
        @param operation: 'INCREASE_DATA_RETENTION' or 'DECREASE_DATA_RETENTION'
        @param data_retention_change_in_hours: number of hours to add or remove
        @return: True on success, False on any failure
        """
        try:
            # The update call requires the stream's current version, so it is
            # looked up first via describe_stream().
            version = self.describe_stream(stream_name)['Version']
            self.client_conn.update_data_retention(
                StreamName=stream_name,
                Operation=operation,
                CurrentVersion=version,
                DataRetentionChangeInHours=data_retention_change_in_hours,
            )
            return True
        except Exception:
            # Same best-effort contract as create_stream(): log and report False.
            logger.exception(
                "Failed to update data retention of Kinesis video stream %r (%s by %s hours)",
                stream_name, operation, data_retention_change_in_hours,
            )
            return False

    def describe_stream(self, stream_name: str) -> Dict[str, Any]:
        """
        Fetch the description of a video stream.

        Errors raised by the client are propagated to the caller.

        @param stream_name: name of the video stream
        @return: the 'StreamInfo' entry of the describe_stream response
        """
        return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']

    def get_data_endpoint(self, stream_name: str, api_name: str) -> str:
        """
        Get the endpoint used to read from or write to the given stream.

        Errors raised by the client are propagated to the caller.

        @param stream_name: name of the video stream
        @param api_name: name of the API action the endpoint is needed for
                         (passed through as APIName, e.g. 'PUT_MEDIA' or 'GET_MEDIA')
        @return: the 'DataEndpoint' value of the get_data_endpoint response
        """
        response = self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)
        return response['DataEndpoint']