|
# -*- coding: utf-8 -*-
"""
@Author : Rocky
@Time : 2022/10/18 17:13
@File : AmazonKinesisVideoUtil.py
"""
import datetime

import boto3

from Ansjer.config import LOGGER
class AmazonKinesisVideoObject:
    """
    Thin wrapper around the boto3 ``kinesisvideo`` client (Amazon Kinesis Video Streams).

    API reference:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html
    """

    def __init__(self, aws_access_key_id, secret_access_key, region_name):
        """
        Keep the credentials on the instance and build the ``kinesisvideo`` client.
        @param aws_access_key_id: AWS access key id
        @param secret_access_key: AWS secret access key
        @param region_name: AWS region name
        """
        self.access_id = aws_access_key_id
        self.access_secret = secret_access_key
        self.region_name = region_name
        self.client_conn = boto3.client(
            'kinesisvideo',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name
        )

    def create_stream(self, stream_name):
        """
        Create a video stream tagged with ``project_kvs=kvs``.
        @param stream_name: name of the video stream
        @return: the stream ARN on success, False if the call raised
        """
        tags = {'project_kvs': 'kvs'}
        try:
            return self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN']
        except Exception as e:
            # Report through the module logger (as the channel methods do) instead of stdout.
            LOGGER.info('创建视频流异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False

    def update_data_retention(self, stream_name, operation, data_retention_change_in_hours):
        """
        Increase or decrease the data retention period of a video stream.
        @param stream_name: name of the video stream
        @param operation: 'INCREASE_DATA_RETENTION' | 'DECREASE_DATA_RETENTION'
        @param data_retention_change_in_hours: number of hours to add or remove
        @return: True on success, False if any call raised
        """
        try:
            # The update call needs the stream's current version, so look it up first.
            version = self.describe_stream(stream_name)['Version']
            self.client_conn.update_data_retention(
                StreamName=stream_name,
                Operation=operation,
                CurrentVersion=version,
                DataRetentionChangeInHours=data_retention_change_in_hours
            )
            return True
        except Exception as e:
            # Report through the module logger (as the channel methods do) instead of stdout.
            LOGGER.info('修改视频流数据保留时间异常,error_line:{}, error_msg:{}'.format(
                e.__traceback__.tb_lineno, repr(e)))
            return False

    def describe_stream(self, stream_name):
        """
        Fetch the description of a video stream.
        @param stream_name: name of the video stream
        @return: the 'StreamInfo' mapping of the describe_stream response
        """
        return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']

    def get_data_endpoint(self, stream_name, api_name):
        """
        Get the endpoint to use for reading from or writing to the given stream.
        @param stream_name: name of the video stream
        @param api_name: name of the API the endpoint will be used for
        @return: the 'DataEndpoint' value of the response
        """
        return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']

    def create_signaling_channel(self, channel_name):
        """
        Return the ARN of the signaling channel, creating the channel if needed.
        @param channel_name: name of the signaling channel
        @return: the channel ARN
        Exceptions raised by the create call itself are not caught here.
        """
        try:
            # If the channel already exists, hand back its ARN without creating anything.
            response = self.client_conn.describe_signaling_channel(ChannelName=channel_name)
            return response['ChannelInfo']['ChannelARN']
        except self.client_conn.exceptions.ResourceNotFoundException:
            # Expected case: the channel does not exist yet, fall through to creation.
            pass
        except Exception as e:
            # Any other lookup failure is logged, then creation is still attempted.
            LOGGER.info('创建通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
        res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
        return res['ChannelARN']

    def get_signaling_channel_endpoint(self, channel_arn):
        """
        Get the HTTPS endpoint of a signaling channel for the MASTER role.
        @param channel_arn: ARN of the signaling channel
        @return: 'ResourceEndpoint' of the first entry in 'ResourceEndpointList'
        """
        res = self.client_conn.get_signaling_channel_endpoint(
            ChannelARN=channel_arn,
            SingleMasterChannelEndpointConfiguration={
                'Protocols': ['HTTPS'],
                'Role': 'MASTER'
            }
        )
        # Only one protocol is requested, so the first list entry is the one used.
        return res['ResourceEndpointList'][0]['ResourceEndpoint']

    def delete_signaling_channel(self, channel_arn):
        """
        Delete a signaling channel.
        @param channel_arn: ARN of the signaling channel
        @return: True if deleted, False if the channel does not exist or the call failed
        """
        try:
            # Confirm the channel exists before issuing the delete.
            self.client_conn.describe_signaling_channel(ChannelARN=channel_arn)
            self.client_conn.delete_signaling_channel(ChannelARN=channel_arn)
            return True
        except self.client_conn.exceptions.ResourceNotFoundException:
            LOGGER.info('删除通道失败:通道不存在,channel_arn: {}'.format(channel_arn))
            return False
        except Exception as e:
            LOGGER.info('删除通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False
class AmazonKVAMObject:
    """
    Wrapper around the boto3 ``kinesis-video-archived-media`` client, bound to the
    data endpoint resolved for one stream / API pair at construction time.
    """

    def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name):
        """
        Resolve the data endpoint for (stream_name, api_name) and build the client on it.
        @param aws_access_key_id: AWS access key id
        @param secret_access_key: AWS secret access key
        @param region_name: AWS region name
        @param stream_name: name of the video stream the endpoint is resolved for
        @param api_name: name of the API the endpoint is resolved for
        """
        self.access_id = aws_access_key_id
        self.access_secret = secret_access_key
        self.region_name = region_name
        self.kv_client_conn = AmazonKinesisVideoObject(aws_access_key_id, secret_access_key, region_name)
        self.endpoint = self.kv_client_conn.get_data_endpoint(stream_name, api_name)
        # NOTE: the attribute name 'kvam_client_coon' is kept as-is for existing callers.
        self.kvam_client_coon = boto3.client(
            'kinesis-video-archived-media',
            endpoint_url=self.endpoint,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=secret_access_key,
            region_name=region_name
        )

    @staticmethod
    def _producer_timestamp_selector(start_time, end_time):
        """Build the PRODUCER_TIMESTAMP fragment selector for the given time range."""
        return {
            'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
            'TimestampRange': {
                'StartTimestamp': start_time,
                'EndTimestamp': end_time
            }
        }

    def get_hls_streaming_session_url(self, stream_name, start_time, end_time, play_mode):
        """
        Get an HLS streaming session URL for a time range of the stream.
        @param stream_name: name of the video stream
        @param start_time: start of the time range
        @param end_time: end of the time range
        @param play_mode: playback mode passed as PlaybackMode
        @return: URL a media player can use to retrieve the HLS master playlist
        """
        response = self.kvam_client_coon.get_hls_streaming_session_url(
            StreamName=stream_name,
            PlaybackMode=play_mode,
            HLSFragmentSelector=self._producer_timestamp_selector(start_time, end_time),
            ContainerFormat='FRAGMENTED_MP4',
            DiscontinuityMode='ON_DISCONTINUITY',
            DisplayFragmentTimestamp='ALWAYS',
            Expires=43200,
            MaxMediaPlaylistFragmentResults=5000
        )
        return response['HLSStreamingSessionURL']

    def get_list_fragments(self, stream_name, start_time, end_time):
        """
        List the fragments of the stream within a time range.
        @param stream_name: name of the video stream
        @param start_time: start of the time range
        @param end_time: end of the time range
        @return: list of dicts with 'startTime', 'endTime' and 'duration' (milliseconds),
                 ordered by fragment number
        """
        result = self.kvam_client_coon.list_fragments(
            StreamName=stream_name,
            FragmentSelector=self._producer_timestamp_selector(start_time, end_time),
            MaxResults=1000
        )
        fragments_list = list(result['Fragments'])
        # Follow pagination; stop on a missing OR empty token so a null token
        # is never sent back to the API.
        while result.get('NextToken'):
            result = self.kvam_client_coon.list_fragments(StreamName=stream_name, NextToken=result['NextToken'])
            fragments_list.extend(result['Fragments'])
        fragments_list.sort(key=lambda item: item['FragmentNumber'])
        return [
            {
                'startTime': item['ProducerTimestamp'],
                # End time is derived from the producer timestamp plus the fragment length.
                'endTime': item['ProducerTimestamp'] + datetime.timedelta(
                    milliseconds=item['FragmentLengthInMilliseconds']),
                'duration': item['FragmentLengthInMilliseconds']
            }
            for item in fragments_list
        ]

    def get_images(self, stream_name, start_time, end_time):
        """
        Get a cover image for a time range of the stream.
        @param stream_name: name of the video stream
        @param start_time: start of the time range
        @param end_time: end of the time range
        @return: 'ImageContent' of the first returned image that has one;
                 '' when no image has content or when the call fails
        """
        try:
            images_list = self.kvam_client_coon.get_images(
                StreamName=stream_name,
                ImageSelectorType='PRODUCER_TIMESTAMP',
                StartTimestamp=start_time,
                EndTimestamp=end_time,
                SamplingInterval=3000,
                MaxResults=100,
                Format='JPEG'
            )['Images']
            # Entries without 'ImageContent' are skipped; fall back to '' so the
            # return value is consistent with the failure path (never None).
            return next((image['ImageContent'] for image in images_list if 'ImageContent' in image), '')
        except Exception as e:
            # Best-effort: keep returning '' but leave a trace of what went wrong.
            LOGGER.info('获取视频流图片异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return ''

    def get_clip(self, stream_name, start_time, end_time):
        """
        Download a clip covering a time range of the stream.
        @param stream_name: name of the video stream
        @param start_time: start of the time range
        @param end_time: end of the time range
        @return: tuple (response 'Payload', 'content-length' response header) on success;
                 '' when the call fails
        """
        try:
            clip = self.kvam_client_coon.get_clip(
                StreamName=stream_name,
                ClipFragmentSelector=self._producer_timestamp_selector(start_time, end_time)
            )
            return clip['Payload'], clip['ResponseMetadata']['HTTPHeaders']['content-length']
        except Exception as e:
            # Best-effort: keep returning '' but leave a trace of what went wrong.
            LOGGER.info('获取视频流片段异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return ''
|