# -*- coding: utf-8 -*-
"""
Views for the QR-code web/PC login flow and for Amazon Kinesis Video Streams
(KVS) / WebRTC related operations.

@Author : Rocky
@Time : 2022/10/18 9:48
@File :KVSController.py
"""
import ast
import base64
import datetime
import hashlib
import json
import secrets
import time
import uuid
from collections import OrderedDict

import boto3
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from django.conf import settings
from django.http import HttpResponse
from django.views import View

from Model.models import KVS, Device_User, Device_Info
from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
from Object.IOTCore.IotObject import IOTClient
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Ansjer.config import SERVER_DOMAIN, LOGGER, KVS_REGION, REGION_ID_LIST, WEBRTC_DOMAIN_NAME, \
    ZLMEDIAKIT_SECRET, ZLMEDIAKIT_APP_NAME, ZLMEDIAKIT_AUTH, ZLMEDIAKIT_AUTH_WHEP
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService

ACCESS_KEY_ID = settings.ACCESS_KEY_ID
SECRET_ACCESS_KEY = settings.SECRET_ACCESS_KEY

# Credentials/region used by the archived-media endpoints (HLS url, fragment
# list, clip download). They were previously hard-coded in this file; secrets
# must not live in source control, so they are now read from Django settings
# and fall back to the shared credentials when no dedicated ones are set.
# NOTE(review): the key that used to be embedded here should be rotated, and
# the KVS_ARCHIVED_MEDIA_* settings configured if these endpoints must use a
# different account than ACCESS_KEY_ID.
KVS_ARCHIVED_MEDIA_ACCESS_KEY_ID = getattr(settings, 'KVS_ARCHIVED_MEDIA_ACCESS_KEY_ID', ACCESS_KEY_ID)
KVS_ARCHIVED_MEDIA_SECRET_ACCESS_KEY = getattr(settings, 'KVS_ARCHIVED_MEDIA_SECRET_ACCESS_KEY',
                                               SECRET_ACCESS_KEY)
KVS_ARCHIVED_MEDIA_REGION = getattr(settings, 'KVS_ARCHIVED_MEDIA_REGION', 'us-east-1')


def _error_detail(e):
    """Format an exception as the 'error_line/error_msg' string returned with code 500."""
    return 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))


def _to_kvs_datetime(timestamp):
    """
    Convert a unix timestamp (str or int) into the naive datetime passed to the KVS helpers.
    @param timestamp: unix timestamp in seconds
    @return: naive datetime, local time shifted back by 8 hours
    """
    # NOTE(review): the fixed 8 hour shift looks like a local-time (UTC+8) to UTC
    # conversion; it is kept unchanged, confirm against the server time zone.
    return datetime.datetime.fromtimestamp(int(timestamp)) - datetime.timedelta(hours=8)


def _kinesis_video_client():
    """Build an AmazonKinesisVideoObject with the shared credentials and KVS_REGION."""
    return AmazonKinesisVideoObject(
        aws_access_key_id=ACCESS_KEY_ID,
        secret_access_key=SECRET_ACCESS_KEY,
        region_name=KVS_REGION
    )


def _archived_media_client(stream_name, api_name):
    """Build an AmazonKVAMObject for one archived-media API of the given stream."""
    return AmazonKVAMObject(
        aws_access_key_id=KVS_ARCHIVED_MEDIA_ACCESS_KEY_ID,
        secret_access_key=KVS_ARCHIVED_MEDIA_SECRET_ACCESS_KEY,
        region_name=KVS_ARCHIVED_MEDIA_REGION,
        stream_name=stream_name,
        api_name=api_name
    )


