# -*- coding: utf-8 -*-
"""
@Author : Rocky
@Time : 2022/10/18 9:48
@File :KVSController.py
"""
import datetime
import hashlib
import logging
import time
import uuid

from django.http import HttpResponse
from django.views import View

from Model.models import KVS, Device_User, Device_Info
from Object.AWS.AmazonKinesisVideoUtil import AmazonKinesisVideoObject, AmazonKVAMObject
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Ansjer.config import ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME, SERVER_DOMAIN
from Object.TokenObject import TokenObject

logger = logging.getLogger(__name__)

# Exceptions raised when a request parameter cannot be turned into a number/timestamp.
_BAD_NUMBER_ERRORS = (ValueError, OverflowError, OSError)


def _parse_epoch_seconds(raw_value):
    """
    Convert a request timestamp into the naive datetime handed to the KVS helpers.
    @param raw_value: epoch seconds, as a string or int
    @return: naive datetime built from the timestamp and shifted back by 8 hours
    @raise ValueError: raw_value is not an integer
    """
    # NOTE(review): the fixed 8-hour shift looks like a local-time (UTC+8) to UTC
    # conversion; it is only correct if the server's local zone is UTC+8 - confirm.
    return datetime.datetime.fromtimestamp(int(raw_value)) - datetime.timedelta(hours=8)


def _build_kvam_client(stream_name, api_name):
    """
    Build the archived-media client for one stream and one API.
    @param stream_name: name of the video stream
    @param api_name: API name, e.g. 'GET_CLIP' or 'LIST_FRAGMENTS'
    @return: AmazonKVAMObject
    """
    # SECURITY: an AWS key pair used to be hard-coded at every call site. The
    # credentials now come from Ansjer.config, like create_media does.
    # NOTE(review): the old literal key pair is in version control history and must
    # be rotated. The old code also pinned region 'us-east-1'; confirm that the
    # configured account/region can read the streams.
    return AmazonKVAMObject(
        aws_access_key_id=ACCESS_KEY_ID,
        secret_access_key=SECRET_ACCESS_KEY,
        region_name=REGION_NAME,
        stream_name=stream_name,
        api_name=api_name
    )


