# -*- coding: utf-8 -*- """ @Author : Rocky @Time : 2022/10/18 17:13 @File :AmazonKinesisVideoUtil.py """ import datetime import boto3 class AmazonKinesisVideoObject: """ Amazon Kinesis Video Streams对象 api文档链接: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html """ def __init__(self, aws_access_key_id, secret_access_key, region_name): self.access_id = aws_access_key_id self.access_secret = secret_access_key self.region_name = region_name self.client_conn = boto3.client( 'kinesisvideo', aws_access_key_id=aws_access_key_id, aws_secret_access_key=secret_access_key, region_name=region_name ) def create_stream(self, stream_name): """ 创建视频流 @param stream_name: 视频流名称 @return : stream_arn or False """ tags = {'project_kvs': 'kvs'} try: stream_arn = self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN'] return stream_arn except Exception as e: print(e) return False def update_data_retention(self, stream_name, operation, data_retention_change_in_hours): """ 修改视频流数据保留时间 @param stream_name: 视频流名称 @param operation: 增加/减少, 'INCREASE_DATA_RETENTION'|'DECREASE_DATA_RETENTION' @param data_retention_change_in_hours: 修改的时间 @return : True or False """ try: version = self.describe_stream(stream_name)['Version'] self.client_conn.update_data_retention(StreamName=stream_name, Operation=operation, CurrentVersion=version, DataRetentionChangeInHours=data_retention_change_in_hours) return True except Exception as e: print(e) return False def describe_stream(self, stream_name): """ 获取视频流信息数据 @param stream_name: 视频流名称 @return stream_info: 视频流信息数据 """ return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo'] def get_data_endpoint(self, stream_name, api_name): """ 获取指定流的终端节点以读取或写入 @param api_name: API名称 @param stream_name: 视频流名称 @return stream_info: 视频流信息数据 """ return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint'] class AmazonKVAMObject: def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name): self.access_id = aws_access_key_id self.access_secret = secret_access_key self.region_name = region_name self.kv_client_conn = AmazonKinesisVideoObject(aws_access_key_id, secret_access_key, region_name) self.endpoint = self.kv_client_conn.get_data_endpoint(stream_name, api_name) self.kvam_client_coon = boto3.client( 'kinesis-video-archived-media', endpoint_url=self.endpoint, aws_access_key_id=aws_access_key_id, aws_secret_access_key=secret_access_key, region_name=region_name ) def get_hls_streaming_session_url(self, stream_name, start_time, end_time): """ 获取视频流数据保留时间 @param stream_name: 视频流名称 @param start_time: 开始时间 @param end_time: 结束时间 @return HLSStreamingSessionURL: 媒体播放器可用于检索HLS主播放列表的URL """ return self.kvam_client_coon.get_hls_streaming_session_url(StreamName=stream_name, PlaybackMode='ON_DEMAND', HLSFragmentSelector={ 'FragmentSelectorType': 'PRODUCER_TIMESTAMP', 'TimestampRange': { 'StartTimestamp': start_time, 'EndTimestamp': end_time } }, ContainerFormat='FRAGMENTED_MP4', DiscontinuityMode='ON_DISCONTINUITY', DisplayFragmentTimestamp='NEVER', Expires=43200, MaxMediaPlaylistFragmentResults=5000)[ 'HLSStreamingSessionURL'] def get_list_fragments(self, stream_name, start_time, end_time): """ 获取视频流片段 @param stream_name: 视频流名称 @param start_time: 开始时间 @param end_time: 结束时间 @return HLSStreamingSessionURL: 视频流片段列表信息 """ stream_list = [] result = self.kvam_client_coon.list_fragments(StreamName=stream_name, FragmentSelector={ 'FragmentSelectorType': 'PRODUCER_TIMESTAMP', 'TimestampRange': { 'StartTimestamp': start_time, 'EndTimestamp': end_time }}, MaxResults=1000) fragments_list = result['Fragments'] while 'NextToken' in result: result = self.kvam_client_coon.list_fragments(StreamName=stream_name, NextToken=result['NextToken']) fragments_list.extend(result['Fragments']) fragments_list = sorted(fragments_list, key=lambda item: item['FragmentNumber']) for item in fragments_list: stream_list.append({'startTime': item['ProducerTimestamp'], 'endTime': item['ProducerTimestamp'] + datetime.timedelta( milliseconds=item['FragmentLengthInMilliseconds']), 'duration': item['FragmentLengthInMilliseconds']}) return stream_list def get_images(self, stream_name, start_time, end_time): """ 获取视频流片段封面图片 @param stream_name: 视频流名称 @param start_time: 开始时间 @param end_time: 结束时间 @return HLSStreamingSessionURL: 视频流片段列表信息 """ try: images_list = self.kvam_client_coon.get_images(StreamName=stream_name, ImageSelectorType='PRODUCER_TIMESTAMP', StartTimestamp=start_time, EndTimestamp=end_time, SamplingInterval=3000, MaxResults=100, Format='JPEG')['Images'] for image in images_list: if 'ImageContent' in image: return image['ImageContent'] except Exception as e: return ''