# -*- coding: utf-8 -*-
"""
@Author : Rocky
@Time : 2022/10/18 9:48
@File :KVSController.py
"""
import base64
import datetime
import hashlib
import json
import secrets
import time
import uuid

import boto3
import botocore
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from django.http import HttpResponse
from django.views import View

from Ansjer.config import ACCESS_KEY_ID, SECRET_ACCESS_KEY, SERVER_DOMAIN, LOGGER, KVS_REGION, REGION_ID_LIST
from Model.models import KVS, Device_User, Device_Info
from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
from Object.IOTCore.IotObject import IOTClient
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService

# SECURITY(review): these credentials were hard-coded inline in every playback
# endpoint. They are kept verbatim so behaviour does not change, but a secret
# committed to source control should be rotated and loaded from configuration.
_PLAYBACK_ACCESS_KEY_ID = 'AKIA2E67UIMD45Y3HL53'
_PLAYBACK_SECRET_ACCESS_KEY = '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'
_PLAYBACK_REGION = 'us-east-1'

# Errors that int() / datetime.fromtimestamp() raise for malformed or
# out-of-range request parameters.
_BAD_PARAM_ERRORS = (TypeError, ValueError, OverflowError, OSError)


def _to_kvs_datetime(epoch_seconds):
    """
    Convert an epoch-seconds request parameter to the naive datetime handed to the KVS helpers.
    @param epoch_seconds: epoch seconds as str or int
    @return: naive datetime: local time of the timestamp shifted back by 8 hours
    """
    # NOTE(review): the fixed 8-hour shift presumably compensates for a UTC+8
    # offset somewhere in the pipeline - confirm before changing it.
    return datetime.datetime.fromtimestamp(int(epoch_seconds)) - datetime.timedelta(hours=8)


