# -*- encoding: utf-8 -*-
"""
@File : DeviceVersionInfoController.py
@Time : 2024/11/20 14:20
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import time

from django.http import QueryDict
from django.views import View

from Ansjer.config import LOGGER
from Model.models import DeviceVersionInfo, Device_Info, UID_Bucket, ExperienceContextModel, UidSetModel
from Object.Enums.RedisKeyConstant import RedisKeyConstant
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from typing import Dict, Any
from django.http import HttpRequest, JsonResponse


class DeviceVersionInfoView(View):
    """Django view serving device version configuration and related checks.

    Each HTTP verb extracts its parameters and delegates to ``validation``,
    which routes on the ``operation`` keyword taken from the URL kwargs.
    """

    # Expiry passed to RedisObject.set_data for cached version info (24 hours).
    _VERSION_CACHE_TTL = 60 * 60 * 24
    # Expiry passed to RedisObject.set_data for verified user/UID pairs (48 hours).
    _USER_DEVICE_CACHE_TTL = 60 * 60 * 24 * 2

    def get(self, request, *args, **kwargs):
        """Handle GET: parameters are read from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: parameters are read from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        """Handle DELETE: parameters come from the body, else the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        delete = QueryDict(request.body)
        if not delete:
            # An empty body falls back to the query-string parameters.
            delete = request.GET
        return self.validation(delete, request, operation)

    def put(self, request, *args, **kwargs):
        """Handle PUT: parameters are parsed from the raw request body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        put = QueryDict(request.body)
        return self.validation(put, request, operation)

    def validation(self, request_dict, request, operation):
        """Authenticate the request (where required) and dispatch on ``operation``.

        @param request_dict: request parameters
        @param request: request object
        @param operation: operation name taken from the URL kwargs
        @return: the handler's response; the token's error code when token
                 validation fails; 414 for an unknown operation
        """
        response = ResponseObject('cn')
        # syncVerConfig is dispatched before the token check, so it runs
        # without a user token.
        if operation == 'syncVerConfig':
            return self.sync_ver_config(request, request_dict, response)
        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        userID = tko.userID
        if operation == 'getInfo':
            return self.get_device_version_info(userID, request, request_dict, response)
        elif operation == 'validateUserDevice':
            return self.validateUserDevice(userID, request, request_dict, response)
        elif operation == 'clearDeviceVersionCache':
            return self.clear_device_version_cache(userID, request, request_dict, response)
        elif operation == 'getDeviceVersion':
            d_code = request_dict.get('d_code')
            ver = request_dict.get('ver')
            res = self.cache_device_version_info(d_code, ver)
            return response.json(0, res)
        elif operation == 'getDeviceVodStatus':
            return self.get_device_vod_status(userID, request, request_dict, response)
        else:
            return response.json(414)

    @classmethod
    def clear_device_version_cache(cls, user_id, request, request_dict, response):
        """
        Clear cached device version info.

        At least one of ``d_code`` / ``timestamp`` must be supplied; when both
        are given, both clean-ups run.

        @param user_id: user ID (used for logging only)
        @param request: request object
        @param request_dict: request parameters (``d_code``, ``timestamp``)
        @param response: response object
        @return: 0 on success, 444 when both parameters are missing,
                 500 when an exception is raised
        """
        d_code = request_dict.get('d_code', None)
        timestamp = request_dict.get('timestamp', None)

        if not d_code and not timestamp:
            return response.json(444)  # Error code: required parameter missing

        try:
            LOGGER.info(f'清除缓存 user: {user_id}, d_code: {d_code}, timestamp: {timestamp}')
            redis = RedisObject()

            if d_code:
                # Remove the cache entries of every version of this d_code:
                # keys are built as <prefix><version><d_code>.
                pattern = RedisKeyConstant.ZOSI_DEVICE_VERSION_INFO.value + '*' + d_code
                matched_keys = redis.get_keys(pattern)
                if matched_keys:
                    for key in matched_keys:
                        redis.del_data(key)

            if timestamp:
                # Remove the cache entries of rows created before the timestamp.
                devices = DeviceVersionInfo.objects.filter(
                    created_time__lt=timestamp
                ).values('d_code', 'software_ver')
                for device in devices:
                    key = (RedisKeyConstant.ZOSI_DEVICE_VERSION_INFO.value
                           + device['software_ver'] + device['d_code'])
                    redis.del_data(key)

            return response.json(0, "缓存清除成功")
        except Exception as e:
            error_line = e.__traceback__.tb_lineno
            LOGGER.error(f'清除缓存失败 user:{user_id}, error_line:{error_line}, error_msg:{repr(e)}')
            return response.json(500, f'error_line:{error_line}, error_msg:{repr(e)}')

    @classmethod
    def _load_device_version(cls, d_code, ver):
        """Load one version row from the database with normalised other_features.

        The row's ``other_features`` is replaced by the output of
        ``build_other_features``. The stored column is rewritten only when it
        differs from the normalised value, so repeated cache misses do not
        cause repeated writes.

        @param d_code: device spec code
        @param ver: software version
        @return: the row as a dict, or None when no row matches
        """
        device_qs = DeviceVersionInfo.objects.filter(d_code=d_code, software_ver=ver)
        device_dict = device_qs.values().first()
        if not device_dict:
            return None

        # Capture the stored value before it is replaced in the dict.
        stored_features = device_dict.get("other_features")
        other_features = cls.build_other_features(device_dict, d_code)
        device_dict["other_features"] = other_features

        if stored_features != other_features:
            device_qs.update(other_features=other_features)
            LOGGER.info(f'更新other_features成功 id={device_dict.get("id")}')
        return device_dict

    @classmethod
    def get_device_version_info(cls, user_id, request, request_dict, response):
        """
        Return the version configuration matching a device version string.

        ``version`` is split at its last '.': the left part (with 'V' removed)
        is the software version, the right part is the d_code.

        @param user_id: user ID (used for logging only)
        @param request: request object (used to resolve the client IP)
        @param request_dict: request parameters (``uid``, ``version``)
        @param response: response object
        @return: 0 with the version info, 444 when ``version`` is missing,
                 173 when no record matches or any exception is raised
        """
        uid = request_dict.get('uid')
        version = request_dict.get('version')

        if not version:
            return response.json(444)  # Error code: version not provided

        try:
            ip = CommonService.get_ip_address(request)
            LOGGER.info(f'获取设备版本配置信息 user: {user_id}, uid: {uid}, version: {version}, ip: {ip} ')

            # Split from the right; a version without '.' raises ValueError,
            # which is reported as 173 by the handler below.
            ver, d_code = version.rsplit('.', 1)
            ver = ver.replace('V', '')  # Note: removes every 'V' in the version part

            redis = RedisObject()
            version_key = RedisKeyConstant.ZOSI_DEVICE_VERSION_INFO.value + ver + d_code

            # Serve from Redis when the entry is cached.
            version_info = redis.get_data(version_key)
            if version_info:
                return response.json(0, json.loads(version_info))

            # Cache miss: one query loads the row and normalises other_features.
            device_dict = cls._load_device_version(d_code, ver)
            if not device_dict:
                LOGGER.info(f'获取设备版本配置信息失败 user: {user_id}, uid: {uid}, version: {version}, ip: {ip} ')
                return response.json(173)  # Error code: device version info not found

            redis.set_data(version_key, json.dumps(device_dict), cls._VERSION_CACHE_TTL)
            return response.json(0, device_dict)
        except Exception as e:
            LOGGER.error('uid:{}返回173,error_line:{}, error_msg:{}'.format(uid, e.__traceback__.tb_lineno, repr(e)))
            return response.json(173)

    @classmethod
    def cache_device_version_info(cls, d_code, ver):
        """
        Return device version info, reading through the Redis cache.

        :param d_code: device spec code (software scheme)
        :param ver: device software version
        :return: version info dict, or None when no record matches or when a
                 Redis read/write raises
        """
        redis = RedisObject()
        version_key = f"{RedisKeyConstant.ZOSI_DEVICE_VERSION_INFO.value}{ver}{d_code}"
        try:
            version_info = redis.get_data(version_key)
            if version_info:
                return json.loads(version_info)
        except Exception as e:
            LOGGER.error(f"Redis get failed for key {version_key}: {e}")
            return None

        device_info = cls._load_device_version(d_code, ver)
        if not device_info:
            return None

        try:
            redis.set_data(version_key, json.dumps(device_info), cls._VERSION_CACHE_TTL)
        except Exception as e:
            LOGGER.error(f"Redis set failed for key {version_key}: {e}")
            return None
        return device_info

    @staticmethod
    def build_other_features(device_dict, d_code):
        """Build a complete ``other_features`` dict for a device version row.

        Starts from the row's stored ``other_features`` (a dict or a JSON
        string), sets every missing/None/empty known flag to -1, and always
        sets ``resolution_list`` from the fifth-from-last character of
        ``d_code``.

        :param device_dict: row dict that may carry an ``other_features`` entry
        :param d_code: device spec code
        :return: a new dict; the stored value is not mutated
        """
        flag_fields = [
            'supports_lighting',
            'is_support_multi_speed',
            'supports_algorithm_switch',
            'supports_playback_filtering',
        ]
        mp_resolution_map = {
            '8': ["800", "50"],
            '5': ["500", "50"],
            '4': ["400", "50"],
            '3': ["300", "50"],
            '2': ["200", "50"],
            '1': ["100", "50"],
            'A': ["1000", "50"],
            'B': ["1100", "50"],
            'C': ["1200", "50"],
        }
        default_resolution = ["400", "50"]

        # A d_code that is not a string, or is shorter than five characters,
        # has no such character; use the default instead of raising.
        # NOTE(review): presumably this character is the megapixel class — confirm.
        mp_code = d_code[-5] if isinstance(d_code, str) and len(d_code) >= 5 else None
        resolution_list = mp_resolution_map.get(mp_code, default_resolution)

        other_features = {}
        raw = device_dict.get("other_features") if isinstance(device_dict, dict) else None
        if raw:
            try:
                parsed = json.loads(raw) if isinstance(raw, str) else raw
            except ValueError as e:
                # Unparseable stored JSON: fall back to defaults, but leave a trace.
                LOGGER.error(f"other_features is not valid JSON, using defaults: {repr(e)}")
                parsed = None
            # Only a JSON object can carry feature flags; ignore lists/scalars.
            if isinstance(parsed, dict):
                other_features = dict(parsed)

        # Fill in defaults for missing or empty flags.
        for field in flag_fields:
            if other_features.get(field) in (None, ''):
                other_features[field] = -1

        # resolution_list is always derived from d_code, never taken from storage.
        other_features['resolution_list'] = resolution_list
        return other_features

    @classmethod
    def validateUserDevice(cls, user_id, request, request_dict, response):
        """
        Verify that a device UID belongs to the user.

        @param user_id: user ID
        @param request: request object (used to resolve the client IP)
        @param request_dict: request parameters (``uid``, ``serialNumber``,
                             ``appBundleId``, ``mCode``)
        @param response: response object
        @return: 0 with the uid on success, 444 when ``uid`` is missing,
                 173 when the device is not found for this user,
                 500 when an exception is raised
        """
        uid = request_dict.get('uid')
        serial_number = request_dict.get('serialNumber')
        app_bundle_id = request_dict.get('appBundleId')
        m_code = request_dict.get('mCode')

        if not uid:
            return response.json(444)  # Error code: uid not provided

        try:
            LOGGER.info(f'直播验证用户uid:{uid},user:{user_id},m_code:{m_code}')
            redis = RedisObject(3)
            device_key = f"{RedisKeyConstant.BASIC_USER.value}{user_id}:UID:{uid}"

            # Cache hit with a matching UID: skip the database.
            device_info = redis.get_data(device_key)
            if device_info == uid:
                return response.json(0, uid)

            ip = CommonService.get_ip_address(request)
            LOGGER.info(f'直播验证用户uid:{uid},ip:{ip},serial:{serial_number},app{app_bundle_id}')

            try:
                # A single query: fetch the row, then compare the UID exactly.
                device_row = Device_Info.objects.filter(userID_id=user_id, UID=uid).values('UID').first()
                if not device_row or uid != device_row.get('UID'):
                    LOGGER.error(f'验证用户uid错误,未找到设备信息 uid:{uid}')
                    return response.json(173)  # Error code: device info not found

                # Remember the verified pair for later requests.
                redis.set_data(device_key, uid, cls._USER_DEVICE_CACHE_TTL)
                return response.json(0, uid)
            except Exception as db_exception:
                LOGGER.error(f'数据库查询失败 user:{user_id}, uid:{uid}, error: {repr(db_exception)}')
                return response.json(500, '数据库查询错误')
        except Exception as e:
            error_line = e.__traceback__.tb_lineno
            LOGGER.error(f'验证用户uid异常 user:{user_id}, uid:{uid}, error_line:{error_line}, error_msg:{repr(e)}')
            return response.json(500, f'error_line:{error_line}, error_msg:{repr(e)}')

    @classmethod
    def sync_ver_config(cls, request, request_dict, response):
        """
        Create or update a device version record from request parameters.

        An existing record (same ``dCode`` + ``softwareVer``) is updated and its
        Redis cache entry deleted; otherwise a new record is created.

        @param request: request object (unused)
        @param request_dict: request parameters
        @param response: response object
        @return: 0 on success, 444 when a required parameter is missing or
                 empty, 400 when a numeric parameter cannot be converted,
                 500 for invalid ``otherFeatures`` JSON or any other error
        """
        required_fields = ['dCode', 'softwareVer', 'videoCode', 'deviceType',
                           'supportsAlarm', 'screenChannels', 'networkType']
        # Every required parameter must be present and non-empty.
        if not all(request_dict.get(field) for field in required_fields):
            LOGGER.error(f'缺少必要参数: {request_dict}')
            return response.json(444)
        LOGGER.info(f'同步设备版本信息 params: {request_dict}')

        def int_param(name, default):
            # Raises ValueError for non-numeric input; handled below as 400.
            return int(request_dict.get(name, default))

        def flag_param(name):
            # "0"/"1"-style parameter converted to bool; absent means False.
            return bool(int(request_dict.get(name, 0)))

        try:
            d_code = request_dict.get('dCode', '')
            ver = request_dict.get('softwareVer', '')
            raw_other_features = request_dict.get('otherFeatures')
            params = {
                'd_code': d_code,
                'software_ver': ver,
                'firmware_ver': request_dict.get('firmwareVer', ''),
                'video_code': int_param('videoCode', 0),
                'region_alexa': request_dict.get('regionAlexa', 'ALL'),
                'supports_human_tracking': flag_param('supportsHumanTracking'),
                'supports_custom_voice': flag_param('supportsCustomVoice'),
                'supports_dual_band_wifi': flag_param('supportsDualBandWifi'),
                'supports_four_point': flag_param('supportsFourPoint'),
                'supports_4g': flag_param('supports4g'),
                'supports_ptz': flag_param('supportsPTZ'),
                'supports_ai': flag_param('supportsAi'),
                'supports_cloud_storage': flag_param('supportsCloudStorage'),
                'supports_alexa': flag_param('supportsAlexa'),
                'device_type': int_param('deviceType', 0),
                'resolution': request_dict.get('resolution', ''),
                'ai_type': int_param('aiType', 0),
                'supports_alarm': int_param('supportsAlarm', -1),
                'supports_night_vision': int_param('supportsNightVision', -1),
                'screen_channels': int_param('screenChannels', 1),
                'network_type': int_param('networkType', 1),
                # Stored as parsed JSON; None when the parameter is absent or empty.
                'other_features': json.loads(raw_other_features) if raw_other_features else None,
                'electricity_statistics': int_param('electricityStatistics', 0),
                'supports_pet_tracking': int_param('supportsPetTracking', 0),
                'has_4g_cloud': int_param('has4gCloud', -1),
            }
            now_time = int(time.time())

            version_config_qs = DeviceVersionInfo.objects.filter(
                d_code=params['d_code'],
                software_ver=params['software_ver']
            )
            if version_config_qs.exists():
                version_config_qs.update(**params, updated_time=now_time)
                # Drop the cached copy so the next read reloads the updated row.
                redis = RedisObject()
                version_key = RedisKeyConstant.ZOSI_DEVICE_VERSION_INFO.value + ver + d_code
                redis.del_data(version_key)
            else:
                DeviceVersionInfo.objects.create(**params, created_time=now_time, updated_time=now_time)
        except json.JSONDecodeError as e:
            # Must precede ValueError: JSONDecodeError is a ValueError subclass.
            LOGGER.error(f"JSON 解析失败: {str(e)}")
            return response.json(500, f'Invalid JSON format: {str(e)}')
        except KeyError as e:
            # Fallback for a missing field; required_fields is validated above.
            LOGGER.error(f"参数缺失: {str(e)}")
            return response.json(444, f'Missing parameter: {str(e)}')
        except ValueError as e:
            # Integer conversion failed.
            LOGGER.error(f"参数类型错误: {str(e)}")
            return response.json(400, f'Invalid parameter type: {str(e)}')
        except Exception as e:
            LOGGER.exception("同步设备配置失败")
            return response.json(500, f'Server error: {str(e)}')

        return response.json(0)

    def get_device_vod_status(self, userID: int, request: HttpRequest, request_dict: Dict[str, Any],
                              response: ResponseObject) -> JsonResponse:
        """
        Get the cloud-storage status of a device.

        Status values returned under the ``status`` key:
        -1 not supported; 0 not activated; 1 in use; 2 expired.

        Args:
            userID (int): user ID (unused)
            request (HttpRequest): request object (unused)
            request_dict (QueryDict): request parameters (``UID``)
            response (ResponseObject): response object
        Returns:
            JsonResponse: 0 with ``{'status': <value>}``, 444 when ``UID`` is
            missing, 500 when an exception is raised
        """
        UID: str = request_dict.get('UID')
        if UID is None:
            return response.json(444)
        try:
            now_time = int(time.time())
            uid_set_qs = UidSetModel.objects.filter(uid=UID).values('ucode', 'device_type').first()
            # Empty strings are passed on when the UID has no UidSetModel row.
            ucode = uid_set_qs['ucode'] if uid_set_qs else ''
            device_type = uid_set_qs['device_type'] if uid_set_qs else ''
            is_cloud_device = CommonService.is_cloud_device(ucode, device_type)
            if not is_cloud_device:
                cloud_type = -1  # Not supported
            else:
                exper_qs = ExperienceContextModel.objects.filter(uid=UID)
                if exper_qs.exists():
                    uid_bucket_qs = UID_Bucket.objects.filter(uid=UID, status=1).values('endTime').first()
                    if uid_bucket_qs and uid_bucket_qs['endTime'] > now_time:
                        cloud_type = 1  # In use
                    else:
                        cloud_type = 2  # Expired
                else:
                    cloud_type = 0  # Not activated
            return response.json(0, {'status':cloud_type})
        except Exception as e:
            error_msg: str = f'查询云存状态异常, 错误: {str(e)}'
            error_line: int = e.__traceback__.tb_lineno
            LOGGER.error(f'{error_msg} 行号: {error_line}')
            return response.json(500)