Bläddra i källkod

同步KVS,webrtc相关代码,定时更新套餐状态

locky 2 dagar sedan
förälder
incheckning
8fe8cb66c2

+ 5 - 0
Ansjer/config.py

@@ -48,6 +48,11 @@ PAY_TYPE_CDK = 11
 
 # 域名
 SERIAL_DOMAIN_NAME = 'serial.zositechc.cn:7880'
+WEBRTC_DOMAIN_NAME = '173.255.220.180'
+ZLMEDIAKIT_AUTH = 'Bearer ansjer:ansjer@api88888!!!'
+ZLMEDIAKIT_AUTH_WHEP = 'Bearer ansjer:ansjer@88888!!!'
+ZLMEDIAKIT_SECRET = 'IkhE3B0aBJrMHJHKfdgfdhfJIUjiooKY'
+ZLMEDIAKIT_APP_NAME = 'live'
 
 # 华为云配置
 HUAWEICLOUD_OBS_SERVER = 'https://obs.cn-east-3.myhuaweicloud.com'

+ 417 - 8
Controller/AWS/KVSController.py

@@ -4,20 +4,31 @@
 @Time : 2022/10/18 9:48
 @File :KVSController.py
 """
+import base64
 import hashlib
+import json
 import time
 import uuid
 import datetime
+from collections import OrderedDict
 
+import boto3
+import requests
 from django.http import HttpResponse
 from django.views import View
 
 from Model.models import KVS, Device_User, Device_Info
 from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
+from Object.IOTCore.IotObject import IOTClient
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
-from Ansjer.config import REGION_NAME, SERVER_DOMAIN
+from Ansjer.config import SERVER_DOMAIN, LOGGER, KVS_REGION, REGION_ID_LIST, WEBRTC_DOMAIN_NAME, \
+    ZLMEDIAKIT_SECRET, ZLMEDIAKIT_APP_NAME, ZLMEDIAKIT_AUTH, ZLMEDIAKIT_AUTH_WHEP
 from Object.TokenObject import TokenObject
+from botocore.auth import SigV4Auth
+from botocore.awsrequest import AWSRequest
+from botocore.credentials import Credentials
+from Service.CommonService import CommonService
 from django.conf import settings
 ACCESS_KEY_ID = settings.ACCESS_KEY_ID
 SECRET_ACCESS_KEY = settings.SECRET_ACCESS_KEY
@@ -256,6 +267,18 @@ class KVSView(View):
             return self.create_media(request_dict, response)
         elif operation == 'update-data-retention':  # 修改视频流数据保留时间
             return self.update_data_retention(request_dict, response)
+        elif operation == 'get-sts-token':  # 获取临时token
+            return self.get_sts_token(request_dict, response)
+        elif operation == 'createSignalChannel':  # 创建通道
+            return self.create_signal_channel(request_dict, response)
+        elif operation == 'SendAlexaOfferToMaster':  # 发送Alexa offer
+            return self.send_alexa_offer_to_master(request_dict, response)
+        elif operation == 'deleteSignalChannel':  # 删除通道
+            return self.delete_signal_channel(request_dict, response)
+        elif operation == 'sendMqttCommand':  # 发送MQTT
+            return self.send_mqtt_command(request_dict, response)
+        elif operation == 'getAlexaAnswer':  # 测试获取Alexa answer
+            return self.get_alexa_answer(request_dict, response)
         else:
             # tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
             # if tko.code != 0:
@@ -289,7 +312,7 @@ class KVSView(View):
             kinesis_video_obj = AmazonKinesisVideoObject(
                 aws_access_key_id=ACCESS_KEY_ID,
                 secret_access_key=SECRET_ACCESS_KEY,
-                region_name=REGION_NAME
+                region_name=KVS_REGION
             )
             stream_arn = kinesis_video_obj.create_stream(stream_name=serial_number)
             if stream_arn:
@@ -321,11 +344,11 @@ class KVSView(View):
         try:
             kvs_qs = KVS.objects.filter(stream_name=serial_number)
             if not kvs_qs.exists():
-                return response.json(174)
+                return response.json(173)
             kinesis_video_obj = AmazonKinesisVideoObject(
                 aws_access_key_id=ACCESS_KEY_ID,
                 secret_access_key=SECRET_ACCESS_KEY,
-                region_name=REGION_NAME
+                region_name=KVS_REGION
             )
             now_time = int(time.time())
             data_retention_change_in_hours = int(data_retention_change_in_hours)
@@ -375,7 +398,7 @@ class KVSView(View):
             return response.json(0, {"HlsStreamingSessionUrl": hls_streaming_session_url})
         except Exception as e:
             print(e)
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def get_device_midea_list(request_dict, response):
@@ -421,7 +444,8 @@ class KVSView(View):
             total_page = len(stream_list)
             stream_list = stream_list[(page - 1) * size:page * size]
             for item in stream_list:
-                temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(tzinfo=datetime.timezone.utc)
+                temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(
+                    tzinfo=datetime.timezone.utc)
                 temp_end_time = temp_start_time + datetime.timedelta(seconds=300)
                 item['image'] = kinesis_images_obj.get_images(serial_number, temp_start_time, temp_end_time)
                 item['startTime'] = int(item['startTime'].timestamp())
@@ -433,7 +457,7 @@ class KVSView(View):
             return response.json(0, res)
         except Exception as e:
             print(e)
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
     def download_clip(request_dict, response):
@@ -472,4 +496,389 @@ class KVSView(View):
             return res
         except Exception as e:
             print(e)
-            return response.json(500, repr(e))
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def get_sts_token(request_dict, response):
+        """
+        获取临时token
+        @param request_dict: 请求参数
+        @request_dict uid: 设备uid
+        @param response: 响应对象
+        @return: response
+        """
+        uid = request_dict.get('uid', None)
+        # if not all([]):
+        #     return response.json(444)
+        try:
+            sts_client_conn = boto3.client(
+                'sts',
+                aws_access_key_id=ACCESS_KEY_ID,
+                aws_secret_access_key=SECRET_ACCESS_KEY,
+                region_name=KVS_REGION
+            )
+            sts_obj = sts_client_conn.get_session_token(DurationSeconds=129600)
+            res = {
+                'AccessKeyId': sts_obj['Credentials']['AccessKeyId'],
+                'AccessKeySecret': sts_obj['Credentials']['SecretAccessKey'],
+                'SessionToken': sts_obj['Credentials']['SessionToken'],
+                'Expiration': str(sts_obj['Credentials']['Expiration'])
+            }
+            return response.json(0, res)
+        except Exception as e:
+            print(e)
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def create_signal_channel(request_dict, response):
+        """
+        创建信号通道
+        @param request_dict: 请求参数
+        @request_dict serial: 序列号
+        @param response: 响应对象
+        @return: response
+        """
+        serial = request_dict.get('serial', None)
+        if not all([serial]):
+            return response.json(444)
+        try:
+            # 获取并判断region_id是否有效
+            region_id = CommonService.confirm_region_id()
+            if region_id not in REGION_ID_LIST:
+                return response.json(444, {'invalid region_id': region_id})
+            # 获取iot:CredentialProvider
+            endpoint_type = 'iot:CredentialProvider'
+            iot_client = IOTClient(region_id)
+            iot_credential_provider_endpoint = iot_client.describe_iot_endpoint(endpoint_type)
+            # 已有数据直接返回
+            res = {
+                'region': KVS_REGION,
+                'role_alias': 'KvsCameraIoTRoleAlias',
+                'iot_credential_provider_endpoint': iot_credential_provider_endpoint,
+            }
+            channel_name = 'Ansjer_Device_{}'.format(serial)
+            kvs = KVS.objects.filter(channel_name=channel_name)
+            if kvs.exists():
+                return response.json(0, res)
+
+            kinesis_video_obj = AmazonKinesisVideoObject(
+                aws_access_key_id=ACCESS_KEY_ID,
+                secret_access_key=SECRET_ACCESS_KEY,
+                region_name=KVS_REGION
+            )
+            channel_arn = kinesis_video_obj.create_signaling_channel(channel_name=channel_name)
+            now_time = int(time.time())
+            KVS.objects.create(
+                channel_name=channel_name, channel_arn=channel_arn, channel_ttl=60,
+                created_time=now_time, updated_time=now_time
+            )
+            return response.json(0, res)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @classmethod
+    def send_alexa_offer_to_master(cls, request_dict, response):
+        """
+        发送Alexa offer
+        @param request_dict: 请求参数
+        @request_dict serial: 序列号
+        @param response: 响应对象
+        @return: response
+        """
+        uid = request_dict.get('uid', None)
+        sdp_offer = request_dict.get('sdp_offer', None)
+        if not all([uid, sdp_offer]):
+            return response.json(444)
+
+        try:
+            serial = CommonService.get_serial_number_by_uid(uid)
+            channel_name = 'Ansjer_Device_{}'.format(serial)
+            kvs_qs = KVS.objects.filter(channel_name=channel_name).values('channel_arn')
+            if not kvs_qs.exists():
+                return response.json(173)
+            channel_arn = kvs_qs[0]['channel_arn']
+
+            kinesis_video_obj = AmazonKinesisVideoObject(
+                aws_access_key_id=ACCESS_KEY_ID,
+                secret_access_key=SECRET_ACCESS_KEY,
+                region_name=KVS_REGION
+            )
+            endpoint = kinesis_video_obj.get_signaling_channel_endpoint(channel_arn)
+            url = '{}/v1/send-alexa-offer-to-master'.format(endpoint)
+            # 构造请求 body
+            client_id = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
+            # offer转base64
+            offer = {
+                'type': 'offer',
+                'sdp': sdp_offer
+            }
+            offer = cls.dict_to_base64(offer)
+            LOGGER.info('offer:{}'.format(offer))
+            payload = {
+                'ChannelARN': channel_arn,
+                'SenderClientId': client_id,
+                'MessagePayload': offer
+            }
+
+            # 构造 AWSRequest 并签名
+            req = AWSRequest(method='POST', url=url, data=json.dumps(payload))
+            credentials = Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
+            SigV4Auth(credentials, 'kinesisvideo', KVS_REGION).add_auth(req)
+
+            # 使用 requests 发送签名后的请求
+            headers = dict(req.headers)
+            headers['Content-Type'] = 'application/json'
+
+            r = requests.post(url, headers=headers, data=json.dumps(payload))
+            assert r.status_code == 200
+            LOGGER.info('SendAlexaOfferToMaster响应: {}'.format(r.json()))
+            sdp_answer = r.json()['Answer']
+            assert sdp_answer
+            # answer转字典
+            sdp_answer = cls.base64_to_dict(sdp_answer)
+            sdp_answer = sdp_answer['sdp']
+            res = {
+                'sdp_answer': sdp_answer
+            }
+            return response.json(0, res)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def send_mqtt_command(request_dict, response):
+        """
+        发送MQTT指令控制设备开始/停止推流
+        """
+        uid = request_dict.get('uid', None)
+        enable = request_dict.get('enable', '1')
+
+        if not uid:
+            return response.json(444)
+
+        try:
+            steam_name = 'rtsp://{}:554/{}/{}'.format(WEBRTC_DOMAIN_NAME, ZLMEDIAKIT_APP_NAME, uid)
+            thing_name = CommonService.get_serial_number_by_uid(uid)  # 存在序列号则为使用序列号作为物品名
+            topic_name = 'ansjer/generic/{}'.format(thing_name)
+
+            msg = OrderedDict(
+                [
+                    ('alexaRtspCommand', steam_name),
+                    ('enable', int(enable)),
+                ]
+            )
+            if not CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg):
+                return response.json(10044)
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @classmethod
+    def get_alexa_answer(cls, request_dict, response):
+        uid = request_dict.get('uid', None)
+        sdp_offer = request_dict.get('sdp_offer', None)
+
+        if not all([uid, sdp_offer]):
+            return response.json(444)
+        try:
+            cls.mediamtx_add_steam(uid)
+            result = cls.get_mediamtx_sdp_answer(uid, sdp_offer)
+
+            # 成功获取到SDP answer
+            res = {
+                'sdp_answer': result['sdp_answer']
+            }
+            LOGGER.info('获取Alexa answer响应内容: {}'.format(res))
+            return response.json(0, res)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def dict_to_base64(data_dict: dict) -> str:
+        # 手动构建JSON字符串,避免自动转义
+        json_str = '{\n  "type": "%s",\n  "sdp": "%s"\n}' % (
+            data_dict["type"],
+            data_dict["sdp"].replace('"', '\\"').replace('\r', '\\r').replace('\n', '\\n')
+        )
+
+        # 转换为bytes并进行base64编码
+        base64_data = base64.b64encode(json_str.encode('utf-8'))
+        return base64_data.decode('utf-8')
+
+    @staticmethod
+    def base64_to_dict(encoded_str: str) -> dict:
+        decoded_bytes = base64.b64decode(encoded_str)
+        return json.loads(decoded_bytes.decode('utf-8'))
+
+    @staticmethod
+    def delete_signal_channel(request_dict, response):
+        """
+        删除信号通道
+        @param request_dict: 请求参数
+        @request_dict serial: 序列号
+        @param response: 响应对象
+        @return: response
+        """
+        serial = request_dict.get('serial', None)
+        if not all([serial]):
+            return response.json(444)
+        try:
+            channel_name = 'Ansjer_Device_{}'.format(serial)
+            kvs = KVS.objects.filter(channel_name=channel_name).first()
+            if not kvs:
+                return response.json(173)
+            channel_arn = kvs.channel_arn
+            kinesis_video_obj = AmazonKinesisVideoObject(
+                aws_access_key_id=ACCESS_KEY_ID,
+                secret_access_key=SECRET_ACCESS_KEY,
+                region_name=KVS_REGION
+            )
+            kinesis_video_obj.delete_signaling_channel(channel_arn=channel_arn)
+            kvs.delete()
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @staticmethod
+    def fix_sdp_directly(sdp_text):
+        """
+        直接修复SDP格式问题,按照四个主要修复点:
+        1. 标准化格式:去除多余空格,跳过空行,确保每行以\r\n结束。
+        2. 将方向属性从sendrecv改为recvonly(针对WHEP拉流)。
+        3. 移除客户端的ssrc行(a=ssrc:),因为不需要发送媒体。
+        4. 移除客户端的msid行(a=msid:),以匹配recvonly模式。
+        """
+        lines = sdp_text.split('\r\n')
+        fixed_lines = []
+        in_audio = False
+        in_video = False
+
+        for line in lines:
+            # 1. 标准化格式:去除尾部空格,跳过空行
+            line = line.rstrip()
+            if not line:
+                continue
+
+            # 检查当前部分(audio或video)
+            if line.startswith('m=audio'):
+                in_audio = True
+                in_video = False
+            elif line.startswith('m=video'):
+                in_audio = False
+                in_video = True
+
+            # 2. 将a=sendrecv改为a=recvonly(在audio和video部分)
+            if line == 'a=sendrecv' and (in_audio or in_video):
+                line = 'a=recvonly'
+
+            # 3 & 4. 移除ssrc和msid相关行(客户端发送意图)
+            if line.startswith('a=ssrc:') or line.startswith('a=msid:'):
+                continue
+
+            fixed_lines.append(line)
+
+        return '\r\n'.join(fixed_lines)
+
+    @staticmethod
+    def zlmediakit_add_stream(uid: str) -> str:
+        """
+        zlmediakit添加rtsp推流
+        @param uid:
+        @return:
+        """
+        dst_url = 'rtsp://{}:{}/{}'.format('127.0.0.1', 8554, uid)
+        data = {
+            'secret': ZLMEDIAKIT_SECRET,
+            'schema': 'rtsp',
+            'vhost': '__defaultVhost__',
+            'app': ZLMEDIAKIT_APP_NAME,
+            'stream': uid,
+            'dst_url': dst_url
+        }
+        url = 'http://{}/index/api/addStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
+        r = requests.post(url, data=data, timeout=5)
+        r_json = r.json()
+        LOGGER.info('获取Alexa answer-ZLMediaKit添加rtsp推流响应: {}'.format(r_json))
+        assert r_json['code'] == 0
+        return r_json['data']['key']
+
+    @staticmethod
+    def zlmediakit_del_stream(key: str) -> None:
+        """
+        zlmediakit删除rtsp推流
+        @param key: addStreamPusherProxy接口返回的key
+        @return:
+        """
+        data = {
+            'secret': ZLMEDIAKIT_SECRET,
+            'key': key
+        }
+        url = 'http://{}/index/api/delStreamPusherProxy'.format(WEBRTC_DOMAIN_NAME)
+        r = requests.post(url, data=data, timeout=5)
+        r_json = r.json()
+        LOGGER.info('获取Alexa answer-ZLMediaKit删除rtsp推流响应: {}'.format(r_json))
+        assert r_json['code'] == 0
+
+    @staticmethod
+    def mediamtx_add_steam(uid: str):
+        """
+        mediamtx添加流
+        @param uid:
+        @return:
+        """
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": ZLMEDIAKIT_AUTH
+        }
+        source = 'rtsp://{}:{}/live/{}'.format('127.0.0.1', 554, uid)
+        data = {
+            'source': source,
+            'sourceOnDemand': True
+        }
+        url = 'http://{}:{}/v3/config/paths/add/{}'.format(WEBRTC_DOMAIN_NAME, 9997, uid)
+        r = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
+        LOGGER.info('获取Alexa answer-mediamtx添加流响应: {}'.format(r.text))
+
+    @staticmethod
+    def get_mediamtx_sdp_answer(uid: str, sdp_offer: str) -> dict:
+        """
+        获取mediamtx sdp answer
+        @param uid:
+        @param sdp_offer:
+        @return: 包含sdp_answer的字典或错误字典
+        """
+        headers = {
+            'Content-Type': 'application/sdp',
+            "Authorization": ZLMEDIAKIT_AUTH_WHEP
+        }
+        url = 'http://{}:{}/{}/whep'.format(WEBRTC_DOMAIN_NAME, 8889, uid)
+        
+        try:
+            r = requests.post(url, headers=headers, data=sdp_offer, timeout=10)
+            r.raise_for_status()  # 检查HTTP状态码
+            
+            r_text = r.text
+            LOGGER.info('获取Alexa answer-获取mediamtx sdp answer响应: {}'.format(r_text))
+            
+            # mediamtx的WHEP端点返回的是SDP文本,不是JSON
+            # 检查响应是否为有效的SDP格式(以"v="开头)
+            if r_text.strip().startswith('v='):
+                return {'sdp_answer': r_text}
+            else:
+                # 如果不是SDP格式,尝试解析为JSON
+                try:
+                    return json.loads(r_text)
+                except json.JSONDecodeError:
+                    try:
+                        # 如果直接解析失败,尝试处理单引号JSON
+                        import ast
+                        return ast.literal_eval(r_text)
+                    except (ValueError, SyntaxError):
+                        # 如果所有解析方法都失败,返回包含原始响应的错误字典
+                        return {'error': 'Failed to parse response', 'raw_response': r_text}
+                    
+        except requests.exceptions.RequestException as e:
+            LOGGER.error('获取Alexa answer-获取mediamtx sdp answer请求失败: {}'.format(str(e)))
+            return {'error': 'Request failed', 'details': str(e)}
+        except Exception as e:
+            LOGGER.error('获取Alexa answer-获取mediamtx sdp answer发生未知错误: {}'.format(str(e)))
+            return {'error': 'Unknown error', 'details': str(e)}

+ 23 - 0
Controller/Cron/CronTaskController.py

@@ -498,6 +498,8 @@ class CronUpdateDataView(View):
             return self.update_vod_meal(request_dict, response)
         elif operation == 'checkCustomizedPush':  # 定时检查定制化推送,重新执行没有推送成功的请求
             return self.check_customized_push(response)
+        elif operation == 'packageSwitch':
+            return self.package_switch(request_dict, response)
         else:
             return response.json(404)
 
@@ -911,6 +913,27 @@ class CronUpdateDataView(View):
         except Exception as e:
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
+    def package_switch(self, request_dict, response):
+        """定时任务启动,控制活动套餐上下架"""
+        try:
+            action = int(request_dict.get('action'))
+            if action not in [0, 1]:
+                return response.json(444, "参数错误")
+            if action == 1:
+                LOGGER.info(f'套餐开始上架')
+            else:
+                LOGGER.info(f'套餐开始下架')
+            Store_Meal.objects.filter(
+                id__gte=52,
+                id__lte=63
+            ).update(
+                is_show=0 if action == 1 else 1
+            )
+
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, f"error_line:{e.__traceback__.tb_lineno}, error_msg:{repr(e)}")
+
 
 class CronCollectDataView(View):
     def get(self, request, *args, **kwargs):

+ 64 - 2
Object/AWS/AmazonKinesisVideoUtil.py

@@ -4,9 +4,9 @@
 @Time : 2022/10/18 17:13
 @File :AmazonKinesisVideoUtil.py
 """