class UserRelatedView(View):
    """QR-code login endpoints and the device list of the logged-in user."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation, request)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation, request)

    def validation(self, request_dict, operation, request):
        """
        Dispatch the request to the handler named by operation.
        @param request_dict: request parameters
        @param operation: operation name taken from the URL
        @param request: request object
        @return: response
        """
        response = ResponseObject()
        if operation == 'generate-qr-code':  # web page generates a QR code
            return self.generate_qr_code(response)
        elif operation == 'get-scanning-status':  # has the app scanned the code?
            return self.get_scanning_status(request_dict, response)
        elif operation == 'web-login':  # web login
            return self.web_login(request_dict, response)
        elif operation == 'pc-login':  # PC client login
            return self.pc_login(request_dict, response)
        elif operation == 'confirm-login':  # app confirms the login
            return self.confirm_login(request_dict, response)

        # Every other operation (including unknown ones) requires a valid token.
        tko = TokenObject(
            request.META.get('HTTP_AUTHORIZATION'),
            returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        user_id = tko.userID
        if operation == 'get-device':  # device list
            return self.get_device(response, user_id)
        return response.json(404)

    @staticmethod
    def get_user_info(user_id):
        """
        Look up the avatar URL and nickname of a user.
        @param user_id: user id
        @return: (user_icon_url, nick_name); both are '' when the user does not exist
        """
        user = Device_User.objects.filter(userID=user_id).values(
            'NickName', 'userIconPath', 'userIconUrl').first()
        if user is None:
            return '', ''

        nick_name = user['NickName']
        raw_icon_path = user['userIconPath']
        # Test the raw value: str(None) is the truthy string 'None' and would
        # otherwise produce an avatar URL ending in 'None'.
        if not raw_icon_path:
            return '', nick_name
        user_icon_path = str(raw_icon_path).replace('static/', '').replace('\\', '/')
        user_icon_url = SERVER_DOMAIN + 'account/getAvatar/' + user_icon_path
        return user_icon_url, nick_name

    @staticmethod
    def generate_qr_code(response):
        """
        Create a QR-code login session for the web page.
        @param response: response object
        @return: response
        """
        redis_obj = RedisObject()
        try:
            # The id is the only secret protecting the login session, so it must be
            # unpredictable; uuid4 keeps the previous 32-hex-character format.
            uuid_number = uuid.uuid4().hex
            # Record the id in redis with status 0 (QR code generated) for 300 seconds.
            flag = redis_obj.set_ex_data(uuid_number, 0, 300)
            if not flag:
                return response.json(119)
            return response.json(0, {'type': 'autologin', 'id': uuid_number})
        except Exception:
            logger.exception('generate_qr_code failed')
            return response.json(500)

    @staticmethod
    def get_scanning_status(request_dict, response):
        """
        Report whether the app has scanned the QR code.
        @param request_dict: request parameters
        @request_dict uuid: id of the QR-code login session
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            if status is False:
                return response.json(119)
            # '1' is written by confirm_login, '2' by web_login.
            scanned = status == '1' or status == '2'
            return response.json(0, {'status': 1 if scanned else 0})
        except Exception:
            logger.exception('get_scanning_status failed')
            return response.json(500)

    @staticmethod
    def pc_login(request_dict, response):
        """
        Let the PC client collect the login result of a QR-code session.
        @param request_dict: request parameters
        @request_dict uuid: id of the QR-code login session
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        if not uuid_number:
            return response.json(444, {'error param': 'uuid'})
        try:
            redis_obj = RedisObject()
            status = redis_obj.get_data(uuid_number)
            token = redis_obj.get_data(uuid_number + 'token')
            if status is False or token is False:
                return response.json(119)
            if status != '2':  # not logged in yet
                return response.json(0, {'status': 0})

            # Logged in: validate the stored token and hand it to the PC client.
            token_obj = TokenObject(token)
            response.lang = token_obj.lang
            if token_obj.code != 0:
                return response.json(token_obj.code)
            user_icon_url, nick_name = UserRelatedView.get_user_info(token_obj.userID)
            res = {'status': 1, 'userIconUrl': user_icon_url, 'nickName': nick_name, 'token': token}
            # The session is single-use: drop both keys once the token was delivered.
            redis_obj.del_data(uuid_number)
            redis_obj.del_data(uuid_number + 'token')
            return response.json(0, res)
        except Exception:
            logger.exception('pc_login failed')
            return response.json(500)

    @staticmethod
    def web_login(request_dict, response):
        """
        Confirm or cancel the login of a QR-code session.
        @param request_dict: request parameters
        @request_dict uuid: id of the QR-code login session
        @request_dict confirm: '1' cancels the login, any other value confirms it
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        confirm = request_dict.get('confirm', None)
        if not all([uuid_number, confirm]):
            return response.json(444, {'error param': 'uuid or confirm'})
        try:
            redis_obj = RedisObject()
            if confirm == '1':  # cancel the login
                redis_obj.del_data(uuid_number)
                redis_obj.del_data(uuid_number + 'token')
                return response.json(0)

            token = redis_obj.get_data(uuid_number + 'token')
            ttl = redis_obj.get_ttl(uuid_number)
            if token is False or ttl <= 0:
                return response.json(119)
            # Set the status to 2 (logged in), keeping the remaining lifetime.
            result = redis_obj.set_ex_data(uuid_number, 2, ttl)
            if result is False:
                return response.json(119)
            return response.json(0)
        except Exception:
            logger.exception('web_login failed')
            return response.json(500)

    @staticmethod
    def confirm_login(request_dict, response):
        """
        Record that the app scanned the QR code and store the app's token.
        @param request_dict: request parameters
        @request_dict uuid: id of the QR-code login session
        @request_dict token: token of the user logged in on the app
        @param response: response object
        @return: response
        """
        uuid_number = request_dict.get('uuid', None)
        token = request_dict.get('token', None)
        if not all([uuid_number, token]):
            return response.json(444, {'error param': 'uuid or token'})
        redis_obj = RedisObject()
        try:
            status = redis_obj.get_data(uuid_number)
            ttl = redis_obj.get_ttl(uuid_number)
            if status is False or ttl <= 0:
                return response.json(119)
            # Set the status to 1 (scanned) and store the token with the same lifetime.
            result1 = redis_obj.set_ex_data(uuid_number, 1, ttl)
            result2 = redis_obj.set_ex_data(uuid_number + 'token', token, ttl)
            if result1 is False or result2 is False:
                return response.json(119)
            return response.json(0)
        except Exception:
            logger.exception('confirm_login failed')
            return response.json(500)

    @staticmethod
    def get_device(response, user_id):
        """
        List the devices of a user.
        @param response: response object
        @param user_id: user id
        @return: response
        """
        try:
            device_qs = Device_Info.objects.filter(userID=user_id).values('serial_number', 'NickName')
            return response.json(0, list(device_qs))
        except Exception:
            logger.exception('get_device failed')
            return response.json(500)