class UserRelatedView(View):
    """QR-code login flow for the web / PC client, plus the user's device list."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation, request)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation, request)

    def validation(self, request_dict, operation, request):
        """
        Dispatch the request to the handler selected by the URL operation.
        @param request_dict: request parameters
        @param operation: operation name taken from the URL
        @param request: Django request object
        @return: response
        """
        response = ResponseObject()
        if operation == 'generate-qr-code':  # web page generates a QR code
            return self.generate_qr_code(response)
        elif operation == 'get-scanning-status':  # check whether the app has scanned the code
            return self.get_scanning_status(request_dict, response)
        elif operation == 'web-login':  # web login
            return self.web_login(request_dict, response)
        elif operation == 'pc-login':  # PC client login
            return self.pc_login(request_dict, response)
        elif operation == 'confirm-login':  # app confirms the login
            return self.confirm_login(request_dict, response)
        else:
            # Every other operation requires a valid token.
            tko = TokenObject(
                request.META.get('HTTP_AUTHORIZATION'),
                returntpye='pc')
            if tko.code != 0:
                return response.json(tko.code)
            response.lang = tko.lang
            user_id = tko.userID
            if operation == 'get-device':  # get the device list
                return self.get_device(response, user_id)
            else:
                return response.json(404)

    @staticmethod
    def get_user_info(user_id):
        """
        Look up the avatar URL and nickname of a user.
        @param user_id: user id
        @return: (user_icon_url, nick_name); both are '' when the user does not exist
        """
        device_user_qs = Device_User.objects.filter(userID=user_id).values('NickName', 'userIconPath', 'userIconUrl')
        if not device_user_qs.exists():
            user_icon_url = ''
            nick_name = ''
        else:
            users = device_user_qs.first()
            nick_name = users['NickName']
            user_icon_path = str(users['userIconPath'])
            if user_icon_path:
                # Stored path is relative to 'static/' and may use Windows separators.
                user_icon_path = user_icon_path.replace('static/', '').replace('\\', '/')
                user_icon_url = SERVER_DOMAIN + 'account/getAvatar/' + user_icon_path
            else:
                user_icon_url = ''
        return user_icon_url, nick_name

    @staticmethod
    def generate_qr_code(response):
        """
        Create a login session id for the QR code shown on the web page.
        @param response: response object
        @return: response
        """
        redis_obj = RedisObject()
        try:
            # The id later unlocks a user token (see pc_login), so it must be
            # unguessable: use the CSPRNG instead of md5(uuid1 + time).
            # token_hex(16) keeps the previous 32-hex-character shape.
            uuid_number = secrets.token_hex(16)
            flag = redis_obj.set_ex_data(uuid_number, 0, 300)  # status 0: QR code generated, 300 s expiry
            res = {'type': 'autologin', 'id': uuid_number}
            if flag:
                return response.json(0, res)
            else:
                return response.json(119)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def get_scanning_status(request_dict, response):
        """
        Report whether the app has scanned the QR code.
        @param request_dict: request parameters
        @request_dict uuid: login session id returned by generate-qr-code
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            if status is False:
                return response.json(119)
            elif status == '1' or status == '2':
                res = {'status': 1}  # scanned
            else:
                res = {'status': 0}  # not scanned yet
            return response.json(0, res)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def pc_login(request_dict, response):
        """
        PC client login: once the session is confirmed, hand over the token and user info.
        @param request_dict: request parameters
        @request_dict uuid: login session id returned by generate-qr-code
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            token = redis_obj.get_data(uuid_number + 'token')
            if status is False or token is False:
                return response.json(119)
            elif status == '2':  # logged in
                token_obj = TokenObject(token)
                response.lang = token_obj.lang
                if token_obj.code != 0:
                    return response.json(token_obj.code)
                user_id = token_obj.userID
                user_icon_url, nick_name = UserRelatedView.get_user_info(user_id)
                res = {'status': 1, 'userIconUrl': user_icon_url, 'nickName': nick_name, 'token': token}
                # The session is single-use: drop it once the token is delivered.
                redis_obj.del_data(uuid_number)
                redis_obj.del_data(uuid_number + 'token')
            else:  # not logged in yet
                res = {'status': 0}
            return response.json(0, res)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def web_login(request_dict, response):
        """
        Web login: mark the session as logged in, or cancel it.
        @param request_dict: request parameters
        @request_dict uuid: login session id returned by generate-qr-code
        @request_dict confirm: '1' cancels the login, any other value confirms it
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        confirm = request_dict.get('confirm', None)
        if not all([uuid_number, confirm]):
            return response.json(444, {'error param': 'uuid or confirm'})
        try:
            redis_obj = RedisObject()
            if confirm == '1':  # cancel the login
                redis_obj.del_data(uuid_number)
                redis_obj.del_data(uuid_number + 'token')
                return response.json(0)
            token = redis_obj.get_data(uuid_number + 'token')
            ttl = redis_obj.get_ttl(uuid_number)
            if token is False or ttl <= 0:
                return response.json(119)
            # Status 2: logged in. The remaining TTL is reused so the session lifetime is not extended.
            result = redis_obj.set_ex_data(uuid_number, 2, ttl)
            if result is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def confirm_login(request_dict, response):
        """
        App confirms the login: mark the session as scanned and attach the user's token.
        @param request_dict: request parameters
        @request_dict uuid: login session id read from the QR code
        @request_dict token: token of the user logged in on the app
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        token = request_dict.get('token', None)
        if not all([uuid_number, token]):
            return response.json(444, {'error param': 'uuid or token'})
        redis_obj = RedisObject()
        try:
            status = redis_obj.get_data(uuid_number)
            ttl = redis_obj.get_ttl(uuid_number)
            if status is False or ttl <= 0:
                return response.json(119)
            result1 = redis_obj.set_ex_data(uuid_number, 1, ttl)  # status 1: scanned
            result2 = redis_obj.set_ex_data(uuid_number + 'token', token, ttl)
            if result1 is False or result2 is False:
                return response.json(119)
            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def get_device(response, user_id):
        """
        Get the device list of a user.
        @param response: response object
        @param user_id: user id
        @return: response
        """
        try:
            device_qs = Device_Info.objects.filter(userID=user_id).values('serial_number', 'NickName')
            return response.json(0, list(device_qs))
        except Exception as e:
            print(e)
            return response.json(500)


class KVSView(View):
    """Amazon Kinesis Video Streams endpoints: streams, playback, STS and signaling channels."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Dispatch the request to the handler selected by the URL operation.
        @param request_dict: request parameters
        @param request: Django request object
        @param operation: operation name taken from the URL
        @return: response
        """
        response = ResponseObject()
        handlers = {
            'create-media': self.create_media,  # create a video stream
            'update-data-retention': self.update_data_retention,  # change the stream retention
            'get-sts-token': self.get_sts_token,  # get temporary credentials
            'createSignalChannel': self.create_signal_channel,  # create a signaling channel
            'SendAlexaOfferToMaster': self.send_alexa_offer_to_master,  # send an Alexa offer
            'deleteSignalChannel': self.delete_signal_channel,  # delete a signaling channel
            # NOTE(review): the original file carries a commented-out TokenObject
            # check in front of the three operations below, so they currently
            # run unauthenticated - confirm that this is intended.
            'get-device-midea-list': self.get_device_midea_list,  # list recorded fragments
            'get-hls-midea': self.get_hls_midea_url,  # get the HLS playback URL
            'download-clip': self.download_clip,  # download a clip
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(404)
        return handler(request_dict, response)

    @staticmethod
    def _kvam_client(stream_name, api_name):
        """
        Build the archived-media client shared by the playback endpoints.
        @param stream_name: KVS stream name (the device serial number)
        @param api_name: KVS API name the client is created for
        @return: AmazonKVAMObject
        """
        return AmazonKVAMObject(
            aws_access_key_id=_PLAYBACK_ACCESS_KEY_ID,
            secret_access_key=_PLAYBACK_SECRET_ACCESS_KEY,
            region_name=_PLAYBACK_REGION,
            stream_name=stream_name,
            api_name=api_name
        )

    @staticmethod
    def create_media(request_dict, response):
        """
        Create a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        # Without this guard a missing parameter becomes filter(stream_name=None),
        # which can match unrelated rows that have no stream name.
        if not serial_number:
            return response.json(444)
        try:
            kvs_qs = KVS.objects.filter(stream_name=serial_number)
            if kvs_qs.exists():
                return response.json(174)
            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            stream_arn = kinesis_video_obj.create_stream(stream_name=serial_number)
            if stream_arn:
                now_time = int(time.time())
                KVS.objects.create(stream_name=serial_number, stream_arn=stream_arn,
                                   created_time=now_time, updated_time=now_time)
                return response.json(0)
            else:
                return response.json(178)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def update_data_retention(request_dict, response):
        """
        Change the data retention period of a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict operation: operation, increase / decrease
        @request_dict data_retention_change_in_hours: number of hours to change by
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        operation = request_dict.get('operation', None)
        data_retention_change_in_hours = request_dict.get('data_retention_change_in_hours', None)
        if not all([serial_number, operation, data_retention_change_in_hours]):
            return response.json(444)
        try:
            data_retention_change_in_hours = int(data_retention_change_in_hours)
        except (TypeError, ValueError):
            return response.json(444)
        try:
            kvs_qs = KVS.objects.filter(stream_name=serial_number)
            if not kvs_qs.exists():
                return response.json(173)
            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            now_time = int(time.time())
            kinesis_video_obj.update_data_retention(stream_name=serial_number, operation=operation,
                                                    data_retention_change_in_hours=data_retention_change_in_hours)
            # NOTE(review): this stores the *change*, not the resulting total
            # retention - confirm which value the column is meant to hold.
            kvs_qs.update(data_retention_in_hours=data_retention_change_in_hours, updated_time=now_time)
            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500)

    @staticmethod
    def get_hls_midea_url(request_dict, response):
        """
        Get the HLS playback URL of a stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @request_dict playMode: playback mode, 0 is ON_DEMAND, anything else LIVE_REPLAY
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        play_mode = request_dict.get('playMode', None)
        if not all([serial_number, start_time, end_time, play_mode]):
            return response.json(444)
        try:
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
            play_mode = 'ON_DEMAND' if int(play_mode) == 0 else 'LIVE_REPLAY'
        except _BAD_PARAM_ERRORS:
            # Malformed numbers are a client error, not a server crash.
            return response.json(444)
        try:
            kinesis_video_obj = KVSView._kvam_client(serial_number, 'GET_HLS_STREAMING_SESSION_URL')
            hls_streaming_session_url = kinesis_video_obj.get_hls_streaming_session_url(serial_number, start_time,
                                                                                        end_time, play_mode)
            return response.json(0, {"HlsStreamingSessionUrl": hls_streaming_session_url})
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def get_device_midea_list(request_dict, response):
        """
        List the recorded fragments of a stream, one page at a time, with a preview image each.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @request_dict page: 1-based page number
        @request_dict size: page size
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        page = request_dict.get('page', None)
        size = request_dict.get('size', None)
        if not all([serial_number, start_time, end_time, page, size]):
            return response.json(444)
        try:
            page = int(page)
            size = int(size)
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
        except _BAD_PARAM_ERRORS:
            # Malformed numbers are a client error, not a server crash.
            return response.json(444)
        try:
            kinesis_fragments_obj = KVSView._kvam_client(serial_number, 'LIST_FRAGMENTS')
            kinesis_images_obj = KVSView._kvam_client(serial_number, 'GET_IMAGES')
            stream_list = kinesis_fragments_obj.get_list_fragments(serial_number, start_time, end_time)
            # NOTE(review): despite the key name this is the total number of
            # fragments, not the number of pages - clients may rely on that.
            total_page = len(stream_list)
            stream_list = stream_list[(page - 1) * size:page * size]
            for item in stream_list:
                # Image lookup window: 300 seconds from the shifted fragment start.
                temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(
                    tzinfo=datetime.timezone.utc)
                temp_end_time = temp_start_time + datetime.timedelta(seconds=300)
                item['image'] = kinesis_images_obj.get_images(serial_number, temp_start_time, temp_end_time)
                item['startTime'] = int(item['startTime'].timestamp())
                item['endTime'] = int(item['endTime'].timestamp())
            res = {
                'totalPage': total_page,
                'fragments': stream_list
            }
            return response.json(0, res)
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def download_clip(request_dict, response):
        """
        Download a recorded clip as an MP4 attachment.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @param response: response object
        @return: HttpResponse carrying the clip, or an error response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        if not all([serial_number, start_time, end_time]):
            return response.json(444)
        try:
            start_time = _to_kvs_datetime(start_time)
            end_time = _to_kvs_datetime(end_time)
        except _BAD_PARAM_ERRORS:
            # Malformed numbers are a client error, not a server crash.
            return response.json(444)
        try:
            kinesis_video_obj = KVSView._kvam_client(serial_number, 'GET_CLIP')
            clip_obj, clip_size = kinesis_video_obj.get_clip(serial_number, start_time, end_time)
            # The MIME type must go through the constructor: item assignment
            # (res["content_type"] = ...) only adds a header literally named
            # "content_type" and leaves the real Content-Type at its default.
            res = HttpResponse(clip_obj.read(), content_type="video/mp4")
            res["Content-Disposition"] = "attachment;filename=video.mp4"
            res['Content-Length'] = str(clip_size)
            return res
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def get_sts_token(request_dict, response):
        """
        Get temporary AWS credentials from STS.
        @param request_dict: request parameters (the 'uid' parameter is currently not used)
        @param response: response object
        @return: response
        """
        try:
            sts_client_conn = boto3.client(
                'sts',
                aws_access_key_id=ACCESS_KEY_ID,
                aws_secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            sts_obj = sts_client_conn.get_session_token(DurationSeconds=129600)  # 36 hours
            credentials = sts_obj['Credentials']
            res = {
                'AccessKeyId': credentials['AccessKeyId'],
                'AccessKeySecret': credentials['SecretAccessKey'],
                'SessionToken': credentials['SessionToken'],
                'Expiration': str(credentials['Expiration'])
            }
            return response.json(0, res)
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def create_signal_channel(request_dict, response):
        """
        Create a signaling channel for a device (no-op when it already exists).
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response
        """
        serial = request_dict.get('serial', None)
        if not all([serial]):
            return response.json(444)
        try:
            # Get the region id and check that it is valid.
            region_id = CommonService.confirm_region_id()
            if region_id not in REGION_ID_LIST:
                return response.json(444, {'invalid region_id': region_id})

            # Get the iot:CredentialProvider endpoint.
            endpoint_type = 'iot:CredentialProvider'
            iot_client = IOTClient(region_id)
            iot_credential_provider_endpoint = iot_client.describe_iot_endpoint(endpoint_type)

            res = {
                'region': KVS_REGION,
                'role_alias': 'KvsCameraIoTRoleAlias',
                'iot_credential_provider_endpoint': iot_credential_provider_endpoint,
            }

            # Return directly when the channel is already recorded.
            channel_name = 'Ansjer_Device_{}'.format(serial)
            kvs = KVS.objects.filter(channel_name=channel_name)
            if kvs.exists():
                return response.json(0, res)

            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            channel_arn = kinesis_video_obj.create_signaling_channel(channel_name=channel_name)
            now_time = int(time.time())
            KVS.objects.create(
                channel_name=channel_name,
                channel_arn=channel_arn,
                channel_ttl=60,
                created_time=now_time,
                updated_time=now_time
            )
            return response.json(0, res)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def send_alexa_offer_to_master(cls, request_dict, response):
        """
        Send an Alexa SDP offer to the device's signaling channel and return the SDP answer.
        @param request_dict: request parameters
        @request_dict uid: device uid
        @request_dict sdp_offer: SDP offer text
        @param response: response object
        @return: response
        """
        uid = request_dict.get('uid', None)
        sdp_offer = request_dict.get('sdp_offer', None)
        if not all([uid, sdp_offer]):
            return response.json(444)
        # NOTE(review): hard-coded uid remapping, presumably left over from testing.
        if uid == '517J385BNUGP3CPP111A':
            uid = 'NUWGTV5TUK8G8VSS111A'
        try:
            serial = CommonService.get_serial_number_by_uid(uid)
            # NOTE(review): create_signal_channel records channels under
            # channel_name='Ansjer_Device_<serial>' and never sets stream_name;
            # confirm that a stream_name lookup is what is intended here.
            kvs_qs = KVS.objects.filter(stream_name=serial).values('channel_arn')
            if not kvs_qs.exists():
                return response.json(173)
            channel_arn = kvs_qs[0]['channel_arn']

            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            endpoint = kinesis_video_obj.get_signaling_channel_endpoint(channel_arn)
            url = '{}/v1/send-alexa-offer-to-master'.format(endpoint)

            # Build the request body.
            client_id = hashlib.md5((str(uuid.uuid1()) + str(int(time.time()))).encode('utf-8')).hexdigest()
            # Encode the offer as base64.
            offer = {
                'type': 'offer',
                'sdp': sdp_offer
            }
            offer = cls.dict_to_base64(offer)
            LOGGER.info('offer:{}'.format(offer))
            payload = {
                'ChannelARN': channel_arn,
                'SenderClientId': client_id,
                'MessagePayload': offer
            }
            # Serialise once so the signed bytes and the sent bytes are the same string.
            body = json.dumps(payload)

            # Build the AWSRequest and sign it.
            req = AWSRequest(method='POST', url=url, data=body)
            credentials = Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
            SigV4Auth(credentials, 'kinesisvideo', KVS_REGION).add_auth(req)

            # Send the signed request with requests.
            headers = dict(req.headers)
            headers['Content-Type'] = 'application/json'
            r = requests.post(url, headers=headers, data=body)
            # Explicit checks instead of assert, which is stripped under "python -O".
            if r.status_code != 200:
                raise RuntimeError('SendAlexaOfferToMaster failed, status_code: {}'.format(r.status_code))
            resp_json = r.json()
            LOGGER.info('SendAlexaOfferToMaster响应: {}'.format(resp_json))
            sdp_answer = resp_json['Answer']
            if not sdp_answer:
                raise RuntimeError('SendAlexaOfferToMaster returned an empty Answer')

            # Decode the answer into a dict.
            sdp_answer = cls.base64_to_dict(sdp_answer)
            sdp_answer = sdp_answer['sdp']
            res = {
                'sdp_answer': sdp_answer
            }
            return response.json(0, res)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def dict_to_base64(data_dict: dict) -> str:
        """
        Serialise an SDP message to JSON with a fixed layout and base64-encode it.
        @param data_dict: dict with the keys "type" and "sdp"
        @return: base64 text of the UTF-8 encoded JSON document
        """
        # The document layout is fixed by hand, but each value is escaped by
        # json.dumps so that backslashes and control characters also produce
        # valid JSON. ensure_ascii=False leaves non-ASCII text unescaped, as the
        # manual escaping did.
        json_str = '{\n "type": %s,\n "sdp": %s\n}' % (
            json.dumps(str(data_dict["type"]), ensure_ascii=False),
            json.dumps(data_dict["sdp"], ensure_ascii=False)
        )
        # Convert to bytes and base64-encode.
        base64_data = base64.b64encode(json_str.encode('utf-8'))
        return base64_data.decode('utf-8')

    @staticmethod
    def base64_to_dict(encoded_str: str) -> dict:
        """
        Decode base64 text holding a UTF-8 JSON document.
        @param encoded_str: base64 text
        @return: parsed JSON value
        """
        decoded_bytes = base64.b64decode(encoded_str)
        return json.loads(decoded_bytes.decode('utf-8'))

    @staticmethod
    def delete_signal_channel(request_dict, response):
        """
        Delete the signaling channel of a device and its database record.
        @param request_dict: request parameters
        @request_dict serial: serial number
        @param response: response object
        @return: response
        """
        serial = request_dict.get('serial', None)
        if not all([serial]):
            return response.json(444)
        try:
            channel_name = 'Ansjer_Device_{}'.format(serial)
            kvs = KVS.objects.filter(channel_name=channel_name).first()
            if not kvs:
                return response.json(173)
            channel_arn = kvs.channel_arn
            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=KVS_REGION
            )
            kinesis_video_obj.delete_signaling_channel(channel_arn=channel_arn)
            kvs.delete()
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))