| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 | # -*- coding: utf-8 -*-"""@Author : Rocky@Time : 2022/10/18 17:13@File :AmazonKinesisVideoUtil.py"""import datetimeimport boto3class AmazonKinesisVideoObject:    """    Amazon Kinesis Video Streams对象    api文档链接: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html    """    def __init__(self, aws_access_key_id, secret_access_key, region_name):        self.access_id = aws_access_key_id        self.access_secret = secret_access_key        self.region_name = region_name        self.client_conn = boto3.client(            'kinesisvideo',            aws_access_key_id=aws_access_key_id,            aws_secret_access_key=secret_access_key,            region_name=region_name        )    def create_stream(self, stream_name):        """        创建视频流        @param stream_name: 视频流名称        @return : stream_arn or False        """        tags = {'project_kvs': 'kvs'}        try:            stream_arn = self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN']            return stream_arn        except Exception as e:            print(e)            return False    def update_data_retention(self, stream_name, operation, data_retention_change_in_hours):        """        修改视频流数据保留时间        @param stream_name: 视频流名称        @param operation: 增加/减少, 'INCREASE_DATA_RETENTION'|'DECREASE_DATA_RETENTION'        @param data_retention_change_in_hours: 修改的时间        @return : True or False        """        try:            version = self.describe_stream(stream_name)['Version']            self.client_conn.update_data_retention(StreamName=stream_name, Operation=operation, CurrentVersion=version,                                                   DataRetentionChangeInHours=data_retention_change_in_hours)            return True        except Exception as e:            print(e)            return False    def describe_stream(self, stream_name):        """        获取视频流信息数据        @param stream_name: 视频流名称        @return stream_info: 视频流信息数据        """        return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']    def get_data_endpoint(self, stream_name, api_name):        """        获取指定流的终端节点以读取或写入        @param api_name: API名称        @param stream_name: 视频流名称        @return stream_info: 视频流信息数据        """        return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']    def create_signaling_channel(self, channel_name):        """        创建通道        @param channel_name: 通道名称        @return stream_info: 视频流信息数据        """        return self.client_conn.create_signaling_channel(ChannelName=channel_name)class AmazonKVAMObject:    def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name):        self.access_id = aws_access_key_id        self.access_secret = secret_access_key        self.region_name = region_name        self.kv_client_conn = AmazonKinesisVideoObject(aws_access_key_id, secret_access_key, region_name)        self.endpoint = self.kv_client_conn.get_data_endpoint(stream_name, api_name)        self.kvam_client_coon = boto3.client(            'kinesis-video-archived-media',            endpoint_url=self.endpoint,            aws_access_key_id=aws_access_key_id,            aws_secret_access_key=secret_access_key,            region_name=region_name        )    def get_hls_streaming_session_url(self, stream_name, start_time, end_time, play_mode):        """        获取视频流数据保留时间        @param stream_name: 视频流名称        @param start_time: 开始时间        @param end_time: 结束时间        @param play_mode: 播放模式        @return HLSStreamingSessionURL: 媒体播放器可用于检索HLS主播放列表的URL        """        return self.kvam_client_coon.get_hls_streaming_session_url(StreamName=stream_name,                                                                   PlaybackMode=play_mode,                                                                   HLSFragmentSelector={                                                                       'FragmentSelectorType': 'PRODUCER_TIMESTAMP',                                                                       'TimestampRange': {                                                                           'StartTimestamp': start_time,                                                                           'EndTimestamp': end_time                                                                       }                                                                   },                                                                   ContainerFormat='FRAGMENTED_MP4',                                                                   DiscontinuityMode='ON_DISCONTINUITY',                                                                   DisplayFragmentTimestamp='ALWAYS',                                                                   Expires=43200,                                                                   MaxMediaPlaylistFragmentResults=5000)[            'HLSStreamingSessionURL']    def get_list_fragments(self, stream_name, start_time, end_time):        """        获取视频流片段        @param stream_name: 视频流名称        @param start_time: 开始时间        @param end_time: 结束时间        @return HLSStreamingSessionURL: 视频流片段列表信息        """        stream_list = []        result = self.kvam_client_coon.list_fragments(StreamName=stream_name, FragmentSelector={            'FragmentSelectorType': 'PRODUCER_TIMESTAMP',            'TimestampRange': {                'StartTimestamp': start_time,                'EndTimestamp': end_time            }}, MaxResults=1000)        fragments_list = result['Fragments']        while 'NextToken' in result:            result = self.kvam_client_coon.list_fragments(StreamName=stream_name, NextToken=result['NextToken'])            fragments_list.extend(result['Fragments'])        fragments_list = sorted(fragments_list, key=lambda item: item['FragmentNumber'])        for item in fragments_list:            stream_list.append({'startTime': item['ProducerTimestamp'],                                'endTime': item['ProducerTimestamp'] + datetime.timedelta(                                    milliseconds=item['FragmentLengthInMilliseconds']),                                'duration': item['FragmentLengthInMilliseconds']})        return stream_list    def get_images(self, stream_name, start_time, end_time):        """        获取视频流片段封面图片        @param stream_name: 视频流名称        @param start_time: 开始时间        @param end_time: 结束时间        @return HLSStreamingSessionURL: 视频流片段列表信息        """        try:            images_list = self.kvam_client_coon.get_images(StreamName=stream_name,                                                           ImageSelectorType='PRODUCER_TIMESTAMP',                                                           StartTimestamp=start_time,                                                           EndTimestamp=end_time,                                                           SamplingInterval=3000,                                                           MaxResults=100,                                                           Format='JPEG')['Images']            for image in images_list:                if 'ImageContent' in image:                    return image['ImageContent']        except Exception as e:            return ''    def get_clip(self, stream_name, start_time, end_time):        """        获取视频流片段封面图片        @param stream_name: 视频流名称        @param start_time: 开始时间        @param end_time: 结束时间        @return HLSStreamingSessionURL: 视频流片段列表信息        """        try:            clip = self.kvam_client_coon.get_clip(StreamName=stream_name,                                                  ClipFragmentSelector={                                                      'FragmentSelectorType': 'PRODUCER_TIMESTAMP',                                                      'TimestampRange': {                                                          'StartTimestamp': start_time,                                                          'EndTimestamp': end_time                                                      }                                                  })            return clip['Payload'], clip['ResponseMetadata']['HTTPHeaders']['content-length']        except Exception as e:            return ''
 |