AmazonKinesisVideoUtil.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. # -*- coding: utf-8 -*-
  2. """
  3. @Author : Rocky
  4. @Time : 2022/10/18 17:13
  5. @File :AmazonKinesisVideoUtil.py
  6. """
  7. import datetime
  8. import boto3
  9. class AmazonKinesisVideoObject:
  10. """
  11. Amazon Kinesis Video Streams对象
  12. api文档链接: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisvideo.html
  13. """
  14. def __init__(self, aws_access_key_id, secret_access_key, region_name):
  15. self.access_id = aws_access_key_id
  16. self.access_secret = secret_access_key
  17. self.region_name = region_name
  18. self.client_conn = boto3.client(
  19. 'kinesisvideo',
  20. aws_access_key_id=aws_access_key_id,
  21. aws_secret_access_key=secret_access_key,
  22. region_name=region_name
  23. )
  24. def create_stream(self, stream_name):
  25. """
  26. 创建视频流
  27. @param stream_name: 视频流名称
  28. @return : stream_arn or False
  29. """
  30. tags = {'project_kvs': 'kvs'}
  31. try:
  32. stream_arn = self.client_conn.create_stream(StreamName=stream_name, Tags=tags)['StreamARN']
  33. return stream_arn
  34. except Exception as e:
  35. print(e)
  36. return False
  37. def update_data_retention(self, stream_name, operation, data_retention_change_in_hours):
  38. """
  39. 修改视频流数据保留时间
  40. @param stream_name: 视频流名称
  41. @param operation: 增加/减少, 'INCREASE_DATA_RETENTION'|'DECREASE_DATA_RETENTION'
  42. @param data_retention_change_in_hours: 修改的时间
  43. @return : True or False
  44. """
  45. try:
  46. version = self.describe_stream(stream_name)['Version']
  47. self.client_conn.update_data_retention(StreamName=stream_name, Operation=operation, CurrentVersion=version,
  48. DataRetentionChangeInHours=data_retention_change_in_hours)
  49. return True
  50. except Exception as e:
  51. print(e)
  52. return False
  53. def describe_stream(self, stream_name):
  54. """
  55. 获取视频流信息数据
  56. @param stream_name: 视频流名称
  57. @return stream_info: 视频流信息数据
  58. """
  59. return self.client_conn.describe_stream(StreamName=stream_name)['StreamInfo']
  60. def get_data_endpoint(self, stream_name, api_name):
  61. """
  62. 获取指定流的终端节点以读取或写入
  63. @param api_name: API名称
  64. @param stream_name: 视频流名称
  65. @return stream_info: 视频流信息数据
  66. """
  67. return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']
  68. def create_signaling_channel(self, channel_name):
  69. """
  70. 创建通道
  71. @param channel_name: 通道名称
  72. """
  73. res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
  74. channel_arn = res['ChannelARN']
  75. return channel_arn
  76. def get_signaling_channel_endpoint(self, channel_arn):
  77. """
  78. 创建通道
  79. @param channel_arn: 通道arn
  80. @return stream_info: 视频流信息数据
  81. """
  82. res = self.client_conn.get_signaling_channel_endpoint(
  83. ChannelARN=channel_arn,
  84. SingleMasterChannelEndpointConfiguration={
  85. 'Protocols': ['HTTPS'],
  86. 'Role': 'MASTER'
  87. }
  88. )
  89. endpoint = res['ResourceEndpointList'][0]['ResourceEndpoint']
  90. return endpoint
  91. def delete_signaling_channel(self, channel_arn):
  92. """
  93. 删除通道
  94. @return channel_arn: 通道ARN
  95. """
  96. self.client_conn.delete_signaling_channel(ChannelARN=channel_arn)
  97. class AmazonKVAMObject:
  98. def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name):
  99. self.access_id = aws_access_key_id
  100. self.access_secret = secret_access_key
  101. self.region_name = region_name
  102. self.kv_client_conn = AmazonKinesisVideoObject(aws_access_key_id, secret_access_key, region_name)
  103. self.endpoint = self.kv_client_conn.get_data_endpoint(stream_name, api_name)
  104. self.kvam_client_coon = boto3.client(
  105. 'kinesis-video-archived-media',
  106. endpoint_url=self.endpoint,
  107. aws_access_key_id=aws_access_key_id,
  108. aws_secret_access_key=secret_access_key,
  109. region_name=region_name
  110. )
  111. def get_hls_streaming_session_url(self, stream_name, start_time, end_time, play_mode):
  112. """
  113. 获取视频流数据保留时间
  114. @param stream_name: 视频流名称
  115. @param start_time: 开始时间
  116. @param end_time: 结束时间
  117. @param play_mode: 播放模式
  118. @return HLSStreamingSessionURL: 媒体播放器可用于检索HLS主播放列表的URL
  119. """
  120. return self.kvam_client_coon.get_hls_streaming_session_url(StreamName=stream_name,
  121. PlaybackMode=play_mode,
  122. HLSFragmentSelector={
  123. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  124. 'TimestampRange': {
  125. 'StartTimestamp': start_time,
  126. 'EndTimestamp': end_time
  127. }
  128. },
  129. ContainerFormat='FRAGMENTED_MP4',
  130. DiscontinuityMode='ON_DISCONTINUITY',
  131. DisplayFragmentTimestamp='ALWAYS',
  132. Expires=43200,
  133. MaxMediaPlaylistFragmentResults=5000)[
  134. 'HLSStreamingSessionURL']
  135. def get_list_fragments(self, stream_name, start_time, end_time):
  136. """
  137. 获取视频流片段
  138. @param stream_name: 视频流名称
  139. @param start_time: 开始时间
  140. @param end_time: 结束时间
  141. @return HLSStreamingSessionURL: 视频流片段列表信息
  142. """
  143. stream_list = []
  144. result = self.kvam_client_coon.list_fragments(StreamName=stream_name, FragmentSelector={
  145. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  146. 'TimestampRange': {
  147. 'StartTimestamp': start_time,
  148. 'EndTimestamp': end_time
  149. }}, MaxResults=1000)
  150. fragments_list = result['Fragments']
  151. while 'NextToken' in result:
  152. result = self.kvam_client_coon.list_fragments(StreamName=stream_name, NextToken=result['NextToken'])
  153. fragments_list.extend(result['Fragments'])
  154. fragments_list = sorted(fragments_list, key=lambda item: item['FragmentNumber'])
  155. for item in fragments_list:
  156. stream_list.append({'startTime': item['ProducerTimestamp'],
  157. 'endTime': item['ProducerTimestamp'] + datetime.timedelta(
  158. milliseconds=item['FragmentLengthInMilliseconds']),
  159. 'duration': item['FragmentLengthInMilliseconds']})
  160. return stream_list
  161. def get_images(self, stream_name, start_time, end_time):
  162. """
  163. 获取视频流片段封面图片
  164. @param stream_name: 视频流名称
  165. @param start_time: 开始时间
  166. @param end_time: 结束时间
  167. @return HLSStreamingSessionURL: 视频流片段列表信息
  168. """
  169. try:
  170. images_list = self.kvam_client_coon.get_images(StreamName=stream_name,
  171. ImageSelectorType='PRODUCER_TIMESTAMP',
  172. StartTimestamp=start_time,
  173. EndTimestamp=end_time,
  174. SamplingInterval=3000,
  175. MaxResults=100,
  176. Format='JPEG')['Images']
  177. for image in images_list:
  178. if 'ImageContent' in image:
  179. return image['ImageContent']
  180. except Exception as e:
  181. return ''
  182. def get_clip(self, stream_name, start_time, end_time):
  183. """
  184. 获取视频流片段封面图片
  185. @param stream_name: 视频流名称
  186. @param start_time: 开始时间
  187. @param end_time: 结束时间
  188. @return HLSStreamingSessionURL: 视频流片段列表信息
  189. """
  190. try:
  191. clip = self.kvam_client_coon.get_clip(StreamName=stream_name,
  192. ClipFragmentSelector={
  193. 'FragmentSelectorType': 'PRODUCER_TIMESTAMP',
  194. 'TimestampRange': {
  195. 'StartTimestamp': start_time,
  196. 'EndTimestamp': end_time
  197. }
  198. })
  199. return clip['Payload'], clip['ResponseMetadata']['HTTPHeaders']['content-length']
  200. except Exception as e:
  201. return ''