class KVSView(View):
    """Kinesis Video Streams endpoints: stream management, playback and download."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Dispatch the request to the handler named by operation.
        @param request_dict: request parameters
        @param request: request object
        @param operation: operation name taken from the URL
        @return: response
        """
        response = ResponseObject()
        if operation == 'create-media':  # create a video stream
            return self.create_media(request_dict, response)
        elif operation == 'update-data-retention':  # change the stream's data retention
            return self.update_data_retention(request_dict, response)
        # NOTE(review): token validation (TokenObject on HTTP_AUTHORIZATION) was
        # commented out here, so the operations below are served without
        # authentication - confirm this is intended.
        elif operation == 'get-device-midea-list':  # list the recorded fragments
            return self.get_device_midea_list(request_dict, response)
        elif operation == 'get-hls-midea':  # get the playback URL
            return self.get_hls_midea_url(request_dict, response)
        elif operation == 'download-clip':  # download a clip
            return self.download_clip(request_dict, response)
        return response.json(404)

    @staticmethod
    def create_media(request_dict, response):
        """
        Create a video stream and record it in the KVS table.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444)
        try:
            if KVS.objects.filter(stream_name=serial_number).exists():
                return response.json(174)
            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=REGION_NAME
            )
            stream_arn = kinesis_video_obj.create_stream(stream_name=serial_number)
            if not stream_arn:
                return response.json(178)
            now_time = int(time.time())
            KVS.objects.create(stream_name=serial_number, stream_arn=stream_arn,
                               created_time=now_time, updated_time=now_time)
            return response.json(0)
        except Exception:
            logger.exception('create_media failed')
            return response.json(500)

    @staticmethod
    def update_data_retention(request_dict, response):
        """
        Change the data retention period of a video stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict operation: operation, increase/decrease
        @request_dict data_retention_change_in_hours: number of hours to change by
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        operation = request_dict.get('operation', None)
        data_retention_change_in_hours = request_dict.get('data_retention_change_in_hours', None)
        if not all([serial_number, operation, data_retention_change_in_hours]):
            return response.json(444)
        try:
            data_retention_change_in_hours = int(data_retention_change_in_hours)
        except ValueError:
            return response.json(444)
        try:
            kvs_qs = KVS.objects.filter(stream_name=serial_number)
            if not kvs_qs.exists():
                # NOTE(review): 174 is also what create_media returns when the stream
                # already exists - confirm it is the intended code for a missing stream.
                return response.json(174)
            kinesis_video_obj = AmazonKinesisVideoObject(
                aws_access_key_id=ACCESS_KEY_ID,
                secret_access_key=SECRET_ACCESS_KEY,
                region_name=REGION_NAME
            )
            now_time = int(time.time())
            kinesis_video_obj.update_data_retention(
                stream_name=serial_number, operation=operation,
                data_retention_change_in_hours=data_retention_change_in_hours)
            # NOTE(review): this stores the requested change, not the resulting
            # retention period - confirm that is what the column is meant to hold.
            kvs_qs.update(data_retention_in_hours=data_retention_change_in_hours, updated_time=now_time)
            return response.json(0)
        except Exception:
            logger.exception('update_data_retention failed')
            return response.json(500)

    @staticmethod
    def get_hls_midea_url(request_dict, response):
        """
        Get the HLS playback URL of a stream.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @request_dict playMode: playback mode, 0 is ON_DEMAND, anything else LIVE_REPLAY
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        play_mode = request_dict.get('playMode', None)
        if not all([serial_number, start_time, end_time, play_mode]):
            return response.json(444)
        try:
            start_time = _parse_epoch_seconds(start_time)
            end_time = _parse_epoch_seconds(end_time)
            play_mode = 'ON_DEMAND' if int(play_mode) == 0 else 'LIVE_REPLAY'
        except _BAD_NUMBER_ERRORS:
            # Malformed numbers are a client error, not a server failure.
            return response.json(444)
        try:
            # NOTE(review): the KVS-table existence check (code 174) was disabled here.
            kinesis_video_obj = _build_kvam_client(serial_number, 'GET_HLS_STREAMING_SESSION_URL')
            hls_streaming_session_url = kinesis_video_obj.get_hls_streaming_session_url(
                serial_number, start_time, end_time, play_mode)
            return response.json(0, {"HlsStreamingSessionUrl": hls_streaming_session_url})
        except Exception as e:
            logger.exception('get_hls_midea_url failed')
            return response.json(500, repr(e))

    @staticmethod
    def get_device_midea_list(request_dict, response):
        """
        List one page of the recorded fragments of a stream, each with its image.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @request_dict page: page number, starting at 1
        @request_dict size: page size
        @param response: response object
        @return: response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        page = request_dict.get('page', None)
        size = request_dict.get('size', None)
        if not all([serial_number, start_time, end_time, page, size]):
            return response.json(444)
        try:
            page = int(page)
            size = int(size)
            start_time = _parse_epoch_seconds(start_time)
            end_time = _parse_epoch_seconds(end_time)
        except _BAD_NUMBER_ERRORS:
            return response.json(444)
        if page < 1 or size < 1:
            # Non-positive values would turn into negative slice indices below.
            return response.json(444)
        try:
            # NOTE(review): the KVS-table existence check (code 174) was disabled here.
            kinesis_fragments_obj = _build_kvam_client(serial_number, 'LIST_FRAGMENTS')
            kinesis_images_obj = _build_kvam_client(serial_number, 'GET_IMAGES')
            stream_list = kinesis_fragments_obj.get_list_fragments(serial_number, start_time, end_time)
            # NOTE(review): returned as 'totalPage' but this is the number of
            # fragments, not the number of pages - confirm what clients expect.
            fragment_count = len(stream_list)
            stream_list = stream_list[(page - 1) * size:page * size]
            for item in stream_list:
                # Image lookup window: 300 seconds from the (shifted) fragment start.
                temp_start_time = (item['startTime'] - datetime.timedelta(hours=8)).replace(
                    tzinfo=datetime.timezone.utc)
                temp_end_time = temp_start_time + datetime.timedelta(seconds=300)
                item['image'] = kinesis_images_obj.get_images(serial_number, temp_start_time, temp_end_time)
                # Replace the datetimes with epoch seconds for the JSON payload.
                item['startTime'] = int(item['startTime'].timestamp())
                item['endTime'] = int(item['endTime'].timestamp())
            res = {
                'totalPage': fragment_count,
                'fragments': stream_list
            }
            return response.json(0, res)
        except Exception as e:
            logger.exception('get_device_midea_list failed')
            return response.json(500, repr(e))

    @staticmethod
    def download_clip(request_dict, response):
        """
        Download a clip of a stream as an MP4 attachment.
        @param request_dict: request parameters
        @request_dict serial_number: serial number, used as the stream name
        @request_dict startTime: start time, epoch seconds
        @request_dict endTime: end time, epoch seconds
        @param response: response object
        @return: HttpResponse with the clip, or a JSON error response
        """
        serial_number = request_dict.get('serial_number', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        if not all([serial_number, start_time, end_time]):
            return response.json(444)
        try:
            start_time = _parse_epoch_seconds(start_time)
            end_time = _parse_epoch_seconds(end_time)
        except _BAD_NUMBER_ERRORS:
            return response.json(444)
        try:
            # NOTE(review): the KVS-table existence check (code 174) was disabled here.
            kinesis_video_obj = _build_kvam_client(serial_number, 'GET_CLIP')
            clip_obj, clip_size = kinesis_video_obj.get_clip(serial_number, start_time, end_time)
            # The MIME type has to go through the content_type argument; assigning
            # res["content_type"] only adds an unrelated header of that name.
            res = HttpResponse(clip_obj.read(), content_type="video/mp4")
            res["Content-Disposition"] = "attachment;filename=video.mp4"
            res['Content-Length'] = str(clip_size)
            return res
        except Exception as e:
            logger.exception('download_clip failed')
            return response.json(500, repr(e))