AmazonKinesisVideoUtil.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. # -*- coding: utf-8 -*-
  2. """
  3. @Author : Rocky
  4. @Time : 2022/10/18 17:13
  5. @File :AmazonKinesisVideoUtil.py
  6. """
  7. import boto3
  8. import datetime
  9. from Ansjer.config import LOGGER
  10. class AmazonKinesisVideoObject:
  11. """
  12. Amazon Kinesis Video Streams对象
  13. api文档链接: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html
  14. """
  15. def __init__(self, aws_access_key_id, secret_access_key, region_name):
  16. self.access_id = aws_access_key_id
  17. self.access_secret = secret_access_key
  18. self.region_name = region_name
  19. self.client_conn = boto3.client(
  20. 'kinesisvideo',
  21. aws_access_key_id=aws_access_key_id,
  22. aws_secret_access_key=secret_access_key,
  23. region_name=region_name
  24. )
  25. def create_stream(self, stream_name):
  26. """
  27. 创建视频流
  28. @param stream_name: 视频流名称
  29. @return : stream_arn or False
  30. """
  31. tags = {'project_kvs': 'kvs'}
  32. try:
  33. stream_arn = self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN']
  34. return stream_arn
  35. except Exception as e:
  36. print(e)
  37. return False
  38. def update_data_retention(self, stream_name, operation, data_retention_change_in_hours):
  39. """
  40. 修改视频流数据保留时间
  41. @param stream_name: 视频流名称
  42. @param operation: 增加/减少, 'INCREASE_DATA_RETENTION'|'DECREASE_DATA_RETENTION'
  43. @param data_retention_change_in_hours: 修改的时间
  44. @return : True or False
  45. """
  46. try:
  47. version = self.describe_stream(stream_name)['Version']
  48. self.client_conn.update_data_retention(StreamName=stream_name, Operation=operation, CurrentVersion=version,
  49. DataRetentionChangeInHours=data_retention_change_in_hours)
  50. return True
  51. except Exception as e:
  52. print(e)
  53. return False
  54. def describe_stream(self, stream_name):
  55. """
  56. 获取视频流信息数据
  57. @param stream_name: 视频流名称
  58. @return stream_info: 视频流信息数据
  59. """
  60. return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']
  61. def get_data_endpoint(self, stream_name, api_name):
  62. """
  63. 获取指定流的终端节点以读取或写入
  64. @param api_name: API名称
  65. @param stream_name: 视频流名称
  66. @return stream_info: 视频流信息数据
  67. """
  68. return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']
  69. def create_signaling_channel(self, channel_name):
  70. """
  71. 创建通道,如果通道已存在则直接返回其ARN
  72. @param channel_name: 通道名称
  73. @return: channel_arn: 通道ARN
  74. """
  75. try:
  76. # 先检查信令通道是否已存在
  77. response = self.client_conn.describe_signaling_channel(ChannelName=channel_name)
  78. # 如果通道存在,直接返回其ARN
  79. channel_arn = response['ChannelInfo']['ChannelARN']
  80. return channel_arn
  81. except self.client_conn.exceptions.ResourceNotFoundException:
  82. # 通道不存在,创建新通道
  83. res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
  84. channel_arn = res['ChannelARN']
  85. return channel_arn
  86. except Exception as e:
  87. # 处理其他可能的异常
  88. LOGGER.info('创建通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  89. # 尝试创建通道
  90. res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
  91. channel_arn = res['ChannelARN']
  92. return channel_arn
  93. def get_signaling_channel_endpoint(self, channel_arn):
  94. """
  95. 创建通道
  96. @param channel_arn: 通道arn
  97. @return stream_info: 视频流信息数据
  98. """
  99. res = self.client_conn.get_signaling_channel_endpoint(
  100. ChannelARN=channel_arn,
  101. SingleMasterChannelEndpointConfiguration={
  102. 'Protocols': ['HTTPS'],
  103. 'Role': 'MASTER'
  104. }
  105. )
  106. endpoint = res['ResourceEndpointList'][0]['ResourceEndpoint']
  107. return endpoint
  108. def delete_signaling_channel(self, channel_arn):
  109. """
  110. 删除通道
  111. @param channel_arn: 通道ARN
  112. @return: True 删除成功,False 删除失败或通道不存在
  113. """
  114. try:
  115. # 先检查信令通道是否存在
  116. self.client_conn.describe_signaling_channel(ChannelARN=channel_arn)
  117. # 通道存在,执行删除操作
  118. self.client_conn.delete_signaling_channel(ChannelARN=channel_arn)
  119. return True
  120. except self.client_conn.exceptions.ResourceNotFoundException:
  121. # 通道不存在,记录日志并返回
  122. LOGGER.info('删除通道失败:通道不存在,channel_arn: {}'.format(channel_arn))
  123. return False
  124. except Exception as e:
  125. # 处理其他可能的异常
  126. LOGGER.info('删除通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  127. return False
  128. class AmazonKVAMObject:
  129. def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name):
  130. self.access_id = aws_access_key_id
  131. self.access_secret = secret_access_key
  132. self.region_name = region_name
  133. self.kv_client_conn = AmazonKinesisVideoObject(aws_access_key_id, secret_access_key, region_name)
  134. self.endpoint = self.kv_client_conn.get_data_endpoint(stream_name, api_name)
  135. self.kvam_client_coon = boto3.client(
  136. 'kinesis-video-archived-media',
  137. endpoint_url=self.endpoint,
  138. aws_access_key_id=aws_access_key_id,
  139. aws_secret_access_key=secret_access_key,
  140. region_name=region_name
  141. )
  142. def get_hls_streaming_session_url(self, stream_name, start_time, end_time, play_mode):
  143. """
  144. 获取视频流数据保留时间
  145. @param stream_name: 视频流名称
  146. @param start_time: 开始时间
  147. @param end_time: 结束时间
  148. @param play_mode: 播放模式
  149. @return HLSStreamingSessionURL: 媒体播放器可用于检索HLS主播放列表的URL
  150. """
  151. return self.kvam_client_coon.get_hls_streaming_session_url(StreamName=stream_name,
  152. PlaybackMode=play_mode,
  153. HLSFragmentSelector={
  154. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  155. 'TimestampRange': {
  156. 'StartTimestamp': start_time,
  157. 'EndTimestamp': end_time
  158. }
  159. },
  160. ContainerFormat='FRAGMENTED_MP4',
  161. DiscontinuityMode='ON_DISCONTINUITY',
  162. DisplayFragmentTimestamp='ALWAYS',
  163. Expires=43200,
  164. MaxMediaPlaylistFragmentResults=5000)[
  165. 'HLSStreamingSessionURL']
  166. def get_list_fragments(self, stream_name, start_time, end_time):
  167. """
  168. 获取视频流片段
  169. @param stream_name: 视频流名称
  170. @param start_time: 开始时间
  171. @param end_time: 结束时间
  172. @return HLSStreamingSessionURL: 视频流片段列表信息
  173. """
  174. stream_list = []
  175. result = self.kvam_client_coon.list_fragments(StreamName=stream_name, FragmentSelector={
  176. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  177. 'TimestampRange': {
  178. 'StartTimestamp': start_time,
  179. 'EndTimestamp': end_time
  180. }}, MaxResults=1000)
  181. fragments_list = result['Fragments']
  182. while 'NextToken' in result:
  183. result = self.kvam_client_coon.list_fragments(StreamName=stream_name, NextToken=result['NextToken'])
  184. fragments_list.extend(result['Fragments'])
  185. fragments_list = sorted(fragments_list, key=lambda item: item['FragmentNumber'])
  186. for item in fragments_list:
  187. stream_list.append({'startTime': item['ProducerTimestamp'],
  188. 'endTime': item['ProducerTimestamp'] + datetime.timedelta(
  189. milliseconds=item['FragmentLengthInMilliseconds']),
  190. 'duration': item['FragmentLengthInMilliseconds']})
  191. return stream_list
  192. def get_images(self, stream_name, start_time, end_time):
  193. """
  194. 获取视频流片段封面图片
  195. @param stream_name: 视频流名称
  196. @param start_time: 开始时间
  197. @param end_time: 结束时间
  198. @return HLSStreamingSessionURL: 视频流片段列表信息
  199. """
  200. try:
  201. images_list = self.kvam_client_coon.get_images(StreamName=stream_name,
  202. ImageSelectorType='PRODUCER_TIMESTAMP',
  203. StartTimestamp=start_time,
  204. EndTimestamp=end_time,
  205. SamplingInterval=3000,
  206. MaxResults=100,
  207. Format='JPEG')['Images']
  208. for image in images_list:
  209. if 'ImageContent' in image:
  210. return image['ImageContent']
  211. except Exception as e:
  212. return ''
  213. def get_clip(self, stream_name, start_time, end_time):
  214. """
  215. 获取视频流片段封面图片
  216. @param stream_name: 视频流名称
  217. @param start_time: 开始时间
  218. @param end_time: 结束时间
  219. @return HLSStreamingSessionURL: 视频流片段列表信息
  220. """
  221. try:
  222. clip = self.kvam_client_coon.get_clip(StreamName=stream_name,
  223. ClipFragmentSelector={
  224. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  225. 'TimestampRange': {
  226. 'StartTimestamp': start_time,
  227. 'EndTimestamp': end_time
  228. }
  229. })
  230. return clip['Payload'], clip['ResponseMetadata']['HTTPHeaders']['content-length']
  231. except Exception as e:
  232. return ''