123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- # -*- coding: utf-8 -*-
- """
- @Author : Rocky
- @Time : 2022/10/18 9:48
- @File :KVSController.py
- """
- import base64
- import hashlib
- import json
- import time
- import uuid
- import datetime
- import boto3
- import botocore
- import requests
- from django.http import HttpResponse
- from django.views import View
- from Model.models import KVS, Device_User, Device_Info
- from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
- from Object.IOTCore.IotObject import IOTClient
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Ansjer.config import ACCESS_KEY_ID, SECRET_ACCESS_KEY, SERVER_DOMAIN, LOGGER, KVS_REGION, REGION_ID_LIST
- from Object.TokenObject import TokenObject
- from botocore.auth import SigV4Auth
- from botocore.awsrequest import AWSRequest
- from botocore.credentials import Credentials
- from Service.CommonService import CommonService
class UserRelatedView(View):
    """
    QR-code login endpoints for the web/PC client, plus the user's device list.

    Login state is kept in Redis under the QR-code id:
    0 = QR code generated, 1 = scanned by the app, 2 = login confirmed.
    The token sent by the app is stored under the key ``<id> + 'token'``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET: route by the ``operation`` URL keyword using the query parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.GET, kwargs.get('operation'), request)

    def post(self, request, *args, **kwargs):
        """Handle POST: route by the ``operation`` URL keyword using the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.POST, kwargs.get('operation'), request)

    def validation(self, request_dict, operation, request):
        """
        Route the request to the handler registered for ``operation``.
        @param request_dict: request parameters (GET or POST data)
        @param operation: operation name taken from the URL
        @param request: Django request, used to read the Authorization header
        @return: response
        """
        response = ResponseObject()
        if operation == 'generate-qr-code':  # web page generates a QR code
            return self.generate_qr_code(response)

        # Operations that are served without an Authorization token.
        public_handlers = {
            'get-scanning-status': self.get_scanning_status,  # has the app scanned?
            'web-login': self.web_login,  # web login
            'pc-login': self.pc_login,  # PC client login
            'confirm-login': self.confirm_login,  # app confirms the login
        }
        handler = public_handlers.get(operation)
        if handler is not None:
            return handler(request_dict, response)

        # Everything else requires a valid token; an invalid token is reported
        # before the operation name is checked.
        tko = TokenObject(
            request.META.get('HTTP_AUTHORIZATION'),
            returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        if operation == 'get-device':  # device list of the logged-in user
            return self.get_device(response, tko.userID)
        return response.json(404)

    @staticmethod
    def get_user_info(user_id):
        """
        Look up the avatar URL and nickname of a user.
        @param user_id: user id
        @return: tuple ``(user_icon_url, nick_name)``; both are '' when the user
                 does not exist, and the URL is '' when no icon path is stored
        """
        user = Device_User.objects.filter(userID=user_id).values('NickName', 'userIconPath').first()
        if user is None:
            return '', ''

        # ``or ''`` keeps a NULL path from becoming the literal string 'None'.
        icon_path = str(user['userIconPath'] or '')
        if not icon_path:
            return '', user['NickName']

        # Stored paths may carry a 'static/' prefix and Windows separators.
        icon_path = icon_path.replace('static/', '').replace('\\', '/')
        return SERVER_DOMAIN + 'account/getAvatar/' + icon_path, user['NickName']

    @staticmethod
    def generate_qr_code(response):
        """
        Create a QR-code login id and register it in Redis for 300 seconds.
        @param response: response object
        @return: response carrying ``{'type': 'autologin', 'id': <32 hex chars>}``
        """
        try:
            # The id is the only secret guarding the login hand-off, so it is
            # taken from uuid4 (random) rather than derived from time and host.
            qr_id = uuid.uuid4().hex
            stored = RedisObject().set_ex_data(qr_id, 0, 300)  # state 0: QR code generated
            if not stored:
                return response.json(119)
            return response.json(0, {'type': 'autologin', 'id': qr_id})
        except Exception as e:
            LOGGER.error('generate_qr_code error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_scanning_status(request_dict, response):
        """
        Report whether the app has scanned the QR code.
        @param request_dict: request parameters
        @request_dict uuid: QR-code id returned by generate-qr-code
        @param response: response object
        @return: response with ``{'status': 1}`` once scanned or confirmed, else ``{'status': 0}``
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            status = RedisObject().get_data(uuid_number)
            if status is False:
                return response.json(119)
            scanned = status in ('1', '2')
            return response.json(0, {'status': 1 if scanned else 0})
        except Exception as e:
            LOGGER.error('get_scanning_status error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def pc_login(request_dict, response):
        """
        Let the PC client collect the login result for a QR-code id.
        @param request_dict: request parameters
        @request_dict uuid: QR-code id returned by generate-qr-code
        @param response: response object
        @return: response; once the login is confirmed it carries the token, avatar
                 URL and nickname, and the Redis entries are removed
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            token = redis_obj.get_data(uuid_number + 'token')
            # NOTE(review): a missing token is reported as 119 even when the id
            # itself is still valid - confirm this is what the client expects.
            if status is False or token is False:
                return response.json(119)
            if status != '2':  # not confirmed yet
                return response.json(0, {'status': 0})

            token_obj = TokenObject(token)
            response.lang = token_obj.lang
            if token_obj.code != 0:
                return response.json(token_obj.code)
            user_icon_url, nick_name = UserRelatedView.get_user_info(token_obj.userID)
            res = {'status': 1, 'userIconUrl': user_icon_url, 'nickName': nick_name, 'token': token}
            # The hand-off is single-use: drop both keys once the token is delivered.
            redis_obj.del_data(uuid_number)
            redis_obj.del_data(uuid_number + 'token')
            return response.json(0, res)
        except Exception as e:
            LOGGER.error('pc_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def web_login(request_dict, response):
        """
        Confirm or cancel a pending QR-code login.
        @param request_dict: request parameters
        @request_dict uuid: QR-code id returned by generate-qr-code
        @request_dict confirm: '1' cancels the login; any other value confirms it
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        confirm = request_dict.get('confirm', None)
        if not all([uuid_number, confirm]):
            return response.json(444, {'error param': 'uuid or confirm'})
        try:
            redis_obj = RedisObject()
            if confirm == '1':  # cancel the login
                redis_obj.del_data(uuid_number)
                redis_obj.del_data(uuid_number + 'token')
                return response.json(0)

            token = redis_obj.get_data(uuid_number + 'token')
            ttl = redis_obj.get_ttl(uuid_number)
            if token is False or ttl <= 0:
                return response.json(119)
            # State 2: login confirmed. The remaining ttl is reused so the id
            # still expires at its original deadline.
            if redis_obj.set_ex_data(uuid_number, 2, ttl) is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            LOGGER.error('web_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def confirm_login(request_dict, response):
        """
        Record that the app scanned the QR code and store the token it supplied.
        @param request_dict: request parameters
        @request_dict uuid: QR-code id returned by generate-qr-code
        @request_dict token: token supplied by the app
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        token = request_dict.get('token', None)
        if not all([uuid_number, token]):
            return response.json(444, {'error param': 'uuid or token'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            ttl = redis_obj.get_ttl(uuid_number)
            if status is False or ttl <= 0:
                return response.json(119)
            # State 1: scanned. Both keys share the remaining ttl of the id.
            state_saved = redis_obj.set_ex_data(uuid_number, 1, ttl)
            token_saved = redis_obj.set_ex_data(uuid_number + 'token', token, ttl)
            if state_saved is False or token_saved is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            LOGGER.error('confirm_login error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_device(response, user_id):
        """
        List the devices that belong to a user.
        @param response: response object
        @param user_id: user id
        @return: response carrying a list of ``{'serial_number', 'NickName'}`` dicts
        """
        try:
            devices = Device_Info.objects.filter(userID=user_id).values('serial_number', 'NickName')
            return response.json(0, list(devices))
        except Exception as e:
            LOGGER.error('get_device error: {}'.format(repr(e)))
            return response.json(500)
class KVSView(View):
    """
    Amazon Kinesis Video Streams endpoints: stream management, archived-media
    playback and download, STS credentials and WebRTC signaling channels.
    """

    # Region used by the archived-media calls (HLS url, fragments, images, clip).
    # NOTE(review): every other call in this class uses KVS_REGION - confirm
    # whether these calls should use it too.
    _ARCHIVED_MEDIA_REGION = 'us-east-1'

    # Seconds to wait for the signaling endpoint before giving up, so a stalled
    # connection cannot block the worker forever.
    _ALEXA_OFFER_TIMEOUT = 30

    def get(self, request, *args, **kwargs):
        """Handle GET: route by the ``operation`` URL keyword using the query parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.GET, request, kwargs.get('operation'))

    def post(self, request, *args, **kwargs):
        """Handle POST: route by the ``operation`` URL keyword using the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.POST, request, kwargs.get('operation'))

    def validation(self, request_dict, request, operation):
        """
        Route the request to the handler registered for ``operation``.
        @param request_dict: request parameters (GET or POST data)
        @param request: Django request (currently unused)
        @param operation: operation name taken from the URL
        @return: response; 404 for an unknown operation
        """
        response = ResponseObject()
        # NOTE(review): no token check is performed for any of these operations;
        # the check that used to guard the last three was disabled.
        handlers = {
            'create-media': self.create_media,  # create a video stream
            'update-data-retention': self.update_data_retention,  # change stream retention
            'get-sts-token': self.get_sts_token,  # temporary credentials
            'createSignalChannel': self.create_signal_channel,  # create signaling channel
            'SendAlexaOfferToMaster': self.send_alexa_offer_to_master,  # send Alexa offer
            'deleteSignalChannel': self.delete_signal_channel,  # delete signaling channel
            'get-device-midea-list': self.get_device_midea_list,  # list recorded fragments
            'get-hls-midea': self.get_hls_midea_url,  # HLS playback url
            'download-clip': self.download_clip,  # download an MP4 clip
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(404)
        return handler(request_dict, response)

    @staticmethod
    def _kinesis_video_client():
        """Build the Kinesis Video client wrapper with the configured credentials and region."""
        return AmazonKinesisVideoObject(
            aws_access_key_id=ACCESS_KEY_ID,
            secret_access_key=SECRET_ACCESS_KEY,
            region_name=KVS_REGION
        )

    @staticmethod
    def _archived_media_client(stream_name, api_name):
        """
        Build the archived-media client wrapper for one stream and API.

        Credentials come from the project config instead of being embedded in the
        source; the key pair that used to be hard-coded here must be treated as
        leaked and rotated.
        """
        return AmazonKVAMObject(
            aws_access_key_id=ACCESS_KEY_ID,
            secret_access_key=SECRET_ACCESS_KEY,
            region_name=KVSView._ARCHIVED_MEDIA_REGION,
            stream_name=stream_name,
            api_name=api_name
        )

    @staticmethod
    def _epoch_to_utc(timestamp):
        """
        Convert epoch seconds (str or int) to a naive datetime expressed in UTC.

        Equivalent to the former "local time minus 8 hours" on a UTC+8 host, but
        independent of the server's time zone.
        """
        aware = datetime.datetime.fromtimestamp(int(timestamp), tz=datetime.timezone.utc)
        return aware.replace(tzinfo=None)

    @staticmethod
    def _server_error(response, exc):
        """Log ``exc`` and return the 500 response carrying its line number and repr."""
        detail = 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc))
        LOGGER.error(detail)
        return response.json(500, detail)

    @staticmethod
    def create_media(request_dict, response):
        """
        Create a video stream named after the device serial number.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @param response: response object
        @return: response; 174 when the stream is already recorded, 178 when no ARN is returned
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444)
        try:
            if KVS.objects.filter(stream_name=serial_number).exists():
                return response.json(174)
            stream_arn = KVSView._kinesis_video_client().create_stream(stream_name=serial_number)
            if not stream_arn:
                return response.json(178)
            now_time = int(time.time())
            KVS.objects.create(stream_name=serial_number, stream_arn=stream_arn, created_time=now_time,
                               updated_time=now_time)
            return response.json(0)
        except Exception as e:
            LOGGER.error('create_media error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def update_data_retention(request_dict, response):
        """
        Change the data retention period of a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict operation: operation passed through to the stream update (increase/decrease)
        @request_dict data_retention_change_in_hours: number of hours to change by
        @param response: response object
        @return: response; 173 when the stream is not recorded
        """
        serial_number = request_dict.get('serial_number', None)
        operation = request_dict.get('operation', None)
        change_in_hours = request_dict.get('data_retention_change_in_hours', None)
        if not all([serial_number, operation, change_in_hours]):
            return response.json(444)
        try:
            change_in_hours = int(change_in_hours)
        except (TypeError, ValueError):
            return response.json(444)
        try:
            kvs_qs = KVS.objects.filter(stream_name=serial_number)
            if not kvs_qs.exists():
                return response.json(173)
            KVSView._kinesis_video_client().update_data_retention(
                stream_name=serial_number, operation=operation,
                data_retention_change_in_hours=change_in_hours)
            # NOTE(review): this stores the change amount, not the resulting
            # retention total - confirm that is what the column is meant to hold.
            kvs_qs.update(data_retention_in_hours=change_in_hours, updated_time=int(time.time()))
            return response.json(0)
        except Exception as e:
            LOGGER.error('update_data_retention error: {}'.format(repr(e)))
            return response.json(500)

    @staticmethod
    def get_hls_midea_url(request_dict, response):
        """
        Get an HLS streaming session url for a time range of a stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: range start, epoch seconds
        @request_dict endTime: range end, epoch seconds
        @request_dict playMode: 0 selects 'ON_DEMAND', any other integer 'LIVE_REPLAY'
        @param response: response object
        @return: response carrying ``{'HlsStreamingSessionUrl': url}``
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        play_mode = request_dict.get('playMode', None)
        if not all([serial_number, start_time, end_time, play_mode]):
            return response.json(444)
        try:
            start_time = KVSView._epoch_to_utc(start_time)
            end_time = KVSView._epoch_to_utc(end_time)
            play_mode = 'ON_DEMAND' if int(play_mode) == 0 else 'LIVE_REPLAY'
        except (TypeError, ValueError, OverflowError, OSError):
            # Non-numeric or out-of-range input is a parameter error, not a crash.
            return response.json(444)
        try:
            client = KVSView._archived_media_client(serial_number, 'GET_HLS_STREAMING_SESSION_URL')
            url = client.get_hls_streaming_session_url(serial_number, start_time, end_time, play_mode)
            return response.json(0, {"HlsStreamingSessionUrl": url})
        except Exception as e:
            return KVSView._server_error(response, e)

    @staticmethod
    def get_device_midea_list(request_dict, response):
        """
        List one page of recorded fragments of a stream, each with a preview image.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: range start, epoch seconds
        @request_dict endTime: range end, epoch seconds
        @request_dict page: 1-based page number
        @request_dict size: page size
        @param response: response object
        @return: response carrying ``{'totalPage': ..., 'fragments': [...]}``
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        page = request_dict.get('page', None)
        size = request_dict.get('size', None)
        if not all([serial_number, start_time, end_time, page, size]):
            return response.json(444)
        try:
            page = int(page)
            size = int(size)
            start_time = KVSView._epoch_to_utc(start_time)
            end_time = KVSView._epoch_to_utc(end_time)
        except (TypeError, ValueError, OverflowError, OSError):
            return response.json(444)
        if page < 1 or size < 1:
            # A zero or negative value would slice from the wrong end of the list.
            return response.json(444)
        try:
            fragments_client = KVSView._archived_media_client(serial_number, 'LIST_FRAGMENTS')
            images_client = KVSView._archived_media_client(serial_number, 'GET_IMAGES')
            fragments = fragments_client.get_list_fragments(serial_number, start_time, end_time)
            # NOTE(review): despite the key name this is the number of fragments,
            # not the number of pages; kept as-is because clients read it.
            total = len(fragments)
            page_items = fragments[(page - 1) * size:page * size]
            for item in page_items:
                # Image lookup window: 300 seconds from the shifted fragment start.
                # NOTE(review): the 8-hour shift presumably undoes a local-time
                # conversion made by the client wrapper - confirm.
                image_start = (item['startTime'] - datetime.timedelta(hours=8)).replace(
                    tzinfo=datetime.timezone.utc)
                image_end = image_start + datetime.timedelta(seconds=300)
                item['image'] = images_client.get_images(serial_number, image_start, image_end)
                item['startTime'] = int(item['startTime'].timestamp())
                item['endTime'] = int(item['endTime'].timestamp())
            res = {
                'totalPage': total,
                'fragments': page_items
            }
            return response.json(0, res)
        except Exception as e:
            return KVSView._server_error(response, e)

    @staticmethod
    def download_clip(request_dict, response):
        """
        Download an MP4 clip for a time range of a stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: range start, epoch seconds
        @request_dict endTime: range end, epoch seconds
        @param response: response object
        @return: HttpResponse with the clip as an attachment, or an error response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        if not all([serial_number, start_time, end_time]):
            return response.json(444)
        try:
            start_time = KVSView._epoch_to_utc(start_time)
            end_time = KVSView._epoch_to_utc(end_time)
        except (TypeError, ValueError, OverflowError, OSError):
            return response.json(444)
        try:
            client = KVSView._archived_media_client(serial_number, 'GET_CLIP')
            clip_obj, clip_size = client.get_clip(serial_number, start_time, end_time)
            # content_type must be passed to the constructor; assigning
            # res["content_type"] only adds a header literally named "content_type".
            res = HttpResponse(clip_obj.read(), content_type="video/mp4")
            res["Content-Disposition"] = "attachment;filename=video.mp4"
            res['Content-Length'] = str(clip_size)
            return res
        except Exception as e:
            return KVSView._server_error(response, e)

    @staticmethod
    def get_sts_token(request_dict, response):
        """
        Issue temporary AWS credentials via STS, valid for 129600 seconds.
        @param request_dict: request parameters (none are read)
        @param response: response object
        @return: response carrying AccessKeyId, AccessKeySecret, SessionToken and Expiration
        """
        # NOTE(review): this operation is reachable without a token check and
        # hands out session credentials derived from the configured key.
        try:
            sts_client_conn = boto3.client(
                'sts',
                aws_access_key_id=ACCESS_KEY_ID,
                aws_secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            credentials = sts_client_conn.get_session_token(DurationSeconds=129600)['Credentials']
            res = {
                'AccessKeyId': credentials['AccessKeyId'],
                'AccessKeySecret': credentials['SecretAccessKey'],
                'SessionToken': credentials['SessionToken'],
                'Expiration': str(credentials['Expiration'])
            }
            return response.json(0, res)
        except Exception as e:
            return KVSView._server_error(response, e)

    @staticmethod
    def create_signal_channel(request_dict, response):
        """
        Create the signaling channel of a device unless it is already recorded.
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response carrying region, role alias and the IoT credential provider endpoint
        """
        serial = request_dict.get('serial', None)
        if not serial:
            return response.json(444)
        try:
            # Resolve the region id and make sure it is a known one.
            region_id = CommonService.confirm_region_id()
            if region_id not in REGION_ID_LIST:
                return response.json(444, {'invalid region_id': region_id})

            # Look up the iot:CredentialProvider endpoint for that region.
            iot_client = IOTClient(region_id)
            credential_endpoint = iot_client.describe_iot_endpoint('iot:CredentialProvider')
            res = {
                'region': KVS_REGION,
                'role_alias': 'KvsCameraIoTRoleAlias',
                'iot_credential_provider_endpoint': credential_endpoint,
            }

            channel_name = 'Ansjer_Device_{}'.format(serial)
            # A recorded channel is returned as-is without calling AWS again.
            if KVS.objects.filter(channel_name=channel_name).exists():
                return response.json(0, res)

            channel_arn = KVSView._kinesis_video_client().create_signaling_channel(
                channel_name=channel_name)
            now_time = int(time.time())
            KVS.objects.create(
                channel_name=channel_name, channel_arn=channel_arn, channel_ttl=60,
                created_time=now_time, updated_time=now_time
            )
            return response.json(0, res)
        except Exception as e:
            return KVSView._server_error(response, e)

    @classmethod
    def send_alexa_offer_to_master(cls, request_dict, response):
        """
        Send an Alexa SDP offer to a device's signaling channel and return the SDP answer.
        @param request_dict: request parameters
        @request_dict uid: device uid
        @request_dict sdp_offer: SDP offer text
        @param response: response object
        @return: response carrying ``{'sdp_answer': sdp}``; 173 when no channel is recorded
        """
        uid = request_dict.get('uid', None)
        sdp_offer = request_dict.get('sdp_offer', None)
        if not all([uid, sdp_offer]):
            return response.json(444)
        # NOTE(review): hard-coded remap of one uid to another; looks like a
        # debugging override - confirm it is still wanted.
        if uid == '517J385BNUGP3CPP111A':
            uid = 'NUWGTV5TUK8G8VSS111A'
        try:
            serial = CommonService.get_serial_number_by_uid(uid)
            # NOTE(review): channels are created under channel_name
            # 'Ansjer_Device_<serial>', but this lookup filters on stream_name - verify.
            kvs_qs = KVS.objects.filter(stream_name=serial).values('channel_arn')
            if not kvs_qs.exists():
                return response.json(173)
            channel_arn = kvs_qs[0]['channel_arn']

            endpoint = cls._kinesis_video_client().get_signaling_channel_endpoint(channel_arn)
            url = '{}/v1/send-alexa-offer-to-master'.format(endpoint)

            # Build the request body; the offer travels base64-encoded.
            client_id = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
            offer = cls.dict_to_base64({
                'type': 'offer',
                'sdp': sdp_offer
            })
            LOGGER.info('offer:{}'.format(offer))
            payload = {
                'ChannelARN': channel_arn,
                'SenderClientId': client_id,
                'MessagePayload': offer
            }
            # Serialize once so the signed body and the sent body are the same bytes.
            body = json.dumps(payload)

            # Sign the request with SigV4.
            req = AWSRequest(method='POST', url=url, data=body)
            credentials = Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
            SigV4Auth(credentials, 'kinesisvideo', KVS_REGION).add_auth(req)

            # Send the signed request with requests.
            headers = dict(req.headers)
            headers['Content-Type'] = 'application/json'
            r = requests.post(url, headers=headers, data=body, timeout=cls._ALEXA_OFFER_TIMEOUT)
            # Explicit checks instead of assert, which is stripped under ``python -O``.
            if r.status_code != 200:
                raise RuntimeError('SendAlexaOfferToMaster status_code: {}'.format(r.status_code))
            result = r.json()
            LOGGER.info('SendAlexaOfferToMaster响应: {}'.format(result))
            sdp_answer = result['Answer']
            if not sdp_answer:
                raise ValueError('SendAlexaOfferToMaster returned an empty Answer')

            # The answer is base64-encoded JSON; only its 'sdp' field is returned.
            res = {
                'sdp_answer': cls.base64_to_dict(sdp_answer)['sdp']
            }
            return response.json(0, res)
        except Exception as e:
            return cls._server_error(response, e)

    @staticmethod
    def dict_to_base64(data_dict: dict) -> str:
        """
        Encode a ``{'type', 'sdp'}`` dict as base64 of a hand-built JSON string.

        The JSON is assembled manually so that only double quotes, CR and LF in
        the SDP are escaped; backslashes are deliberately passed through.
        @param data_dict: dict with string values under 'type' and 'sdp'
        @return: base64 text of the UTF-8 encoded JSON string
        """
        escaped_sdp = data_dict["sdp"].replace('"', '\\"').replace('\r', '\\r').replace('\n', '\\n')
        json_str = '{\n "type": "%s",\n "sdp": "%s"\n}' % (data_dict["type"], escaped_sdp)
        return base64.b64encode(json_str.encode('utf-8')).decode('utf-8')

    @staticmethod
    def base64_to_dict(encoded_str: str) -> dict:
        """
        Decode base64 text holding UTF-8 JSON back into a Python object.
        @param encoded_str: base64 text
        @return: the parsed JSON value
        """
        return json.loads(base64.b64decode(encoded_str).decode('utf-8'))

    @staticmethod
    def delete_signal_channel(request_dict, response):
        """
        Delete the signaling channel of a device and its database record.
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response; 173 when no channel is recorded
        """
        serial = request_dict.get('serial', None)
        if not serial:
            return response.json(444)
        try:
            channel_name = 'Ansjer_Device_{}'.format(serial)
            kvs = KVS.objects.filter(channel_name=channel_name).first()
            if not kvs:
                return response.json(173)
            # Remove the channel in AWS first so a failure keeps the record for a retry.
            KVSView._kinesis_video_client().delete_signaling_channel(channel_arn=kvs.channel_arn)
            kvs.delete()
            return response.json(0)
        except Exception as e:
            return KVSView._server_error(response, e)
|