class UserRelatedView(View):
    """QR-code login flow (web page / PC client / app) plus the user's device list."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation, request)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation, request)

    def validation(self, request_dict, operation, request):
        """
        Dispatch the request to the handler matching the operation.
        @param request_dict: request parameters
        @param operation: operation name taken from the url
        @param request: django request, used to read the authorization header
        @return: response
        """
        response = ResponseObject()
        # Operations of the login flow itself do not require a token.
        if operation == 'generate-qr-code':  # web page generates the QR code
            return self.generate_qr_code(response)
        elif operation == 'get-scanning-status':  # check whether the app has scanned the code
            return self.get_scanning_status(request_dict, response)
        elif operation == 'web-login':  # web login
            return self.web_login(request_dict, response)
        elif operation == 'pc-login':  # pc client login
            return self.pc_login(request_dict, response)
        elif operation == 'confirm-login':  # app confirms the login
            return self.confirm_login(request_dict, response)

        # Everything else (including unknown operations) requires a valid token.
        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'), returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        user_id = tko.userID
        if operation == 'get-device':  # device list
            return self.get_device(response, user_id)
        return response.json(404)

    @staticmethod
    def get_user_info(user_id):
        """
        Get the avatar url and nickname of a user.
        @param user_id: user id
        @return: tuple (user_icon_url, nick_name); two empty strings when the user does not exist
        """
        user = Device_User.objects.filter(userID=user_id).values('NickName', 'userIconPath').first()
        if user is None:
            return '', ''
        nick_name = user['NickName']
        # A NULL icon path must not become the string 'None' and end up in the url.
        raw_icon_path = user['userIconPath']
        user_icon_path = str(raw_icon_path) if raw_icon_path else ''
        if user_icon_path:
            user_icon_path = user_icon_path.replace('static/', '').replace('\\', '/')
            user_icon_url = SERVER_DOMAIN + 'account/getAvatar/' + user_icon_path
        else:
            user_icon_url = ''
        return user_icon_url, nick_name

    @staticmethod
    def generate_qr_code(response):
        """
        Generate the login id shown as QR code on the web page.
        @param response: response object
        @return: response, data is {'type': 'autologin', 'id': <32 hex characters>}
        """
        redis_obj = RedisObject()
        try:
            # The id is the only thing protecting the login token stored next to it,
            # so it has to be unpredictable (same 32 hex character shape as before).
            uuid_number = secrets.token_hex(16)
            # Record the id in redis for 300 seconds with status 0 (QR code generated).
            flag = redis_obj.set_ex_data(uuid_number, 0, 300)
            if flag:
                return response.json(0, {'type': 'autologin', 'id': uuid_number})
            return response.json(119)
        except Exception as e:
            LOGGER.error('generate_qr_code error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_scanning_status(request_dict, response):
        """
        Get the scanning status of a login id.
        @param request_dict: request parameters
        @request_dict uuid: login id returned by generate-qr-code
        @param response: response object
        @return: response, data is {'status': 1} once the app has scanned, otherwise {'status': 0}
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            if status is False:
                return response.json(119)
            # '1' = scanned, '2' = logged in; both mean the code has been scanned.
            scanned = status == '1' or status == '2'
            return response.json(0, {'status': 1 if scanned else 0})
        except Exception as e:
            LOGGER.error('get_scanning_status error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def pc_login(request_dict, response):
        """
        PC client login: exchange a confirmed login id for the user's token.
        @param request_dict: request parameters
        @request_dict uuid: login id returned by generate-qr-code
        @param response: response object
        @return: response, data is {'status': 1, 'userIconUrl', 'nickName', 'token'} when
                 logged in, otherwise {'status': 0}
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            token = redis_obj.get_data(uuid_number + 'token')
            if status is False or token is False:
                return response.json(119)
            if status != '2':  # not logged in yet
                return response.json(0, {'status': 0})

            # Logged in: validate the stored token before handing it out.
            token_obj = TokenObject(token)
            response.lang = token_obj.lang
            if token_obj.code != 0:
                return response.json(token_obj.code)
            user_icon_url, nick_name = UserRelatedView.get_user_info(token_obj.userID)
            res = {'status': 1, 'userIconUrl': user_icon_url, 'nickName': nick_name, 'token': token}
            # The login id is single use.
            redis_obj.del_data(uuid_number)
            redis_obj.del_data(uuid_number + 'token')
            return response.json(0, res)
        except Exception as e:
            LOGGER.error('pc_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def web_login(request_dict, response):
        """
        Web login: mark the login id as logged in, or cancel the login.
        @param request_dict: request parameters
        @request_dict uuid: login id returned by generate-qr-code
        @request_dict confirm: '1' cancels the login, any other value confirms it
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        confirm = request_dict.get('confirm', None)
        if not all([uuid_number, confirm]):
            return response.json(444, {'error param': 'uuid or confirm'})
        try:
            redis_obj = RedisObject()
            if confirm == '1':  # cancel the login
                redis_obj.del_data(uuid_number)
                redis_obj.del_data(uuid_number + 'token')
                return response.json(0)
            token = redis_obj.get_data(uuid_number + 'token')
            ttl = redis_obj.get_ttl(uuid_number)
            if token is False or ttl <= 0:
                return response.json(119)
            # Switch the status to 2 (logged in) while keeping the remaining ttl.
            result = redis_obj.set_ex_data(uuid_number, 2, ttl)
            if result is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            LOGGER.error('web_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def confirm_login(request_dict, response):
        """
        App confirms the login: mark the login id as scanned and store the app token.
        @param request_dict: request parameters
        @request_dict uuid: login id read from the QR code
        @request_dict token: token of the user logged in on the app
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        token = request_dict.get('token', None)
        if not all([uuid_number, token]):
            return response.json(444, {'error param': 'uuid or token'})
        redis_obj = RedisObject()
        try:
            status = redis_obj.get_data(uuid_number)
            ttl = redis_obj.get_ttl(uuid_number)
            if status is False or ttl <= 0:
                return response.json(119)
            # Status 1 (scanned) and the token share the remaining ttl of the login id.
            result1 = redis_obj.set_ex_data(uuid_number, 1, ttl)
            result2 = redis_obj.set_ex_data(uuid_number + 'token', token, ttl)
            if result1 is False or result2 is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            LOGGER.error('confirm_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_device(response, user_id):
        """
        Get the device list of a user.
        @param response: response object
        @param user_id: user id
        @return: response, data is a list of {'serial_number', 'NickName'}
        """
        try:
            device_qs = Device_Info.objects.filter(userID=user_id).values('serial_number', 'NickName')
            return response.json(0, list(device_qs))
        except Exception as e:
            LOGGER.error('get_device error: {}'.format(repr(e)))
            return response.json(500)


class KVSView(View):
    """Kinesis Video Streams, signaling channel, MQTT and Alexa/WebRTC operations."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Dispatch the request to the handler matching the operation.
        @param request_dict: request parameters
        @param request: django request (currently unused)
        @param operation: operation name taken from the url
        @return: response, code 404 for an unknown operation
        """
        response = ResponseObject()
        # NOTE(review): token validation is disabled for every operation here,
        # including the archived-media ones; confirm that this is intended.
        handlers = {
            'create-media': self.create_media,  # create a video stream
            'update-data-retention': self.update_data_retention,  # change stream data retention
            'get-sts-token': self.get_sts_token,  # get temporary credentials
            'createSignalChannel': self.create_signal_channel,  # create a signaling channel
            'SendAlexaOfferToMaster': self.send_alexa_offer_to_master,  # send the Alexa offer
            'deleteSignalChannel': self.delete_signal_channel,  # delete a signaling channel
            'sendMqttCommand': self.send_mqtt_command,  # send a MQTT command
            'getAlexaAnswer': self.get_alexa_answer,  # test: get the Alexa answer
            'get-device-midea-list': self.get_device_midea_list,  # list recorded fragments
            'get-hls-midea': self.get_hls_midea_url,  # get the playback url
            'download-clip': self.download_clip,  # download a clip
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(404)
        return handler(request_dict, response)

    @staticmethod
    def create_media(request_dict, response):
        """
        Create a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as stream name
        @param response: response object
        @return: response, 174 when the stream already exists, 178 when no stream arn is returned
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444)
        try:
            if KVS.objects.filter(stream_name=serial_number).exists():
                return response.json(174)
            stream_arn = _kinesis_video_client().create_stream(stream_name=serial_number)
            if not stream_arn:
                return response.json(178)
            now_time = int(time.time())
            KVS.objects.create(stream_name=serial_number, stream_arn=stream_arn,
                               created_time=now_time, updated_time=now_time)
            return response.json(0)
        except Exception as e:
            LOGGER.error('create_media error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def update_data_retention(request_dict, response):
        """
        Change the data retention period of a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as stream name
        @request_dict operation: operation, increase/decrease
        @request_dict data_retention_change_in_hours: number of hours to change
        @param response: response object
        @return: response, 173 when the stream is not recorded
        """
        serial_number = request_dict.get('serial_number', None)
        operation = request_dict.get('operation', None)
        data_retention_change_in_hours = request_dict.get('data_retention_change_in_hours', None)
        if not all([serial_number, operation, data_retention_change_in_hours]):
            return response.json(444)
        try:
            data_retention_change_in_hours = int(data_retention_change_in_hours)
        except (TypeError, ValueError):
            return response.json(444)
        try:
            kvs_qs = KVS.objects.filter(stream_name=serial_number)
            if not kvs_qs.exists():
                return response.json(173)
            now_time = int(time.time())
            _kinesis_video_client().update_data_retention(
                stream_name=serial_number,
                operation=operation,
                data_retention_change_in_hours=data_retention_change_in_hours
            )
            # NOTE(review): this stores the requested change, not the resulting
            # retention period - confirm that this is intended.
            kvs_qs.update(data_retention_in_hours=data_retention_change_in_hours, updated_time=now_time)
            return response.json(0)
        except Exception as e:
            LOGGER.error('update_data_retention error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_hls_midea_url(request_dict, response):
        """
        Get the HLS playback url of a stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as stream name
        @request_dict startTime: start time, unix timestamp in seconds
        @request_dict endTime: end time, unix timestamp in seconds
        @request_dict playMode: play mode, 0 is ON_DEMAND, any other integer is LIVE_REPLAY
        @param response: response object
        @return: response, data is {'HlsStreamingSessionUrl': url}
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        play_mode = request_dict.get('playMode', None)
        if not all([serial_number, start_time, end_time, play_mode]):
            return response.json(444)
        try:
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
            play_mode = 'ON_DEMAND' if int(play_mode) == 0 else 'LIVE_REPLAY'
        except (TypeError, ValueError, OverflowError, OSError):
            # Non numeric or out of range values are invalid parameters, not server errors.
            return response.json(444)
        try:
            kinesis_video_obj = _archived_media_client(serial_number, 'GET_HLS_STREAMING_SESSION_URL')
            hls_streaming_session_url = kinesis_video_obj.get_hls_streaming_session_url(
                serial_number, start_time, end_time, play_mode)
            return response.json(0, {"HlsStreamingSessionUrl": hls_streaming_session_url})
        except Exception as e:
            LOGGER.error('get_hls_midea_url error: {}'.format(repr(e)))
            return response.json(500, _error_detail(e))

    @staticmethod
    def get_device_midea_list(request_dict, response):
        """
        Get one page of the recorded fragments of a stream, each with its images.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as stream name
        @request_dict startTime: start time, unix timestamp in seconds
        @request_dict endTime: end time, unix timestamp in seconds
        @request_dict page: page number, starting at 1
        @request_dict size: page size
        @param response: response object
        @return: response, data is {'totalPage': total number of fragments, 'fragments': page}
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        page = request_dict.get('page', None)
        size = request_dict.get('size', None)
        if not all([serial_number, start_time, end_time, page, size]):
            return response.json(444)
        try:
            page = int(page)
            size = int(size)
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
        except (TypeError, ValueError, OverflowError, OSError):
            return response.json(444)
        if page < 1 or size < 1:
            return response.json(444)
        try:
            kinesis_fragments_obj = _archived_media_client(serial_number, 'LIST_FRAGMENTS')
            kinesis_images_obj = _archived_media_client(serial_number, 'GET_IMAGES')
            stream_list = kinesis_fragments_obj.get_list_fragments(serial_number, start_time, end_time)
            # NOTE(review): despite its name 'totalPage' is the number of fragments,
            # not the number of pages; kept as is for existing clients.
            total_page = len(stream_list)
            offset = (page - 1) * size
            stream_list = stream_list[offset:offset + size]
            for item in stream_list:
                # Images are looked up in the 300 seconds following the fragment start.
                temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(
                    tzinfo=datetime.timezone.utc)
                temp_end_time = temp_start_time + datetime.timedelta(seconds=300)
                item['image'] = kinesis_images_obj.get_images(serial_number, temp_start_time, temp_end_time)
                item['startTime'] = int(item['startTime'].timestamp())
                item['endTime'] = int(item['endTime'].timestamp())
            res = {
                'totalPage': total_page,
                'fragments': stream_list
            }
            return response.json(0, res)
        except Exception as e:
            LOGGER.error('get_device_midea_list error: {}'.format(repr(e)))
            return response.json(500, _error_detail(e))

    @staticmethod
    def download_clip(request_dict, response):
        """
        Download a clip of a stream as mp4 attachment.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as stream name
        @request_dict startTime: start time, unix timestamp in seconds
        @request_dict endTime: end time, unix timestamp in seconds
        @param response: response object
        @return: HttpResponse with the mp4 data, or a json response on error
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        if not all([serial_number, start_time, end_time]):
            return response.json(444)
        try:
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
        except (TypeError, ValueError, OverflowError, OSError):
            return response.json(444)
        try:
            kinesis_video_obj = _archived_media_client(serial_number, 'GET_CLIP')
            clip_obj, clip_size = kinesis_video_obj.get_clip(serial_number, start_time, end_time)
            # The content type has to be given to the constructor: assigning
            # res["content_type"] only adds a header literally named "content_type".
            res = HttpResponse(clip_obj.read(), content_type='video/mp4')
            res["Content-Disposition"] = "attachment;filename=video.mp4"
            res['Content-Length'] = str(clip_size)
            return res
        except Exception as e:
            LOGGER.error('download_clip error: {}'.format(repr(e)))
            return response.json(500, _error_detail(e))

    @staticmethod
    def get_sts_token(request_dict, response):
        """
        Get temporary AWS credentials from STS.
        @param request_dict: request parameters (currently unused)
        @param response: response object
        @return: response, data holds AccessKeyId, AccessKeySecret, SessionToken and Expiration
        """
        try:
            sts_client_conn = boto3.client(
                'sts',
                aws_access_key_id=ACCESS_KEY_ID,
                aws_secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            credentials = sts_client_conn.get_session_token(DurationSeconds=129600)['Credentials']
            res = {
                'AccessKeyId': credentials['AccessKeyId'],
                'AccessKeySecret': credentials['SecretAccessKey'],
                'SessionToken': credentials['SessionToken'],
                'Expiration': str(credentials['Expiration'])
            }
            return response.json(0, res)
        except Exception as e:
            LOGGER.error('get_sts_token error: {}'.format(repr(e)))
            return response.json(500, _error_detail(e))

    @staticmethod
    def create_signal_channel(request_dict, response):
        """
        Create the signaling channel of a device, if it does not exist yet.
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response, data holds region, role_alias and iot_credential_provider_endpoint
        """
        serial = request_dict.get('serial', None)
        if not serial:
            return response.json(444)
        try:
            # Get the region id and check that it is valid.
            region_id = CommonService.confirm_region_id()
            if region_id not in REGION_ID_LIST:
                return response.json(444, {'invalid region_id': region_id})

            # Get the iot:CredentialProvider endpoint.
            endpoint_type = 'iot:CredentialProvider'
            iot_client = IOTClient(region_id)
            iot_credential_provider_endpoint = iot_client.describe_iot_endpoint(endpoint_type)

            res = {
                'region': KVS_REGION,
                'role_alias': 'KvsCameraIoTRoleAlias',
                'iot_credential_provider_endpoint': iot_credential_provider_endpoint,
            }
            channel_name = 'Ansjer_Device_{}'.format(serial)
            # The channel is already recorded: return directly.
            if KVS.objects.filter(channel_name=channel_name).exists():
                return response.json(0, res)

            channel_arn = _kinesis_video_client().create_signaling_channel(channel_name=channel_name)
            now_time = int(time.time())
            KVS.objects.create(
                channel_name=channel_name,
                channel_arn=channel_arn,
                channel_ttl=60,
                created_time=now_time,
                updated_time=now_time
            )
            return response.json(0, res)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @classmethod
    def send_alexa_offer_to_master(cls, request_dict, response):
        """
        Send the Alexa SDP offer to the master of the device's signaling channel.
        @param request_dict: request parameters
        @request_dict uid: device uid
        @request_dict sdp_offer: SDP offer
        @param response: response object
        @return: response, data is {'sdp_answer': SDP answer}; 173 when the channel is not recorded
        """
        uid = request_dict.get('uid', None)
        sdp_offer = request_dict.get('sdp_offer', None)
        if not all([uid, sdp_offer]):
            return response.json(444)
        try:
            serial = CommonService.get_serial_number_by_uid(uid)
            channel_name = 'Ansjer_Device_{}'.format(serial)
            kvs_qs = KVS.objects.filter(channel_name=channel_name).values('channel_arn')
            if not kvs_qs.exists():
                return response.json(173)
            channel_arn = kvs_qs[0]['channel_arn']

            endpoint = _kinesis_video_client().get_signaling_channel_endpoint(channel_arn)
            url = '{}/v1/send-alexa-offer-to-master'.format(endpoint)

            # Build the request body; the offer is sent base64 encoded.
            client_id = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
            offer = cls.dict_to_base64({
                'type': 'offer',
                'sdp': sdp_offer
            })
            LOGGER.info('offer:{}'.format(offer))
            payload = {
                'ChannelARN': channel_arn,
                'SenderClientId': client_id,
                'MessagePayload': offer
            }
            # Serialize once so that the signed body and the sent body are the same string.
            body = json.dumps(payload)

            # Build the AWSRequest and sign it.
            req = AWSRequest(method='POST', url=url, data=body)
            credentials = Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
            SigV4Auth(credentials, 'kinesisvideo', KVS_REGION).add_auth(req)

            # Send the signed request with requests.
            headers = dict(req.headers)
            headers['Content-Type'] = 'application/json'
            r = requests.post(url, headers=headers, data=body, timeout=10)
            # Explicit checks instead of assert, which is removed under "python -O".
            if r.status_code != 200:
                raise RuntimeError('SendAlexaOfferToMaster failed, status_code: {}, body: {}'.format(
                    r.status_code, r.text))
            r_json = r.json()
            LOGGER.info('SendAlexaOfferToMaster响应: {}'.format(r_json))
            sdp_answer = r_json['Answer']
            if not sdp_answer:
                raise ValueError('SendAlexaOfferToMaster returned an empty Answer')

            # The answer is base64 encoded json.
            res = {
                'sdp_answer': cls.base64_to_dict(sdp_answer)['sdp']
            }
            return response.json(0, res)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def send_mqtt_command(request_dict, response):
        """
        Send the MQTT command that makes the device start/stop pushing its stream.
        @param request_dict: request parameters
        @request_dict uid: device uid
        @request_dict enable: integer flag sent to the device, defaults to '1'
        @param response: response object
        @return: response, 10044 when publishing the message fails
        """
        uid = request_dict.get('uid', None)
        enable = request_dict.get('enable', '1')
        if not uid:
            return response.json(444)
        try:
            stream_url = 'rtsp://{}:554/{}/{}'.format(WEBRTC_DOMAIN_NAME, ZLMEDIAKIT_APP_NAME, uid)
            # The serial number, when there is one, is used as thing name.
            thing_name = CommonService.get_serial_number_by_uid(uid)
            topic_name = 'ansjer/generic/{}'.format(thing_name)
            msg = OrderedDict(
                [
                    ('alexaRtspCommand', stream_url),
                    ('enable', int(enable)),
                ]
            )
            if not CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg):
                return response.json(10044)
            return response.json(0)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @classmethod
    def get_alexa_answer(cls, request_dict, response):
        """
        Test endpoint: register the device stream in mediamtx and get its SDP answer.
        @param request_dict: request parameters
        @request_dict uid: device uid
        @request_dict sdp_offer: SDP offer
        @param response: response object
        @return: response, data is {'sdp_answer': SDP answer}
        """
        uid = request_dict.get('uid', None)
        sdp_offer = request_dict.get('sdp_offer', None)
        if not all([uid, sdp_offer]):
            return response.json(444)
        try:
            cls.mediamtx_add_steam(uid)
            result = cls.get_mediamtx_sdp_answer(uid, sdp_offer)
            # The helper reports failures as a dict without 'sdp_answer'; surface that
            # error instead of an unrelated KeyError.
            if not isinstance(result, dict) or 'sdp_answer' not in result:
                raise ValueError('mediamtx did not return an SDP answer: {}'.format(result))
            res = {
                'sdp_answer': result['sdp_answer']
            }
            LOGGER.info('获取Alexa answer响应内容: {}'.format(res))
            return response.json(0, res)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def dict_to_base64(data_dict: dict) -> str:
        """
        Encode an SDP message dict as base64 of its json text.
        @param data_dict: dict with the keys 'type' and 'sdp'
        @return: base64 string
        """

        def escape(value: str) -> str:
            # json.dumps escapes quotes, backslashes and control characters; with
            # ensure_ascii=False non-ascii characters are left untouched. The
            # surrounding quotes are dropped because the template provides them.
            return json.dumps(value, ensure_ascii=False)[1:-1]

        # The json text is built by hand to keep its exact layout.
        json_str = '{\n "type": "%s",\n "sdp": "%s"\n}' % (
            escape(str(data_dict["type"])),
            escape(data_dict["sdp"])
        )
        return base64.b64encode(json_str.encode('utf-8')).decode('utf-8')

    @staticmethod
    def base64_to_dict(encoded_str: str) -> dict:
        """
        Decode base64 encoded json text.
        @param encoded_str: base64 string
        @return: decoded json value
        """
        decoded_bytes = base64.b64decode(encoded_str)
        return json.loads(decoded_bytes.decode('utf-8'))

    @staticmethod
    def delete_signal_channel(request_dict, response):
        """
        Delete the signaling channel of a device.
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response, 173 when the channel is not recorded
        """
        serial = request_dict.get('serial', None)
        if not serial:
            return response.json(444)
        try:
            channel_name = 'Ansjer_Device_{}'.format(serial)
            kvs = KVS.objects.filter(channel_name=channel_name).first()
            if not kvs:
                return response.json(173)
            _kinesis_video_client().delete_signaling_channel(channel_arn=kvs.channel_arn)
            kvs.delete()
            return response.json(0)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def fix_sdp_directly(sdp_text):
        """
        Fix an SDP offer for WHEP (receive only) playback:
        1. Normalize the format: accept CRLF or LF line endings, strip trailing
           whitespace, drop empty lines and join the lines with CRLF (no CRLF is
           appended after the last line).
        2. Change a=sendrecv to a=recvonly inside audio and video sections.
        3. Remove the client's ssrc lines (a=ssrc:), no media is sent.
        4. Remove the client's msid lines (a=msid:), to match recvonly.
        @param sdp_text: SDP text
        @return: fixed SDP text
        """
        fixed_lines = []
        in_av_section = False
        for line in sdp_text.replace('\r\n', '\n').split('\n'):
            line = line.rstrip()
            if not line:
                continue
            # Every m= line starts a new section; only audio/video ones are rewritten.
            if line.startswith('m='):
                in_av_section = line.startswith(('m=audio', 'm=video'))
            if line == 'a=sendrecv' and in_av_section:
                line = 'a=recvonly'
            if line.startswith(('a=ssrc:', 'a=msid:')):
                continue
            fixed_lines.append(line)
        return '\r\n'.join(fixed_lines)

    @staticmethod
    def zlmediakit_add_stream(uid: str) -> str:
        """
        Add a rtsp stream pusher proxy in ZLMediaKit.
        @param uid: device uid, used as stream name
        @return: key of the created proxy
        @raise AssertionError: when ZLMediaKit answers with a non zero code
        """
        dst_url = 'rtsp://{}:{}/{}'.format('127.0.0.1', 8554, uid)
        data = {
            'secret': ZLMEDIAKIT_SECRET,
            'schema': 'rtsp',
            'vhost': '__defaultVhost__',
            'app': ZLMEDIAKIT_APP_NAME,
            'stream': uid,
            'dst_url': dst_url
        }
        url = 'http://{}/index/api/addStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
        r = requests.post(url, data=data, timeout=5)
        r_json = r.json()
        LOGGER.info('获取Alexa answer-ZLMediaKit添加rtsp推流响应: {}'.format(r_json))
        # Raised explicitly: an assert statement would be removed under "python -O".
        if r_json['code'] != 0:
            raise AssertionError('ZLMediaKit addStreamPusherProxy failed: {}'.format(r_json))
        return r_json['data']['key']

    @staticmethod
    def zlmediakit_del_stream(key: str) -> None:
        """
        Delete a rtsp stream pusher proxy in ZLMediaKit.
        @param key: key returned by the addStreamPusherProxy api
        @raise AssertionError: when ZLMediaKit answers with a non zero code
        """
        data = {
            'secret': ZLMEDIAKIT_SECRET,
            'key': key
        }
        url = 'http://{}/index/api/delStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
        r = requests.post(url, data=data, timeout=5)
        r_json = r.json()
        LOGGER.info('获取Alexa answer-ZLMediaKit删除rtsp推流响应: {}'.format(r_json))
        # Raised explicitly: an assert statement would be removed under "python -O".
        if r_json['code'] != 0:
            raise AssertionError('ZLMediaKit delStreamPusherProxy failed: {}'.format(r_json))

    @staticmethod
    def mediamtx_add_steam(uid: str):
        """
        Add an on-demand path for the device stream in mediamtx.
        The response is only logged, it is not checked.
        @param uid: device uid, used as path name
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": ZLMEDIAKIT_AUTH
        }
        source = 'rtsp://{}:{}/live/{}'.format('127.0.0.1', 554, uid)
        data = {
            'source': source,
            'sourceOnDemand': True
        }
        url = 'http://{}:{}/v3/config/paths/add/{}'.format(WEBRTC_DOMAIN_NAME, 9997, uid)
        r = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
        LOGGER.info('获取Alexa answer-mediamtx添加流响应: {}'.format(r.text))

    @staticmethod
    def get_mediamtx_sdp_answer(uid: str, sdp_offer: str) -> dict:
        """
        Get the SDP answer from the mediamtx WHEP endpoint.
        @param uid: device uid, used as path name
        @param sdp_offer: SDP offer
        @return: {'sdp_answer': text} when mediamtx answers with SDP text, the parsed
                 dict when it answers with json, otherwise a dict with the key 'error'
        """
        headers = {
            'Content-Type': 'application/sdp',
            "Authorization": ZLMEDIAKIT_AUTH_WHEP
        }
        url = 'http://{}:{}/{}/whep'.format(WEBRTC_DOMAIN_NAME, 8889, uid)
        try:
            r = requests.post(url, headers=headers, data=sdp_offer, timeout=10)
            r.raise_for_status()  # check the http status code
            r_text = r.text
            LOGGER.info('获取Alexa answer-获取mediamtx sdp answer响应: {}'.format(r_text))

            # The WHEP endpoint answers with SDP text (starting with "v="), not json.
            if r_text.strip().startswith('v='):
                return {'sdp_answer': r_text}

            # Not SDP: try json first, then a single quoted (python literal) dict.
            parsed = None
            try:
                parsed = json.loads(r_text)
            except json.JSONDecodeError:
                try:
                    parsed = ast.literal_eval(r_text)
                except (ValueError, SyntaxError):
                    parsed = None
            # Only a dict honours the declared return type; anything else is a parse failure.
            if isinstance(parsed, dict):
                return parsed
            return {'error': 'Failed to parse response', 'raw_response': r_text}
        except requests.exceptions.RequestException as e:
            LOGGER.error('获取Alexa answer-获取mediamtx sdp answer请求失败: {}'.format(str(e)))
            return {'error': 'Request failed', 'details': str(e)}
        except Exception as e:
            LOGGER.error('获取Alexa answer-获取mediamtx sdp answer发生未知错误: {}'.format(str(e)))
            return {'error': 'Unknown error', 'details': str(e)}