KVSController.py 36 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. @Author : Rocky
  4. @Time : 2022/10/18 9:48
  5. @File :KVSController.py
  6. """
  7. import base64
  8. import hashlib
  9. import json
  10. import time
  11. import uuid
  12. import datetime
  13. from collections import OrderedDict
  14. import boto3
  15. import requests
  16. from django.http import HttpResponse
  17. from django.views import View
  18. from Model.models import KVS, Device_User, Device_Info
  19. from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
  20. from Object.IOTCore.IotObject import IOTClient
  21. from Object.RedisObject import RedisObject
  22. from Object.ResponseObject import ResponseObject
  23. from Ansjer.config import SERVER_DOMAIN, LOGGER, KVS_REGION, REGION_ID_LIST, WEBRTC_DOMAIN_NAME, \
  24. ZLMEDIAKIT_SECRET, ZLMEDIAKIT_APP_NAME, ZLMEDIAKIT_AUTH, ZLMEDIAKIT_AUTH_WHEP
  25. from Object.TokenObject import TokenObject
  26. from botocore.auth import SigV4Auth
  27. from botocore.awsrequest import AWSRequest
  28. from botocore.credentials import Credentials
  29. from Service.CommonService import CommonService
  30. from django.conf import settings
# AWS credentials are read from Django settings rather than written into this module.
# KVSView passes them to the AWS client wrappers it constructs.
ACCESS_KEY_ID = settings.ACCESS_KEY_ID
SECRET_ACCESS_KEY = settings.SECRET_ACCESS_KEY
  33. class UserRelatedView(View):
  34. def get(self, request, *args, **kwargs):
  35. request.encoding = 'utf-8'
  36. operation = kwargs.get('operation')
  37. return self.validation(request.GET, operation, request)
  38. def post(self, request, *args, **kwargs):
  39. request.encoding = 'utf-8'
  40. operation = kwargs.get('operation')
  41. return self.validation(request.POST, operation, request)
  42. def validation(self, request_dict, operation, request):
  43. response = ResponseObject()
  44. if operation == 'generate-qr-code': # 网页生成二维码
  45. return self.generate_qr_code(response)
  46. elif operation == 'get-scanning-status': # 确认app是否扫码
  47. return self.get_scanning_status(request_dict, response)
  48. elif operation == 'web-login': # 网页登录
  49. return self.web_login(request_dict, response)
  50. elif operation == 'pc-login': # pc端登录
  51. return self.pc_login(request_dict, response)
  52. elif operation == 'confirm-login': # app确认登录
  53. return self.confirm_login(request_dict, response)
  54. else:
  55. tko = TokenObject(
  56. request.META.get('HTTP_AUTHORIZATION'),
  57. returntpye='pc')
  58. if tko.code != 0:
  59. return response.json(tko.code)
  60. response.lang = tko.lang
  61. user_id = tko.userID
  62. if operation == 'get-device': # 获取设备列表
  63. return self.get_device(response, user_id)
  64. else:
  65. return response.json(404)
  66. @staticmethod
  67. def get_user_info(user_id):
  68. """
  69. 获取用户信息
  70. @param user_id: 用户id
  71. @return: response
  72. """
  73. device_user_qs = Device_User.objects.filter(userID=user_id).values('NickName', 'userIconPath',
  74. 'userIconUrl')
  75. if not device_user_qs.exists():
  76. user_icon_url = ''
  77. nick_name = ''
  78. else:
  79. users = device_user_qs.first()
  80. nick_name = users['NickName']
  81. user_icon_path = str(users['userIconPath'])
  82. if user_icon_path:
  83. user_icon_path = user_icon_path.replace('static/', '').replace('\\', '/')
  84. user_icon_url = SERVER_DOMAIN + 'account/getAvatar/' + user_icon_path
  85. else:
  86. user_icon_url = ''
  87. return user_icon_url, nick_name
  88. @staticmethod
  89. def generate_qr_code(response):
  90. """
  91. 网页生成二维码
  92. @param response: 响应对象
  93. @return: response
  94. """
  95. nwo_time = time.time()
  96. redis_obj = RedisObject()
  97. try:
  98. uuid_number = hashlib.md5((str(uuid.uuid1()) + str(nwo_time)).encode('utf-8')).hexdigest()
  99. flag = redis_obj.set_ex_data(uuid_number, 0, 300) # redis记录uuid,状态为生成二维码
  100. res = {'type': 'autologin', 'id': uuid_number}
  101. if flag:
  102. return response.json(0, res)
  103. else:
  104. return response.json(119)
  105. except Exception as e:
  106. print(e)
  107. return response.json(500)
  108. @staticmethod
  109. def get_scanning_status(request_dict, response):
  110. """
  111. 获取app扫码状态
  112. @param request_dict: 请求参数
  113. @request_dict serial_number: 序列号
  114. @param response: 响应对象
  115. @return: response
  116. """
  117. uuid_number = request_dict.get('uuid', None)
  118. if not uuid_number:
  119. return response.json(444, {'error param': 'uuid'})
  120. try:
  121. redis_obj = RedisObject()
  122. status = redis_obj.get_data(uuid_number)
  123. if status is False:
  124. return response.json(119)
  125. elif status == '1' or status == '2':
  126. res = {'status': 1} # 已扫码
  127. else:
  128. res = {'status': 0} # 未扫码
  129. return response.json(0, res)
  130. except Exception as e:
  131. print(e)
  132. return response.json(500)
  133. @staticmethod
  134. def pc_login(request_dict, response):
  135. """
  136. pc端登录
  137. @param request_dict: 请求参数
  138. @request_dict serial_number: 序列号
  139. @param response: 响应对象
  140. @return: response
  141. """
  142. uuid_number = request_dict.get('uuid', None)
  143. if not uuid_number:
  144. return response.json(444, {'error param': 'uuid'})
  145. try:
  146. redis_obj = RedisObject()
  147. status = redis_obj.get_data(uuid_number)
  148. token = redis_obj.get_data(uuid_number + 'token')
  149. if status is False or token is False:
  150. return response.json(119)
  151. elif status == '2': # 已登录
  152. token_obj = TokenObject(token)
  153. response.lang = token_obj.lang
  154. if token_obj.code != 0:
  155. return response.json(token_obj.code)
  156. user_id = token_obj.userID
  157. user_icon_url, nick_name = UserRelatedView.get_user_info(user_id)
  158. res = {'status': 1, 'userIconUrl': user_icon_url, 'nickName': nick_name, 'token': token}
  159. redis_obj.del_data(uuid_number)
  160. redis_obj.del_data(uuid_number + 'token')
  161. else: # 未登录
  162. res = {'status': 0}
  163. return response.json(0, res)
  164. except Exception as e:
  165. print(e)
  166. return response.json(500)
  167. @staticmethod
  168. def web_login(request_dict, response):
  169. """
  170. 网页登录
  171. @param request_dict: 请求参数
  172. @request_dict serial_number: 序列号
  173. @param response: 响应对象
  174. @return: response
  175. """
  176. uuid_number = request_dict.get('uuid', None)
  177. confirm = request_dict.get('confirm', None)
  178. if not all([uuid_number, confirm]):
  179. return response.json(444, {'error param': 'uuid or confirm'})
  180. try:
  181. redis_obj = RedisObject()
  182. if confirm == '1': # 取消登录
  183. redis_obj.del_data(uuid_number)
  184. redis_obj.del_data(uuid_number + 'token')
  185. return response.json(0)
  186. token = redis_obj.get_data(uuid_number + 'token')
  187. ttl = redis_obj.get_ttl(uuid_number)
  188. if token is False or ttl <= 0:
  189. return response.json(119)
  190. result = redis_obj.set_ex_data(uuid_number, 2, ttl) # 修改uuid状态为已登录
  191. if result is False:
  192. return response.json(119)
  193. return response.json(0)
  194. except Exception as e:
  195. print(e)
  196. return response.json(500)
  197. @staticmethod
  198. def confirm_login(request_dict, response):
  199. """
  200. app确认登录
  201. @param request_dict: 请求参数
  202. @request_dict serial_number: 序列号
  203. @param response: 响应对象
  204. @return: response
  205. """
  206. uuid_number = request_dict.get('uuid', None)
  207. token = request_dict.get('token', None)
  208. if not all([uuid_number, token]):
  209. return response.json(444, {'error param': 'uuid or token'})
  210. redis_obj = RedisObject()
  211. try:
  212. status = redis_obj.get_data(uuid_number)
  213. ttl = redis_obj.get_ttl(uuid_number)
  214. if status is False or ttl <= 0:
  215. return response.json(119)
  216. result1 = redis_obj.set_ex_data(uuid_number, 1, ttl) # 修改uuid状态为已扫码
  217. result2 = redis_obj.set_ex_data(uuid_number + 'token', token, ttl)
  218. if result1 is False or result2 is False:
  219. return response.json(119)
  220. return response.json(0)
  221. except Exception as e:
  222. print(e)
  223. return response.json(500)
  224. @staticmethod
  225. def get_device(response, user_id):
  226. """
  227. 获取设备列表
  228. @param response: 响应对象
  229. @param user_id: 用户id
  230. @return: response
  231. """
  232. try:
  233. device_qs = Device_Info.objects.filter(userID=user_id).values('serial_number', 'NickName')
  234. return response.json(0, list(device_qs))
  235. except Exception as e:
  236. print(e)
  237. return response.json(500)
  238. class KVSView(View):
  239. def get(self, request, *args, **kwargs):
  240. request.encoding = 'utf-8'
  241. operation = kwargs.get('operation')
  242. return self.validation(request.GET, request, operation)
  243. def post(self, request, *args, **kwargs):
  244. request.encoding = 'utf-8'
  245. operation = kwargs.get('operation')
  246. return self.validation(request.POST, request, operation)
  247. def validation(self, request_dict, request, operation):
  248. response = ResponseObject()
  249. if operation == 'create-media': # 创建视频流
  250. return self.create_media(request_dict, response)
  251. elif operation == 'update-data-retention': # 修改视频流数据保留时间
  252. return self.update_data_retention(request_dict, response)
  253. elif operation == 'get-sts-token': # 获取临时token
  254. return self.get_sts_token(request_dict, response)
  255. elif operation == 'createSignalChannel': # 创建通道
  256. return self.create_signal_channel(request_dict, response)
  257. elif operation == 'SendAlexaOfferToMaster': # 发送Alexa offer
  258. return self.send_alexa_offer_to_master(request_dict, response)
  259. elif operation == 'deleteSignalChannel': # 删除通道
  260. return self.delete_signal_channel(request_dict, response)
  261. elif operation == 'sendMqttCommand': # 发送MQTT
  262. return self.send_mqtt_command(request_dict, response)
  263. elif operation == 'getAlexaAnswer': # 测试获取Alexa answer
  264. return self.get_alexa_answer(request_dict, response)
  265. else:
  266. # tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
  267. # if tko.code != 0:
  268. # return response.json(tko.code)
  269. # response.lang = tko.lang
  270. # user_id = tko.userID
  271. if operation == 'get-device-midea-list': # 获取设备列表
  272. return self.get_device_midea_list(request_dict, response)
  273. elif operation == 'get-hls-midea': # 获取视频播放地址
  274. return self.get_hls_midea_url(request_dict, response)
  275. elif operation == 'download-clip': # 获取视频播放地址
  276. return self.download_clip(request_dict, response)
  277. else:
  278. return response.json(404)
  279. @staticmethod
  280. def create_media(request_dict, response):
  281. """
  282. 创建视频流
  283. @param request_dict: 请求参数
  284. @request_dict serial_number: 序列号
  285. @param response: 响应对象
  286. @return: response
  287. """
  288. serial_number = request_dict.get('serial_number', None)
  289. try:
  290. kvs_qs = KVS.objects.filter(stream_name=serial_number)
  291. if kvs_qs.exists():
  292. return response.json(174)
  293. kinesis_video_obj = AmazonKinesisVideoObject(
  294. aws_access_key_id=ACCESS_KEY_ID,
  295. secret_access_key=SECRET_ACCESS_KEY,
  296. region_name=KVS_REGION
  297. )
  298. stream_arn = kinesis_video_obj.create_stream(stream_name=serial_number)
  299. if stream_arn:
  300. now_time = int(time.time())
  301. KVS.objects.create(stream_name=serial_number, stream_arn=stream_arn, created_time=now_time,
  302. updated_time=now_time)
  303. return response.json(0)
  304. else:
  305. return response.json(178)
  306. except Exception as e:
  307. print(e)
  308. return response.json(500)
  309. @staticmethod
  310. def update_data_retention(request_dict, response):
  311. """
  312. 修改视频流数据保留时间
  313. @param request_dict: 请求参数
  314. @request_dict serial_number: 序列号
  315. @request_dict operation: 操作,增加/减少
  316. @request_dict data_retention_change_in_hours: 修改的时间
  317. @param response: 响应对象
  318. @return: response
  319. """
  320. serial_number = request_dict.get('serial_number', None)
  321. operation = request_dict.get('operation', None)
  322. data_retention_change_in_hours = request_dict.get('data_retention_change_in_hours', None)
  323. try:
  324. kvs_qs = KVS.objects.filter(stream_name=serial_number)
  325. if not kvs_qs.exists():
  326. return response.json(173)
  327. kinesis_video_obj = AmazonKinesisVideoObject(
  328. aws_access_key_id=ACCESS_KEY_ID,
  329. secret_access_key=SECRET_ACCESS_KEY,
  330. region_name=KVS_REGION
  331. )
  332. now_time = int(time.time())
  333. data_retention_change_in_hours = int(data_retention_change_in_hours)
  334. kinesis_video_obj.update_data_retention(stream_name=serial_number, operation=operation,
  335. data_retention_change_in_hours=data_retention_change_in_hours)
  336. kvs_qs.update(data_retention_in_hours=data_retention_change_in_hours, updated_time=now_time)
  337. return response.json(0)
  338. except Exception as e:
  339. print(e)
  340. return response.json(500)
  341. @staticmethod
  342. def get_hls_midea_url(request_dict, response):
  343. """
  344. 获取视频播放地址
  345. @param request_dict: 请求参数
  346. @request_dict serial_number: 序列号
  347. @request_dict startTime: 开始时间
  348. @request_dict endTime: 结束时间
  349. @request_dict playMode: 播放模式
  350. @param response: 响应对象
  351. @return: response
  352. """
  353. serial_number = request_dict.get('serial_number', None)
  354. start_time = request_dict.get('startTime', None)
  355. end_time = request_dict.get('endTime', None)
  356. play_mode = request_dict.get('playMode', None)
  357. if not all([serial_number, start_time, end_time, play_mode]):
  358. return response.json(444)
  359. start_time = datetime.datetime.fromtimestamp(int(start_time)) - datetime.timedelta(hours=8)
  360. end_time = datetime.datetime.fromtimestamp(int(end_time)) - datetime.timedelta(hours=8)
  361. play_mode = int(play_mode)
  362. play_mode = 'ON_DEMAND' if play_mode == 0 else 'LIVE_REPLAY'
  363. try:
  364. # kvs_qs = KVS.objects.filter(stream_name=serial_number)
  365. # if not kvs_qs.exists():
  366. # return response.json(174)
  367. kinesis_video_obj = AmazonKVAMObject(
  368. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  369. secret_access_key='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',
  370. region_name='us-east-1',
  371. stream_name=serial_number,
  372. api_name='GET_HLS_STREAMING_SESSION_URL'
  373. )
  374. hls_streaming_session_url = kinesis_video_obj.get_hls_streaming_session_url(serial_number, start_time,
  375. end_time, play_mode)
  376. return response.json(0, {"HlsStreamingSessionUrl": hls_streaming_session_url})
  377. except Exception as e:
  378. print(e)
  379. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  380. @staticmethod
  381. def get_device_midea_list(request_dict, response):
  382. """
  383. 获取视频播放列表
  384. @param request_dict: 请求参数
  385. @request_dict serial_number: 序列号
  386. @request_dict startTime: 开始时间
  387. @request_dict endTime: 结束时间
  388. @param response: 响应对象
  389. @return: response
  390. """
  391. serial_number = request_dict.get('serial_number', None)
  392. start_time = request_dict.get('startTime', None)
  393. end_time = request_dict.get('endTime', None)
  394. page = request_dict.get('page', None)
  395. size = request_dict.get('size', None)
  396. if not all([serial_number, start_time, end_time, page, size]):
  397. return response.json(444)
  398. page = int(page)
  399. size = int(size)
  400. start_time = datetime.datetime.fromtimestamp(int(start_time)) - datetime.timedelta(hours=8)
  401. end_time = datetime.datetime.fromtimestamp(int(end_time)) - datetime.timedelta(hours=8)
  402. try:
  403. # kvs_qs = KVS.objects.filter(stream_name=serial_number)
  404. # if not kvs_qs.exists():
  405. # return response.json(174)
  406. kinesis_fragments_obj = AmazonKVAMObject(
  407. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  408. secret_access_key='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',
  409. region_name='us-east-1',
  410. stream_name=serial_number,
  411. api_name='LIST_FRAGMENTS'
  412. )
  413. kinesis_images_obj = AmazonKVAMObject(
  414. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  415. secret_access_key='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',
  416. region_name='us-east-1',
  417. stream_name=serial_number,
  418. api_name='GET_IMAGES'
  419. )
  420. stream_list = kinesis_fragments_obj.get_list_fragments(serial_number, start_time, end_time)
  421. total_page = len(stream_list)
  422. stream_list = stream_list[(page - 1) * size:page * size]
  423. for item in stream_list:
  424. temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(
  425. tzinfo=datetime.timezone.utc)
  426. temp_end_time = temp_start_time + datetime.timedelta(seconds=300)
  427. item['image'] = kinesis_images_obj.get_images(serial_number, temp_start_time, temp_end_time)
  428. item['startTime'] = int(item['startTime'].timestamp())
  429. item['endTime'] = int(item['endTime'].timestamp())
  430. res = {
  431. 'totalPage': total_page,
  432. 'fragments': stream_list
  433. }
  434. return response.json(0, res)
  435. except Exception as e:
  436. print(e)
  437. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  438. @staticmethod
  439. def download_clip(request_dict, response):
  440. """
  441. 获取视频播放地址
  442. @param request_dict: 请求参数
  443. @request_dict serial_number: 序列号
  444. @request_dict startTime: 开始时间
  445. @request_dict endTime: 结束时间
  446. @param response: 响应对象
  447. @return: response
  448. """
  449. serial_number = request_dict.get('serial_number', None)
  450. start_time = request_dict.get('startTime', None)
  451. end_time = request_dict.get('endTime', None)
  452. if not all([serial_number, start_time, end_time]):
  453. return response.json(444)
  454. start_time = datetime.datetime.fromtimestamp(int(start_time)) - datetime.timedelta(hours=8)
  455. end_time = datetime.datetime.fromtimestamp(int(end_time)) - datetime.timedelta(hours=8)
  456. try:
  457. # kvs_qs = KVS.objects.filter(stream_name=serial_number)
  458. # if not kvs_qs.exists():
  459. # return response.json(174)
  460. kinesis_video_obj = AmazonKVAMObject(
  461. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  462. secret_access_key='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',
  463. region_name='us-east-1',
  464. stream_name=serial_number,
  465. api_name='GET_CLIP'
  466. )
  467. clip_obj, clip_size = kinesis_video_obj.get_clip(serial_number, start_time, end_time)
  468. res = HttpResponse(clip_obj.read())
  469. res["content_type"] = "video/mp4"
  470. res["Content-Disposition"] = "attachment;filename=video.mp4"
  471. res['Content-Length'] = str(clip_size)
  472. return res
  473. except Exception as e:
  474. print(e)
  475. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  476. @staticmethod
  477. def get_sts_token(request_dict, response):
  478. """
  479. 获取临时token
  480. @param request_dict: 请求参数
  481. @request_dict uid: 设备uid
  482. @param response: 响应对象
  483. @return: response
  484. """
  485. uid = request_dict.get('uid', None)
  486. # if not all([]):
  487. # return response.json(444)
  488. try:
  489. sts_client_conn = boto3.client(
  490. 'sts',
  491. aws_access_key_id=ACCESS_KEY_ID,
  492. aws_secret_access_key=SECRET_ACCESS_KEY,
  493. region_name=KVS_REGION
  494. )
  495. sts_obj = sts_client_conn.get_session_token(DurationSeconds=129600)
  496. res = {
  497. 'AccessKeyId': sts_obj['Credentials']['AccessKeyId'],
  498. 'AccessKeySecret': sts_obj['Credentials']['SecretAccessKey'],
  499. 'SessionToken': sts_obj['Credentials']['SessionToken'],
  500. 'Expiration': str(sts_obj['Credentials']['Expiration'])
  501. }
  502. return response.json(0, res)
  503. except Exception as e:
  504. print(e)
  505. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  506. @staticmethod
  507. def create_signal_channel(request_dict, response):
  508. """
  509. 创建信号通道
  510. @param request_dict: 请求参数
  511. @request_dict serial: 序列号
  512. @param response: 响应对象
  513. @return: response
  514. """
  515. serial = request_dict.get('serial', None)
  516. if not all([serial]):
  517. return response.json(444)
  518. try:
  519. # 获取并判断region_id是否有效
  520. region_id = CommonService.confirm_region_id()
  521. if region_id not in REGION_ID_LIST:
  522. return response.json(444, {'invalid region_id': region_id})
  523. # 获取iot:CredentialProvider
  524. endpoint_type = 'iot:CredentialProvider'
  525. iot_client = IOTClient(region_id)
  526. iot_credential_provider_endpoint = iot_client.describe_iot_endpoint(endpoint_type)
  527. # 已有数据直接返回
  528. res = {
  529. 'region': KVS_REGION,
  530. 'role_alias': 'KvsCameraIoTRoleAlias',
  531. 'iot_credential_provider_endpoint': iot_credential_provider_endpoint,
  532. }
  533. channel_name = 'Ansjer_Device_{}'.format(serial)
  534. kvs = KVS.objects.filter(channel_name=channel_name)
  535. if kvs.exists():
  536. return response.json(0, res)
  537. kinesis_video_obj = AmazonKinesisVideoObject(
  538. aws_access_key_id=ACCESS_KEY_ID,
  539. secret_access_key=SECRET_ACCESS_KEY,
  540. region_name=KVS_REGION
  541. )
  542. channel_arn = kinesis_video_obj.create_signaling_channel(channel_name=channel_name)
  543. now_time = int(time.time())
  544. KVS.objects.create(
  545. channel_name=channel_name, channel_arn=channel_arn, channel_ttl=60,
  546. created_time=now_time, updated_time=now_time
  547. )
  548. return response.json(0, res)
  549. except Exception as e:
  550. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  551. @classmethod
  552. def send_alexa_offer_to_master(cls, request_dict, response):
  553. """
  554. 发送Alexa offer
  555. @param request_dict: 请求参数
  556. @request_dict serial: 序列号
  557. @param response: 响应对象
  558. @return: response
  559. """
  560. uid = request_dict.get('uid', None)
  561. sdp_offer = request_dict.get('sdp_offer', None)
  562. if not all([uid, sdp_offer]):
  563. return response.json(444)
  564. try:
  565. serial = CommonService.get_serial_number_by_uid(uid)
  566. channel_name = 'Ansjer_Device_{}'.format(serial)
  567. kvs_qs = KVS.objects.filter(channel_name=channel_name).values('channel_arn')
  568. if not kvs_qs.exists():
  569. return response.json(173)
  570. channel_arn = kvs_qs[0]['channel_arn']
  571. kinesis_video_obj = AmazonKinesisVideoObject(
  572. aws_access_key_id=ACCESS_KEY_ID,
  573. secret_access_key=SECRET_ACCESS_KEY,
  574. region_name=KVS_REGION
  575. )
  576. endpoint = kinesis_video_obj.get_signaling_channel_endpoint(channel_arn)
  577. url = '{}/v1/send-alexa-offer-to-master'.format(endpoint)
  578. # 构造请求 body
  579. client_id = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
  580. # offer转base64
  581. offer = {
  582. 'type': 'offer',
  583. 'sdp': sdp_offer
  584. }
  585. offer = cls.dict_to_base64(offer)
  586. LOGGER.info('offer:{}'.format(offer))
  587. payload = {
  588. 'ChannelARN': channel_arn,
  589. 'SenderClientId': client_id,
  590. 'MessagePayload': offer
  591. }
  592. # 构造 AWSRequest 并签名
  593. req = AWSRequest(method='POST', url=url, data=json.dumps(payload))
  594. credentials = Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
  595. SigV4Auth(credentials, 'kinesisvideo', KVS_REGION).add_auth(req)
  596. # 使用 requests 发送签名后的请求
  597. headers = dict(req.headers)
  598. headers['Content-Type'] = 'application/json'
  599. r = requests.post(url, headers=headers, data=json.dumps(payload))
  600. assert r.status_code == 200
  601. LOGGER.info('SendAlexaOfferToMaster响应: {}'.format(r.json()))
  602. sdp_answer = r.json()['Answer']
  603. assert sdp_answer
  604. # answer转字典
  605. sdp_answer = cls.base64_to_dict(sdp_answer)
  606. sdp_answer = sdp_answer['sdp']
  607. res = {
  608. 'sdp_answer': sdp_answer
  609. }
  610. return response.json(0, res)
  611. except Exception as e:
  612. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  613. @staticmethod
  614. def send_mqtt_command(request_dict, response):
  615. """
  616. 发送MQTT指令控制设备开始/停止推流
  617. """
  618. uid = request_dict.get('uid', None)
  619. enable = request_dict.get('enable', '1')
  620. if not uid:
  621. return response.json(444)
  622. try:
  623. steam_name = 'rtsp://{}:554/{}/{}'.format(WEBRTC_DOMAIN_NAME, ZLMEDIAKIT_APP_NAME, uid)
  624. thing_name = CommonService.get_serial_number_by_uid(uid) # 存在序列号则为使用序列号作为物品名
  625. topic_name = 'ansjer/generic/{}'.format(thing_name)
  626. msg = OrderedDict(
  627. [
  628. ('alexaRtspCommand', steam_name),
  629. ('enable', int(enable)),
  630. ]
  631. )
  632. if not CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg):
  633. return response.json(10044)
  634. return response.json(0)
  635. except Exception as e:
  636. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  637. @classmethod
  638. def get_alexa_answer(cls, request_dict, response):
  639. uid = request_dict.get('uid', None)
  640. sdp_offer = request_dict.get('sdp_offer', None)
  641. if not all([uid, sdp_offer]):
  642. return response.json(444)
  643. try:
  644. cls.mediamtx_add_steam(uid)
  645. result = cls.get_mediamtx_sdp_answer(uid, sdp_offer)
  646. # 成功获取到SDP answer
  647. res = {
  648. 'sdp_answer': result['sdp_answer']
  649. }
  650. LOGGER.info('获取Alexa answer响应内容: {}'.format(res))
  651. return response.json(0, res)
  652. except Exception as e:
  653. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  654. @staticmethod
  655. def dict_to_base64(data_dict: dict) -> str:
  656. # 手动构建JSON字符串,避免自动转义
  657. json_str = '{\n "type": "%s",\n "sdp": "%s"\n}' % (
  658. data_dict["type"],
  659. data_dict["sdp"].replace('"', '\\"').replace('\r', '\\r').replace('\n', '\\n')
  660. )
  661. # 转换为bytes并进行base64编码
  662. base64_data = base64.b64encode(json_str.encode('utf-8'))
  663. return base64_data.decode('utf-8')
  664. @staticmethod
  665. def base64_to_dict(encoded_str: str) -> dict:
  666. decoded_bytes = base64.b64decode(encoded_str)
  667. return json.loads(decoded_bytes.decode('utf-8'))
  668. @staticmethod
  669. def delete_signal_channel(request_dict, response):
  670. """
  671. 删除信号通道
  672. @param request_dict: 请求参数
  673. @request_dict serial: 序列号
  674. @param response: 响应对象
  675. @return: response
  676. """
  677. serial = request_dict.get('serial', None)
  678. if not all([serial]):
  679. return response.json(444)
  680. try:
  681. channel_name = 'Ansjer_Device_{}'.format(serial)
  682. kvs = KVS.objects.filter(channel_name=channel_name).first()
  683. if not kvs:
  684. return response.json(173)
  685. channel_arn = kvs.channel_arn
  686. kinesis_video_obj = AmazonKinesisVideoObject(
  687. aws_access_key_id=ACCESS_KEY_ID,
  688. secret_access_key=SECRET_ACCESS_KEY,
  689. region_name=KVS_REGION
  690. )
  691. kinesis_video_obj.delete_signaling_channel(channel_arn=channel_arn)
  692. kvs.delete()
  693. return response.json(0)
  694. except Exception as e:
  695. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  696. @staticmethod
  697. def fix_sdp_directly(sdp_text):
  698. """
  699. 直接修复SDP格式问题,按照四个主要修复点:
  700. 1. 标准化格式:去除多余空格,跳过空行,确保每行以\r\n结束。
  701. 2. 将方向属性从sendrecv改为recvonly(针对WHEP拉流)。
  702. 3. 移除客户端的ssrc行(a=ssrc:),因为不需要发送媒体。
  703. 4. 移除客户端的msid行(a=msid:),以匹配recvonly模式。
  704. """
  705. lines = sdp_text.split('\r\n')
  706. fixed_lines = []
  707. in_audio = False
  708. in_video = False
  709. for line in lines:
  710. # 1. 标准化格式:去除尾部空格,跳过空行
  711. line = line.rstrip()
  712. if not line:
  713. continue
  714. # 检查当前部分(audio或video)
  715. if line.startswith('m=audio'):
  716. in_audio = True
  717. in_video = False
  718. elif line.startswith('m=video'):
  719. in_audio = False
  720. in_video = True
  721. # 2. 将a=sendrecv改为a=recvonly(在audio和video部分)
  722. if line == 'a=sendrecv' and (in_audio or in_video):
  723. line = 'a=recvonly'
  724. # 3 & 4. 移除ssrc和msid相关行(客户端发送意图)
  725. if line.startswith('a=ssrc:') or line.startswith('a=msid:'):
  726. continue
  727. fixed_lines.append(line)
  728. return '\r\n'.join(fixed_lines)
  729. @staticmethod
  730. def zlmediakit_add_stream(uid: str) -> str:
  731. """
  732. zlmediakit添加rtsp推流
  733. @param uid:
  734. @return:
  735. """
  736. dst_url = 'rtsp://{}:{}/{}'.format('127.0.0.1', 8554, uid)
  737. data = {
  738. 'secret': ZLMEDIAKIT_SECRET,
  739. 'schema': 'rtsp',
  740. 'vhost': '__defaultVhost__',
  741. 'app': ZLMEDIAKIT_APP_NAME,
  742. 'stream': uid,
  743. 'dst_url': dst_url
  744. }
  745. url = 'http://{}/index/api/addStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
  746. r = requests.post(url, data=data, timeout=5)
  747. r_json = r.json()
  748. LOGGER.info('获取Alexa answer-ZLMediaKit添加rtsp推流响应: {}'.format(r_json))
  749. assert r_json['code'] == 0
  750. return r_json['data']['key']
  751. @staticmethod
  752. def zlmediakit_del_stream(key: str) -> None:
  753. """
  754. zlmediakit删除rtsp推流
  755. @param key: addStreamPusherProxy接口返回的key
  756. @return:
  757. """
  758. data = {
  759. 'secret': ZLMEDIAKIT_SECRET,
  760. 'key': key
  761. }
  762. url = 'http://{}/index/api/delStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
  763. r = requests.post(url, data=data, timeout=5)
  764. r_json = r.json()
  765. LOGGER.info('获取Alexa answer-ZLMediaKit删除rtsp推流响应: {}'.format(r_json))
  766. assert r_json['code'] == 0
  767. @staticmethod
  768. def mediamtx_add_steam(uid: str):
  769. """
  770. mediamtx添加流
  771. @param uid:
  772. @return:
  773. """
  774. headers = {
  775. "Content-Type": "application/json",
  776. "Authorization": ZLMEDIAKIT_AUTH
  777. }
  778. source = 'rtsp://{}:{}/live/{}'.format('127.0.0.1', 554, uid)
  779. data = {
  780. 'source': source,
  781. 'sourceOnDemand': True
  782. }
  783. url = 'http://{}:{}/v3/config/paths/add/{}'.format(WEBRTC_DOMAIN_NAME, 9997, uid)
  784. r = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
  785. LOGGER.info('获取Alexa answer-mediamtx添加流响应: {}'.format(r.text))
  786. @staticmethod
  787. def get_mediamtx_sdp_answer(uid: str, sdp_offer: str) -> dict:
  788. """
  789. 获取mediamtx sdp answer
  790. @param uid:
  791. @param sdp_offer:
  792. @return: 包含sdp_answer的字典或错误字典
  793. """
  794. headers = {
  795. 'Content-Type': 'application/sdp',
  796. "Authorization": ZLMEDIAKIT_AUTH_WHEP
  797. }
  798. url = 'http://{}:{}/{}/whep'.format(WEBRTC_DOMAIN_NAME, 8889, uid)
  799. try:
  800. r = requests.post(url, headers=headers, data=sdp_offer, timeout=10)
  801. r.raise_for_status() # 检查HTTP状态码
  802. r_text = r.text
  803. LOGGER.info('获取Alexa answer-获取mediamtx sdp answer响应: {}'.format(r_text))
  804. # mediamtx的WHEP端点返回的是SDP文本,不是JSON
  805. # 检查响应是否为有效的SDP格式(以"v="开头)
  806. if r_text.strip().startswith('v='):
  807. return {'sdp_answer': r_text}
  808. else:
  809. # 如果不是SDP格式,尝试解析为JSON
  810. try:
  811. return json.loads(r_text)
  812. except json.JSONDecodeError:
  813. try:
  814. # 如果直接解析失败,尝试处理单引号JSON
  815. import ast
  816. return ast.literal_eval(r_text)
  817. except (ValueError, SyntaxError):
  818. # 如果所有解析方法都失败,返回包含原始响应的错误字典
  819. return {'error': 'Failed to parse response', 'raw_response': r_text}
  820. except requests.exceptions.RequestException as e:
  821. LOGGER.error('获取Alexa answer-获取mediamtx sdp answer请求失败: {}'.format(str(e)))
  822. return {'error': 'Request failed', 'details': str(e)}
  823. except Exception as e:
  824. LOGGER.error('获取Alexa answer-获取mediamtx sdp answer发生未知错误: {}'.format(str(e)))
  825. return {'error': 'Unknown error', 'details': str(e)}