# -*- encoding: utf-8 -*-
"""
@File : AlgorithmShopController.py
@Time : 2022/8/24 20:02
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import logging
import math
import time
from datetime import datetime, timedelta

from django.db.models import F, Value, CharField, Sum
from django.views.generic.base import View

from Model.models import DeviceAlgorithmExplain, DeviceAlgorithmBanner, DeviceUidAlgorithmType, \
    DeviceTypeAlgorithmInfo, DeviceAppScenario, DeviceScenarioLangInfo, DeviceAlgorithmScenario, \
    DeviceAlgorithmPassengerFlow
from Object.ETkObject import ETkObject
from Object.Enums.TimeZone import TimeZone
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Object.utils import LocalDateTimeUtil

LOGGER = logging.getLogger('info')


class AlgorithmShopView(View):
    """Algorithm-shop endpoints: algorithm listings, per-device settings and
    passenger-flow statistics.

    ``get`` and ``post`` both delegate to :meth:`validation`, which dispatches
    on the ``operation`` URL keyword argument.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET requests by dispatching on ``kwargs['operation']``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests by dispatching on ``kwargs['operation']``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Check the caller's token and route the request to its handler.

        Args:
            request_dict: Request parameters (``request.GET`` or ``request.POST``).
            request: The Django request; only ``META['HTTP_AUTHORIZATION']`` is read.
            operation: Operation name taken from the URL.

        Returns:
            Whatever the selected handler returns. Unknown operations yield
            ``response.json(0)``.
        """
        # This operation is handled before any token check and is identified
        # by the ``sign`` parameter instead.
        if operation == 'passengerFlowStatistical':
            return self.passenger_flow_statistical(request_dict, ResponseObject('en'))

        token = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        lang = request_dict.get('lang', token.lang)
        response = ResponseObject(lang)
        if token.code != 0:
            return response.json(token.code)

        if operation == 'list':
            return self.algorithm_list(request_dict, response)
        elif operation == 'banner-list':
            return self.get_algorithm_banner(response)
        elif operation == 'uid-details':
            return self.get_algorithm_details(request_dict, response)
        elif operation == 'save':
            return self.algorithm_setting_save(request_dict, response)
        elif operation == 'getScenarioList':  # application-scenario list
            return self.get_scenario_list(request_dict, response)
        elif operation == 'getAlgorithmListByScenarioId':  # algorithms of one scenario
            return self.get_scenario_algorithm_list(request_dict, response)
        elif operation == 'getPassengerFlowList':  # passenger-flow statistics
            return self.get_passenger_flow_list(request_dict, response)
        else:
            return response.json(0)

    @classmethod
    def passenger_flow_statistical(cls, request_dict, response):
        """Store one passenger-flow report (entered / exited counts).

        Args:
            request_dict: Request parameters; reads ``sign``, ``deviceTime``,
                ``enterCount``, ``exitCount`` and ``timeZone``.
            response: Response object used to build the reply.

        Returns:
            ``response.json(0)`` on success or when the same report was already
            stored, ``response.json(444)`` when a parameter is missing or the
            sign yields no uid, ``response.json(500, ...)`` on any exception.
        """
        try:
            LOGGER.info('*****AlgorithmShopView.passenger_flow_statistical:params{}'.format(json.dumps(request_dict)))
            sign = request_dict.get('sign')
            d_time = request_dict.get('deviceTime')
            enter_count = request_dict.get('enterCount')
            tz_value = request_dict.get('timeZone')
            exit_count = request_dict.get('exitCount')
            # deviceTime is required as well: it is converted with int() below
            # and a missing value used to surface as a 500 instead of a 444.
            if not all([sign, d_time, enter_count, exit_count, tz_value]):
                return response.json(444)

            uid = ETkObject(sign).uid
            if not uid:
                return response.json(444)

            enter_count = int(enter_count)
            exit_count = int(exit_count)

            redis = RedisObject(5)
            key = f'ASJ:PASSENGER:FLOW:{uid}:{d_time}'
            # Skip reports already stored for this uid and device time.
            if redis.get_data(key):
                return response.json(0)

            now_time = int(time.time())
            device_time = int(d_time)
            tz = TimeZone.get_value(int(tz_value))
            date_time = LocalDateTimeUtil.time_format_date(device_time, tz)
            LOGGER.info(f'uid{uid}-DeviceTime:{d_time},tz:{tz},结果:{date_time}')

            common = {
                'uid': uid,
                'updated_time': now_time,
                'created_time': now_time,
                'device_time': device_time,
                'statistical_time': datetime.strptime(date_time, '%Y-%m-%d %H:%M:%S'),
            }
            # type 1 carries the enter count, type 2 the exit count.
            passenger_flow_list = [
                DeviceAlgorithmPassengerFlow(count=enter_count, type=1, **common),
                DeviceAlgorithmPassengerFlow(count=exit_count, type=2, **common),
            ]
            DeviceAlgorithmPassengerFlow.objects.bulk_create(passenger_flow_list)

            # Remember the report in Redis for 10 minutes.
            redis.CONN.setnx(key, d_time)
            redis.CONN.expire(key, 600)
            return response.json(0)
        except Exception as e:
            LOGGER.info('***passenger_flow_statistical,errLine:{}, errMsg:{}'
                        .format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def get_algorithm_list_by_scenario_id(cls, scenario_id, lang):
        """Return the algorithms linked to one application scenario.

        Args:
            scenario_id: Scenario id; a falsy value yields an empty list.
            lang: Language used to pick the localized texts.

        Returns:
            A list with one dict per linked algorithm, ordered by ``sort``
            (see :meth:`get_lang_info_by_algorithm_id` for the dict layout);
            an empty list on error.
        """
        try:
            if not scenario_id:
                return []
            # Algorithm ids linked to the scenario.
            algorithm_scenario_qs = DeviceAlgorithmScenario.objects.filter(scenario_id=scenario_id) \
                .order_by('sort').values('algorithm_id')
            # Localized details are looked up per algorithm id.
            return [cls.get_lang_info_by_algorithm_id(item['algorithm_id'], lang, None)
                    for item in algorithm_scenario_qs]
        except Exception as e:
            LOGGER.info('***get_algorithm_list_by_scenario_id,errLine:{}, errMsg:{}'
                        .format(e.__traceback__.tb_lineno, repr(e)))
            return []

    @classmethod
    def get_lang_info_by_algorithm_id(cls, algorithm_id, lang, uid):
        """Return the localized details of one algorithm.

        Args:
            algorithm_id: Algorithm type id.
            lang: Language of the explanation row to read.
            uid: Device uid; when given, the device's setting for this
                algorithm is included.

        Returns:
            A dict describing the algorithm, or ``{}`` when no row matches or
            an exception occurs.
        """
        try:
            algorithm_qs = DeviceAlgorithmExplain.objects.filter(algorithm_type_id=algorithm_id, lang=lang) \
                .values('algorithm_type__icon_url', 'algorithm_type__id',
                        'title', 'subtitle', 'algorithm_type__image_url',
                        'algorithm_type__basic_function', 'concerning',
                        'price', 'algorithm_type__tag', 'algorithm_type__status',
                        'algorithm_type__type')
            # Fetch the row once: indexing an unevaluated queryset runs a new
            # query for every ``[0]``.
            row = algorithm_qs.first()
            if not row:
                return {}

            setting = ''  # settings JSON of the algorithm for this device
            if uid:
                setting = cls.get_uid_algorithm_info(algorithm_id, uid)
                setting = setting if setting else {'status': 0, 'function': {}}

            return {
                'typeId': row['algorithm_type__id'],
                'iconUrl': row['algorithm_type__icon_url'],
                'imageUrl': row['algorithm_type__image_url'],
                'title': row['title'],
                'subtitle': row['subtitle'],
                'basicFunction': row['algorithm_type__basic_function'],
                'concerning': row['concerning'],
                'price': row['price'],
                'tag': row['algorithm_type__tag'],
                'status': row['algorithm_type__status'],
                'setting': setting,
                'type': row['algorithm_type__type']
            }
        except Exception as e:
            LOGGER.info('***get_lang_info_by_algorithm_id,errLine:{}, errMsg:{}'
                        .format(e.__traceback__.tb_lineno, repr(e)))
            return {}

    @classmethod
    def get_algorithm_list(cls, lang):
        """Return every algorithm for ``lang``, ordered by the type's ``sort``.

        Args:
            lang: Language of the explanation rows to read.

        Returns:
            A list of dicts; ``setting`` is always the empty string here.
        """
        algorithm_qs = DeviceAlgorithmExplain.objects.filter(lang=lang).order_by('algorithm_type__sort') \
            .annotate(iconUrl=F('algorithm_type__icon_url'),
                      typeId=F('algorithm_type__id'),
                      type=F('algorithm_type__type'),
                      imageUrl=F('algorithm_type__image_url'),
                      basicFunction=F('algorithm_type__basic_function'),
                      tag=F('algorithm_type__tag'),
                      status=F('algorithm_type__status'),
                      setting=Value('', output_field=CharField())) \
            .values('iconUrl', 'imageUrl', 'title', 'subtitle', 'concerning', 'basicFunction',
                    'price', 'tag', 'status', 'setting', 'typeId', 'type')
        return list(algorithm_qs)

    @classmethod
    def get_scenario_list(cls, request_dist, response):
        """Return the application scenarios plus icon data for all algorithms.

        Args:
            request_dist: Request parameters; reads ``lang`` (default ``'en'``).
            response: Response object used to build the reply.

        Returns:
            ``response.json(0, {...})`` with ``scenarioList``, ``scenarioUrl``
            and, when algorithms exist, ``iconList``. When there are no
            scenarios at all, an empty list is returned as the payload.
        """
        try:
            lang = request_dist.get('lang', 'en')
            if not lang:
                return response.json(444)

            # Scenarios of type 0 are excluded here; that row is read
            # separately below to fill ``scenarioUrl``.
            scenario_qs = DeviceAppScenario.objects.filter().exclude(type=0).all().order_by('sort') \
                .values('id', 'type', 'cver_url', 'banner_url')
            scenario_list = []
            if not scenario_qs.exists():
                return response.json(0, scenario_list)

            for item in scenario_qs:
                # Localized name/content; scenarios without a row for this
                # language are skipped.
                info = DeviceScenarioLangInfo.objects.filter(lang=lang, scenario_id=item['id']) \
                    .values('name', 'content').first()
                if not info:
                    continue
                scenario_list.append({
                    'id': item['id'],
                    'cverUrl': item['cver_url'],
                    'bannerUrl': item['banner_url'],
                    'name': info['name'],
                    'content': info['content'],
                })

            # Icon url and name of every algorithm.
            algorithm_qs = DeviceAlgorithmExplain.objects.filter(lang=lang).order_by('algorithm_type__sort') \
                .annotate(algorithmId=F('algorithm_type__id'),
                          algorithmType=F('algorithm_type__type'),
                          iconUrl=F('algorithm_type__icon_url'),
                          algorithmName=F('title')) \
                .values('algorithmId', 'algorithmType', 'iconUrl', 'algorithmName')

            banner_row = DeviceAppScenario.objects.filter(type=0) \
                .values('cver_url', 'banner_url').first()
            scenario_banner = {}
            if banner_row:
                scenario_banner['cverUrl'] = banner_row['cver_url']
                scenario_banner['bannerUrl'] = banner_row['banner_url']

            result_dto = {'scenarioList': scenario_list, 'scenarioUrl': scenario_banner}
            icon_list = list(algorithm_qs)
            if icon_list:
                result_dto['iconList'] = icon_list
            return response.json(0, result_dto)
        except Exception as e:
            LOGGER.info('接口异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def get_scenario_algorithm_list(cls, request_dist, response):
        """Return the algorithms of a scenario, or all algorithms.

        Args:
            request_dist: Request parameters; reads ``scenarioId`` and ``lang``
                (default ``'en'``).
            response: Response object used to build the reply.

        Returns:
            ``response.json(0, {...})`` with ``scenarioBannerUrl`` (always
            empty) and ``algorithmList``. Without ``scenarioId`` the full
            algorithm list is returned.
        """
        scenario_id = request_dist.get('scenarioId', None)
        lang = request_dist.get('lang', 'en')
        result_dto = {'scenarioBannerUrl': ''}
        if not scenario_id:
            result_dto['algorithmList'] = cls.get_algorithm_list(lang)
            return response.json(0, result_dto)
        result_dto['algorithmList'] = cls.get_algorithm_list_by_scenario_id(scenario_id, lang)
        return response.json(0, result_dto)

    @classmethod
    def get_algorithm_banner(cls, response):
        """Return the algorithm-shop banners ordered by ``sort``."""
        banner_vs = DeviceAlgorithmBanner.objects.all().order_by('sort') \
            .values('algorithm_type__type', 'algorithm_type__id', 'image_url')
        banner_list = [
            {
                'typeId': item['algorithm_type__id'],
                'type': item['algorithm_type__type'],
                'imageUrl': item['image_url'],
            }
            for item in banner_vs
        ]
        return response.json(0, banner_list)

    @classmethod
    def algorithm_list(cls, request_dict, response):
        """Return the algorithm-shop list, optionally with a device's settings.

        Args:
            request_dict: Request parameters; reads ``lang``, ``uid``,
                ``version`` (default ``'v1'``), ``deviceVer`` and ``deviceCode``.
            response: Response object used to build the reply.

        Returns:
            ``response.json(0, [...])`` on success, ``response.json(500, ...)``
            on any exception.
        """
        try:
            lang = request_dict.get('lang', 'en')
            uid = request_dict.get('uid', None)
            version = request_dict.get('version', 'v1')
            device_ver = request_dict.get('deviceVer', None)
            device_code = request_dict.get('deviceCode', None)
            LOGGER.info(f'AlgorithmShopView.algorithm_list ver:{device_ver},code:{device_code}')

            algorithm_qs = DeviceAlgorithmExplain.objects.filter(lang=lang)
            # Version v1 is restricted to this fixed set of algorithm types.
            types = [0, 1, 3, 4, 5]
            if version == 'v1':
                algorithm_qs = algorithm_qs.filter(algorithm_type__type__in=types)
            algorithm_qs = algorithm_qs.order_by('algorithm_type__sort') \
                .values('algorithm_type__id', 'algorithm_type__type',
                        'algorithm_type__icon_url',
                        'title', 'subtitle', 'algorithm_type__image_url',
                        'algorithm_type__basic_function', 'concerning', 'algorithm_type__resource')

            # Resources are looked up by the first four characters of the device code.
            device_code = device_code[0:4] if device_code else device_code

            algorithm_list = []
            for item in algorithm_qs:
                setting = ''
                if uid:
                    setting = cls.get_uid_algorithm_info(item['algorithm_type__id'], uid)
                    setting = setting if setting else {'status': 0, 'function': {}}
                resource = item['algorithm_type__resource']
                resource = resource.get(device_code, '') if resource else ''
                algorithm_list.append({
                    'typeId': item['algorithm_type__id'],
                    'type': item['algorithm_type__type'],
                    'iconUrl': item['algorithm_type__icon_url'],
                    'imageUrl': item['algorithm_type__image_url'],
                    'title': item['title'],
                    'subtitle': item['subtitle'],
                    'setting': setting,
                    'basicFunction': item['algorithm_type__basic_function'],
                    'concerning': item['concerning'],
                    'resource': resource
                })
            return response.json(0, algorithm_list)
        except Exception as e:
            LOGGER.info('查询算法小店列表异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def get_algorithm_details(cls, request_dict, response):
        """Return the detail page data of one algorithm type.

        Args:
            request_dict: Request parameters; reads ``lang``, ``typeId``
                (required) and ``uid``.
            response: Response object used to build the reply.

        Returns:
            ``response.json(0, {...})`` with the details (``{}`` when nothing
            matches), ``response.json(444, ...)`` when ``typeId`` is missing,
            ``response.json(177, ...)`` on any exception.
        """
        try:
            lang = request_dict.get('lang', 'en')
            type_id = request_dict.get('typeId', None)
            if not type_id:
                return response.json(444, 'typeId not null')
            type_id = int(type_id)
            uid = request_dict.get('uid', None)

            explain_qs = DeviceAlgorithmExplain.objects.filter(lang=lang).filter(algorithm_type__id=type_id) \
                .values('algorithm_type__id', 'algorithm_type__type',
                        'algorithm_type__down_count',
                        'algorithm_type__details_img_url',
                        'algorithm_type__icon_url',
                        'title', 'subtitle', 'introduction',
                        'install_explain', 'risk_warning',
                        'algorithm_type__basic_function', 'concerning')
            item = explain_qs.first()
            if not item:
                return response.json(0, {})

            algorithm_dict = {
                'typeId': item['algorithm_type__id'],
                'type': item['algorithm_type__type'],
                'downCount': item['algorithm_type__down_count'],
                'detailsImgUrl': item['algorithm_type__details_img_url'],
                'iconUrl': item['algorithm_type__icon_url'],
                'title': item['title'],
                'subtitle': item['subtitle'],
                'introduction': item['introduction'],
                'installExplain': item['install_explain'],
                'riskWarning': item['risk_warning'],
                'basicFunction': item['algorithm_type__basic_function'],
                'concerning': item['concerning']
            }

            # Device types recommended for this algorithm type.
            dt_info_qs = DeviceTypeAlgorithmInfo.objects.filter(algorithm_type=algorithm_dict['type']) \
                .annotate(deviceName=F('device_name'), deviceType=F('device_type'),
                          algorithmType=F('algorithm_type'),
                          typeIcon=F('type_icon'),
                          deviceLink=F('device_link'), ) \
                .values('deviceName', 'deviceType', 'typeIcon', 'deviceLink')
            algorithm_dict['recommendDevices'] = list(dt_info_qs)

            if uid:
                setting = cls.get_uid_algorithm_info(item['algorithm_type__id'], uid)
                algorithm_dict['setting'] = setting if setting else {'status': 0, 'function': {}}
            return response.json(0, algorithm_dict)
        except Exception as e:
            LOGGER.info('查询算法详情异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(177, repr(e))

    @staticmethod
    def get_uid_algorithm_info(type_id, uid):
        """Return a device's stored setting for one algorithm type.

        Args:
            type_id: Algorithm type id.
            uid: Device uid.

        Returns:
            A dict with ``status`` and ``function``, or ``None`` when the
            device has no row for this algorithm.
        """
        return DeviceUidAlgorithmType.objects.filter(algorithm_type_id=type_id, device_uid=uid) \
            .values('status', 'function').first()

    @classmethod
    def algorithm_setting_save(cls, request_dict, response):
        """Create or update a device's setting for one algorithm type.

        Args:
            request_dict: Request parameters; ``typeId``, ``uid``, ``status``
                and ``function`` are all required.
            response: Response object used to build the reply.

        Returns:
            ``response.json(0)`` on success, ``response.json(444)`` when a
            parameter is missing, ``response.json(177, ...)`` on any exception.
        """
        try:
            type_id = request_dict.get('typeId', None)
            uid = request_dict.get('uid', None)
            status = request_dict.get('status', None)
            setting_json = request_dict.get('function')
            if not all([type_id, uid, status, setting_json]):
                return response.json(444)

            status = int(status)
            type_id = int(type_id)
            now_time = int(time.time())

            uid_algorithm_qs = DeviceUidAlgorithmType.objects.filter(algorithm_type_id=type_id,
                                                                     device_uid=uid)
            if not uid_algorithm_qs.exists():
                param = {'algorithm_type_id': type_id, 'device_uid': uid, 'function': setting_json,
                         'status': status, 'updated_time': now_time, 'created_time': now_time}
                DeviceUidAlgorithmType.objects.create(**param)
                return response.json(0)

            uid_algorithm_qs.update(status=status, function=setting_json, updated_time=now_time)
            return response.json(0)
        except Exception as e:
            LOGGER.info('保存算法设置异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(177, repr(e))

    @classmethod
    def get_passenger_flow_list(cls, request_dict, response):
        """Return passenger-flow statistics for a device.

        Args:
            request_dict: Request parameters; ``uid``, ``nowTime``,
                ``flowType`` and ``queryType`` are all required.
                ``queryType`` 1 selects the per-day view, 2 the per-month
                view and 3 the per-year view.
            response: Response object used to build the reply.

        Returns:
            ``response.json(0, [...])`` with the statistics (an empty list for
            an unknown ``queryType``), ``response.json(444)`` when a parameter
            is missing, ``response.json(500, ...)`` on any exception.
        """
        try:
            uid = request_dict.get('uid')
            now_time = request_dict.get('nowTime')
            flow_type = request_dict.get('flowType')
            query_type = request_dict.get('queryType')
            # uid is required too: without it the query would match rows with
            # a NULL uid and the cache key would contain "None".
            if not all([uid, now_time, flow_type, query_type]):
                return response.json(444)

            query_type = int(query_type)
            flow_type = int(flow_type)
            passenger_flow_list = []

            # Results are cached per (uid, flow type, query type, timestamp).
            cache_key = f"ASJ:PASSENGER_FLOW:{uid}_{flow_type}_{query_type}_{now_time}"
            redis = RedisObject(5)
            passenger_flow_json = redis.get_data(cache_key)
            if passenger_flow_json:
                return response.json(0, json.loads(passenger_flow_json))

            if query_type == 1:
                passenger_flow_list = cls.get_passenger_flow_by_day(uid, int(now_time), flow_type)
            elif query_type == 2:
                passenger_flow_list = cls.get_passenger_flow_by_month(uid, int(now_time), flow_type)
            elif query_type == 3:
                passenger_flow_list = cls.get_passenger_flow_by_year(uid, int(now_time), flow_type)
            else:
                return response.json(0, passenger_flow_list)

            # Only non-empty results are cached, for 24 hours.
            if passenger_flow_list:
                redis.CONN.setnx(cache_key, json.dumps(passenger_flow_list))
                redis.CONN.expire(cache_key, 3600 * 24)
            return response.json(0, passenger_flow_list)
        except Exception as e:
            LOGGER.info('查询客流异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, repr(e))

    @staticmethod
    def get_passenger_flow_by_day(uid, now_time, flow_type):
        """Return hourly passenger-flow totals for the 24 hours before ``now_time``.

        Args:
            uid: Device uid.
            now_time: Reference time as a Unix timestamp in seconds.
            flow_type: Value of the record ``type`` field to select.

        Returns:
            A list of ``{'index': hour, 'count': total, 'time': bucket_start}``
            dicts, newest bucket first; buckets without records are omitted.
        """
        c_time = datetime.fromtimestamp(int(now_time))
        # 24 bucket starts in seconds, stepping back one hour at a time from
        # ``now_time``: t_list[0] is the newest, t_list[23] the oldest.
        t_list = [math.floor((c_time - timedelta(hours=i)).timestamp()) for i in range(24)]

        pf_qs = DeviceAlgorithmPassengerFlow.objects.filter(
            device_time__gte=t_list[23],
            device_time__lte=t_list[0] + 60,
            type=int(flow_type),
            uid=uid
        ).order_by('device_time')
        records = list(pf_qs)

        passenger_flow_list = []
        if not records:
            return passenger_flow_list

        for i in range(24):
            current_time = t_list[i]
            # The newest bucket has no upper bound (the query already caps it
            # at now + 60s); older buckets end where the next newer one begins.
            next_time = None if i == 0 else t_list[i - 1]
            count = 0
            hour = 0
            is_data = False
            for item in records:
                if item.device_time < current_time:
                    continue
                if next_time is not None and item.device_time >= next_time:
                    continue
                count += item.count
                hour = item.statistical_time.hour
                is_data = True
            if not is_data:
                continue
            passenger_flow_list.append({
                'index': hour,
                'count': count,
                'time': current_time
            })
        return passenger_flow_list

    @staticmethod
    def get_passenger_flow_by_month(uid, now_time, flow_type):
        """Return daily passenger-flow totals for the 30 days before ``now_time``.

        Args:
            uid: Device uid.
            now_time: Reference time as a Unix timestamp in seconds.
            flow_type: Value of the record ``type`` field to select.

        Returns:
            A list of ``{'index': day_of_month, 'count': total, 'time': day_start}``
            dicts, newest day first; days without records are omitted.
        """
        # Query window: from the start of the day 30 days ago up to now_time.
        s_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 30)
        s_time = LocalDateTimeUtil.start_of_day_timestamp(s_time)
        pf_qs = DeviceAlgorithmPassengerFlow.objects.filter(
            device_time__range=(s_time, now_time),
            type=int(flow_type),
            uid=uid
        ).order_by('device_time')
        records = list(pf_qs)

        passenger_flow_list = []
        if not records:
            return passenger_flow_list

        for i in range(30):  # a "month" is a 30-day period
            if i == 0:
                # Today: from the start of the day up to now_time.
                s_time = LocalDateTimeUtil.start_of_day_timestamp(now_time)
                end_time = now_time
            else:
                # i days ago: one full day from the start of that day.
                s_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, i)
                s_time = LocalDateTimeUtil.start_of_day_timestamp(s_time)
                end_time = s_time + (3600 * 24)

            days_qs = []
            day = 0
            for item in records:
                # Buckets are (start, end]: the start instant itself is excluded.
                if s_time < item.device_time <= end_time:
                    days_qs.append(item.count)
                    # The first matching record supplies the day of month.
                    day = item.statistical_time.day if day == 0 else day
            if days_qs:
                passenger_flow_list.append({
                    'index': day,
                    'count': sum(days_qs),
                    'time': s_time
                })
        return passenger_flow_list

    @staticmethod
    def get_passenger_flow_by_year(uid, now_time, flow_type):
        """Return monthly passenger-flow totals for the 12 months up to ``now_time``.

        Args:
            uid: Device uid.
            now_time: Reference time as a Unix timestamp in seconds.
            flow_type: Value of the record ``type`` field to select.

        Returns:
            A list of ``{'index': month, 'count': total, 'time': month_start}``
            dicts, newest month first; months without records are omitted.
        """
        passenger_flow_list = []
        month_time = LocalDateTimeUtil.get_current_month_first_day(now_time)
        for i in range(12):
            df_qs = DeviceAlgorithmPassengerFlow.objects.filter(type=flow_type, uid=uid)
            if i == 0:
                # Current month: from its first day up to now_time (inclusive).
                df_qs = df_qs.filter(device_time__range=(month_time, now_time))
            else:
                previous_month_time = LocalDateTimeUtil.get_previous_month_first_day(month_time)
                # Half-open [previous, current): a record exactly on the month
                # boundary belongs to the later month only, so it is not
                # counted twice.
                df_qs = df_qs.filter(device_time__gte=previous_month_time,
                                     device_time__lt=month_time)
                month_time = previous_month_time

            total = df_qs.aggregate(total=Sum('count'))['total']
            if not total:
                continue
            # A record of the month supplies the month number for ``index``.
            data_qs = df_qs.first()
            passenger_flow_list.append({
                'index': data_qs.statistical_time.month,
                'count': total,
                'time': month_time
            })
        return passenger_flow_list