# -*- encoding: utf-8 -*- """ @File : EquipmentInfoService.py @Time : 2022/4/14 17:28 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import datetime import itertools import logging import time from django.db.models import Value, CharField, Q from Model.models import EquipmentInfoMonday, EquipmentInfoTuesday, EquipmentInfoWednesday, EquipmentInfoThursday, \ EquipmentInfoFriday, EquipmentInfoSaturday, EquipmentInfoSunday from Object.utils import LocalDateTimeUtil """ 设备分表查询Service 因数据量不断增加,单表保留近七天数据进行分表优化设计 进行拆分为七张表星期一至星期天进行分表存储。分担单表存储读写压力。 """ class EquipmentInfoService: @staticmethod def get_equipment_info_model(dt, val=0): """ 根据日期判断是星期几,返回相应的Model对象 @param val: 1-7代表week @param dt: 日期 例:2022-03-03 @return: 星期一至星期天equipment_info对象实例 """ week = 1 if dt: week = LocalDateTimeUtil.date_to_week(dt) if 0 < val < 8: week = val equipment_info = None if week == 1: equipment_info = EquipmentInfoMonday.objects.all().annotate(tab_val=Value('1', output_field=CharField())) elif week == 2: equipment_info = EquipmentInfoTuesday.objects.all().annotate(tab_val=Value('2', output_field=CharField())) elif week == 3: equipment_info = EquipmentInfoWednesday.objects.all().annotate(tab_val=Value('3', output_field=CharField())) elif week == 4: equipment_info = EquipmentInfoThursday.objects.all().annotate(tab_val=Value('4', output_field=CharField())) elif week == 5: equipment_info = EquipmentInfoFriday.objects.all().annotate(tab_val=Value('5', output_field=CharField())) elif week == 6: equipment_info = EquipmentInfoSaturday.objects.all().annotate(tab_val=Value('6', output_field=CharField())) elif week == 7: equipment_info = EquipmentInfoSunday.objects.all().annotate(tab_val=Value('7', output_field=CharField())) return equipment_info @classmethod def find_by_start_time_equipment_info(cls, page, size, user_id, start_time, end_time, event_type, uid_list): """ 通过start_time查找指定日期当天设备消息推送 @param page: 页数 @param size: 每页条数 @param user_id: 设备用户id @param start_time: 开始时间 @param end_time: 结束时间 @param event_type: 事件类型 @param uid_list: 设备uid列表 @return: result 查询结果 """ if start_time and end_time: start_date = datetime.datetime.fromtimestamp(int(start_time)) # 根据开始日期,获取设备信息查询对象 qs = EquipmentInfoService.get_equipment_info_model(str(start_date.date()), 0) # 调用查询方法 qs = cls.query_equipment_info(qs, user_id, start_time, end_time, event_type, uid_list) # 时区问题 week = LocalDateTimeUtil.date_to_week(str(start_date.date())) if week > 0: # 根据筛选日期 查找昨天数据 yesterday = 7 if week == 1 else week - 1 yesterday_info = EquipmentInfoService.get_equipment_info_model('', yesterday) yesterday_info = cls.query_equipment_info(yesterday_info, user_id, start_time, end_time, event_type, uid_list) # 根据筛选日期 查找明天数据 tomorrow = 1 if week == 7 else week + 1 tomorrow_info = EquipmentInfoService.get_equipment_info_model('', tomorrow) tomorrow_info = cls.query_equipment_info(tomorrow_info, user_id, start_time, end_time, event_type, uid_list) qs = qs.union(yesterday_info, tomorrow_info, all=True) if qs.exists(): count = qs.count() qs_page = cls.get_equipment_info_page(qs, page, size) return qs_page, count return None, 0 @classmethod def get_equipment_info_week_all(cls, page, size, user_id, start_time, end_time, event_type, uid_list): """ 分表查询近七天设备消息推送 @param page: 页数 @param size: 分页大小 @param user_id: 设备用户id @param start_time: 事件开始时间 @param end_time: 事件结束时间 @param event_type: 事件类型 @param uid_list: uid列表 @return: qs_page, count 结果集 """ # 星期一设备信息查询 monday_qs = EquipmentInfoService.get_equipment_info_model('', 1) monday_qs = cls.query_equipment_info(monday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期二设备信息查询 tuesday_qs = EquipmentInfoService.get_equipment_info_model('', 2) tuesday_qs = cls.query_equipment_info(tuesday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期三设备信息查询 wednesday_qs = EquipmentInfoService.get_equipment_info_model('', 3) wednesday_qs = cls.query_equipment_info(wednesday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期四设备信息查询 thursday_qs = EquipmentInfoService.get_equipment_info_model('', 4) thursday_qs = cls.query_equipment_info(thursday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期五设备信息查询 friday_qs = EquipmentInfoService.get_equipment_info_model('', 5) friday_qs = cls.query_equipment_info(friday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期六设备信息查询 saturday_qs = EquipmentInfoService.get_equipment_info_model('', 6) saturday_qs = cls.query_equipment_info(saturday_qs, user_id, start_time, end_time, event_type, uid_list) # 星期天设备信息查询 sunday_qs = EquipmentInfoService.get_equipment_info_model('', 7) sunday_qs = cls.query_equipment_info(sunday_qs, user_id, start_time, end_time, event_type, uid_list) result = monday_qs.union(tuesday_qs, wednesday_qs, thursday_qs, friday_qs, saturday_qs, sunday_qs, all=True) count = result.count() qs_page = cls.get_equipment_info_page(result, page, size) return qs_page, count @classmethod def query_equipment_info(cls, qs, user_id, start_time, end_time, event_type, uid_list): """ 设备信息条件查询,根据分表设计,默认条件event_time大于七天前时间 @param qs: 设备信息查询对象 @param user_id: 设备用户id @param start_time: 开始时间 @param end_time: 结束时间 @param event_type: 事件类型 @param uid_list: 设备uid列表 @return: result 设备信息结果集 """ now_time = int(time.time()) # 获取七天前时间戳 seven_days_before_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7) # 默认查询当前表event_time大于七天前时间 qs = qs.filter(event_time__gt=seven_days_before_time) if user_id: qs = qs.filter(device_user_id=user_id) if event_type: # 多类型查询 eventTypeList = cls.get_comb_event_type(event_type) # eventTypeList += cls.get_combo_type_bins(event_type) eventTypeList = list(set(eventTypeList)) tags = cls.get_event_tag(event_type) if eventTypeList: qs = qs.filter(Q(event_type__in=eventTypeList, event_tag='') | Q(event_tag__regex=tags)) elif tags: qs = qs.filter(event_tag__regex=tags) if start_time and end_time: qs = qs.filter(event_time__range=(start_time, end_time)) else: qs = qs.filter(event_time__range=(start_time, now_time)) if uid_list: uid_list = uid_list.split(',') qs = qs.filter(device_uid__in=uid_list) return qs @classmethod def get_equipment_info_page(cls, equipment_info_qs, page, size): """ 获取查询结果集进行排序、分页,遍历重命名字典key(主要针对原函数返回结果集) @param equipment_info_qs: 设备信息结果集 @param page: 页数 @param size: 分页大小 @return: qs_page 遍历后的设备信息结果集 """ equipment_info_qs = equipment_info_qs.values('id', 'device_uid', 'device_nick_name', 'channel', 'event_type', 'status', 'answer_status', 'alarm', 'event_time', 'receive_time', 'is_st', 'add_time', 'storage_location', 'border_coords', 'tab_val', 'event_tag') equipment_info_qs = equipment_info_qs.order_by('-event_time') qs_page = equipment_info_qs[(page - 1) * size:page * size] if not qs_page or not qs_page.exists() or qs_page.count == 0: return qs_page for item in qs_page: # 星期表值 tab_val = item['tab_val'] # id = 星期表值+id item['id'] = int(tab_val + str(item['id'])) item['devUid'] = item['device_uid'] item['devNickName'] = item['device_nick_name'] item['Channel'] = item['channel'] item['eventType'] = item['event_type'] item['eventTime'] = item['event_time'] item['receiveTime'] = item['receive_time'] item['addTime'] = item['add_time'] item['borderCoords'] = item['border_coords'] item['eventTag'] = item['event_tag'] item.pop('device_uid') item.pop('device_nick_name') item.pop('channel') item.pop('event_type') item.pop('event_time') item.pop('receive_time') item.pop('add_time') item.pop('border_coords') item.pop('tab_val') item.pop('event_tag') return qs_page @classmethod def get_comb_event_type(cls, event_type): """ 重新组合ai消息类型查询,使其支持ai多标签查询 @param event_type: 消息类型 @return: event_type_list 消息类型数组 """ if ',' in event_type: event_type_list = event_type.split(',') event_type_list = [int(i.strip()) for i in event_type_list] else: event_type_list = [int(event_type)] ai_event_type_list = [] normal_event_type_list = [] a_type = [60, 61, 62, 63, 64, 65, 66] for val in event_type_list: if val <= 4: # 分离出ai类型,以便后续组合ai标签,目前只存在4个ai类型1,2,3,4 ai_event_type_list.append(val) else: if val not in a_type: normal_event_type_list.append(val) if len(ai_event_type_list) < 1: return normal_event_type_list ai_event_type_list.sort() ai_type = [1, 2, 3, 4] # AI目前所有的标签,1人,2车,3宠物,4包裹,后续有新类型需要这里加, 后续会优化,存在表里,包裹存对应的aws标签 comb_ai_event_type = [] seen = set() for i in range(1, len(ai_type) + 1): # 计算所有组合,如[1, 2, 3, 4], 4取1,4取2,4取3,4取4 for s in itertools.combinations(ai_type, i): if s not in seen: # 去除重复项, 如a=[1,2,3,4,4],会有两个[1,2,3,4,4],[1,2,3,4,4]的组合 seen.add(s) s_list = list(s) for ai_event_type in ai_event_type_list: if ai_event_type in s_list: # 排除没有选择的标签组合 if s_list not in comb_ai_event_type: s_list = [str(v) for v in s_list] comb_ai_event_type.append(s_list) regroup_list = [] for val in comb_ai_event_type: # 组合ai类型组合,如[[2,3],[1,3]] -> [23, 13] val = ''.join(val) regroup_list.append(int(val)) group_list = regroup_list + normal_event_type_list # 加上普通移动消息类型 return group_list @classmethod def get_all_comb_event_type(cls): """ 计算ai消息类型全组合 @return: event_type_list ai所有消息类型数组 """ ai_type = [1, 2, 3, 4] # AI目前所有的标签,1人,2车,3宠物,4包裹,后续有新类型需要这里加, 后续会优化,存在表里,包裹存对应的aws标签 comb_ai_event_type = [] for i in range(1, len(ai_type) + 1): # 计算所有组合,如[1, 2, 3, 4], 4取1,4取2,4取3,4取4 for s in itertools.combinations(ai_type, i): s_list = list(s) s_list = [str(v) for v in s_list] comb_ai_event_type.append(s_list) regroup_list = [] for val in comb_ai_event_type: # 组合ai类型组合,如[[2,3],[1,3]] -> [23, 13] val = ''.join(val) regroup_list.append(int(val)) return regroup_list @staticmethod def get_equipment_info_obj(dt, **kwargs): """ 根据日期判断是星期几,返回相应的对象实例 @param dt: 日期 例:2022-03-03 @param kwargs: 设备信息属性值 @return: 星期一至星期天equipment_info对象实例 """ logger = logging.getLogger('info') week = LocalDateTimeUtil.date_to_week(dt) logger.info('本周{}'.format(str(week))) equipment_info = None if week == 1: equipment_info = EquipmentInfoMonday(**kwargs) elif week == 2: equipment_info = EquipmentInfoTuesday(**kwargs) elif week == 3: equipment_info = EquipmentInfoWednesday(**kwargs) elif week == 4: equipment_info = EquipmentInfoThursday(**kwargs) elif week == 5: equipment_info = EquipmentInfoFriday(**kwargs) elif week == 6: equipment_info = EquipmentInfoSaturday(**kwargs) elif week == 7: equipment_info = EquipmentInfoSunday(**kwargs) logger.info(type(equipment_info)) logger.info(equipment_info) return equipment_info @classmethod def get_combo_types(cls, event_type, event_tag): """ 获取设备算法组合类型 51:移动侦测,52:传感器报警,53:影像遗失,54:PIR,55:门磁报警,56:外部发报,57:人型报警(提示:有人出现),58:车型,59:宠物,60:人脸,61:异响, 62:区域闯入,63:区域闯出,64:长时间无人检测,65:长时间无人检测 0:代表空字符,702:摄像头休眠,703:摄像头唤醒,704:电量过低 AWS AI识别 1:人形,2:车型,3:宠物,4:包裹。云端AI类型 @param event_tag: @param event_type: @return: """ try: types = [] if event_tag: res = event_tag.split(',') types = [int(var) for var in res if var] return types res_type = cls.is_type_exist(event_type) if res_type == 0: return types combo_types = [51, 57, 58, 60, 59, 61, 62, 63, 64, 65] event_type = str(event_type) len_type = len(event_type) for i in range(0, len_type): e_type = int(event_type[len_type - 1 - i]) if e_type == 1: types.append(combo_types[i]) return types except Exception as e: print('推送错误异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return event_type @staticmethod def is_combo_tag(event_type, event_tag): """ 是否为多标签 """ if not event_tag: return False res = event_tag.split(',') types = [int(var) for var in res if var] if event_type in types and len(types) == 1: return False return True @classmethod def is_type_exist(cls, event_type): """ 判断类型是否存在列表 @param event_type: @return: 0 or event_type """ combo_types = cls.combo_type_all() if not combo_types: return 0 if event_type in combo_types: return event_type return 0 @staticmethod def combo_type_all(): """ 获取所有组合类型 @return: """ arr_list = [] event_arr = [] resource_list = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] for i in range(2, len(resource_list) + 1): arr_list += list(itertools.combinations(resource_list, i)) # 表示从 [1,2,3,4] 中选出 3个元素的组合情况 for i in arr_list: val = 0 for item in i: val += item event_arr.append(int(EquipmentInfoService.dec_to_bin(val))) return event_arr @staticmethod def dec_to_bin(num): """ 十进制转二进制 @param num: @return: """ result = "" while num != 0: ret = num % 2 num //= 2 result = str(ret) + result return result @staticmethod def get_combo_type_bins(event_type): """ 获取组合类型二进制列表 @param event_type: 标签类型 @return: """ res_list = [] try: if ',' in event_type: res_list = event_type.split(',') res_list = [int(i.strip()) for i in res_list] else: res_list = [int(event_type)] combo_types = [51, 57, 58, 60, 59, 61] for e_item in res_list: bins = EquipmentInfoService.combo_type_all() if e_item in combo_types: event_label = combo_types.index(e_item) + 1 for item in bins: val = str(item) val_len = len(val) if val_len >= event_label and int(val[val_len - event_label]) == 1: res_list.append(int(item)) return res_list except Exception as e: print('推送错误异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return res_list @staticmethod def get_event_tag(event_type): """ 获取标签用于筛选推送消息 """ if ',' in event_type: tags = '' res_list = event_type.split(',') tag_size = len(res_list) for i, item in enumerate(res_list): tags += ',' + str(item) + ',' if i < (tag_size - 1): tags += '|' return tags else: return ',' + str(event_type) + ',' @staticmethod def update_equipment_answer_status(user_id, uid, event_time): """ 更新一键通话消息状态 @param user_id: 用户id @param uid: uid @param event_time: 事件时间 @return updated_flag: bool """ updated_flag = False updated = EquipmentInfoMonday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoTuesday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoWednesday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoThursday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoFriday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoSaturday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True updated = EquipmentInfoSunday.objects.filter(device_user_id=user_id, device_uid=uid, event_time=event_time, event_type=606).update(answer_status=1) if updated: updated_flag = True return updated_flag