-import datetime
-
 import boto3
+import datetime
+from Ansjer.config import LOGGER
 
 
 class AmazonKinesisVideoObject:
@@ -74,6 +74,68 @@ class AmazonKinesisVideoObject:
         """
         return self.client_conn.get_data_endpoint(StreamName=stream_name, APIName=api_name)['DataEndpoint']
 
+    def create_signaling_channel(self, channel_name):
+        """
+        创建通道,如果通道已存在则直接返回其ARN
+        @param channel_name: 通道名称
+        @return: channel_arn: 通道ARN
+        """
+        try:
+            # 先检查信令通道是否已存在
+            response = self.client_conn.describe_signaling_channel(ChannelName=channel_name)
+            # 如果通道存在,直接返回其ARN
+            channel_arn = response['ChannelInfo']['ChannelARN']
+            return channel_arn
+        except self.client_conn.exceptions.ResourceNotFoundException:
+            # 通道不存在,创建新通道
+            res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
+            channel_arn = res['ChannelARN']
+            return channel_arn
+        except Exception as e:
+            # 处理其他可能的异常
+            LOGGER.info('创建通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            # 尝试创建通道
+            res = self.client_conn.create_signaling_channel(ChannelName=channel_name)
+            channel_arn = res['ChannelARN']
+            return channel_arn
+
+    def get_signaling_channel_endpoint(self, channel_arn):
+        """
+        创建通道
+        @param channel_arn: 通道arn
+        @return stream_info: 视频流信息数据
+        """
+        res = self.client_conn.get_signaling_channel_endpoint(
+            ChannelARN=channel_arn,
+            SingleMasterChannelEndpointConfiguration={
+                'Protocols': ['HTTPS'],
+                'Role': 'MASTER'
+            }
+        )
+        endpoint = res['ResourceEndpointList'][0]['ResourceEndpoint']
+        return endpoint
+
+    def delete_signaling_channel(self, channel_arn):
+        """
+        删除通道
+        @param channel_arn: 通道ARN
+        @return: True 删除成功,False 删除失败或通道不存在
+        """
+        try:
+            # 先检查信令通道是否存在
+            self.client_conn.describe_signaling_channel(ChannelARN=channel_arn)
+            # 通道存在,执行删除操作
+            self.client_conn.delete_signaling_channel(ChannelARN=channel_arn)
+            return True
+        except self.client_conn.exceptions.ResourceNotFoundException:
+            # 通道不存在,记录日志并返回
+            LOGGER.info('删除通道失败:通道不存在,channel_arn: {}'.format(channel_arn))
+            return False
+        except Exception as e:
+            # 处理其他可能的异常
+            LOGGER.info('删除通道异常,error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            return False
+
 
 class AmazonKVAMObject:
     def __init__(self, aws_access_key_id, secret_access_key, region_name, stream_name, api_name):