# -*- coding: utf-8 -*- """ @Author : Rocky @Time : 2022/6/29 9:31 @File :SmartSceneController.py """ import json import time from django.core.exceptions import ObjectDoesNotExist from django.db import transaction from django.db.models import F, Q, Count from django.views import View from Ansjer.Config.gatewaySensorConfig import SMART_SCENE_TOPIC, SENSOR_TYPE, EVENT_TYPE, SCENE_EVENT_CREATE, \ SCENE_EVENT_EDIT, SCENE_EVENT_DELETE, SCENE_STATUS_ON, SCENE_STATUS_OFF, SCENE_EVENT_EDIT_STATUS, \ VOICE_AUDITION_TOPIC, SMART_SOCKET_TOPIC, DEVICE_TYPE from Model.models import FamilyRoomDevice, GatewaySubDevice, FamilyRoom, SmartScene, EffectiveTime, Device_Info, \ SceneLog, GatewayPush from Object.ApschedulerObject import ApschedulerObject from Object.ResponseObject import ResponseObject from Service.CommonService import CommonService class SmartSceneView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): if operation == 'get-scene': # 设备获取智能场景数据 return self.get_scene_data(request_dict, ResponseObject('cn')) token_code, user_id, response = CommonService.verify_token_get_user_id(request_dict, request) if token_code != 0: return response.json(token_code) if operation == 'condition-devices': # 添加条件-查询设备 return self.condition_devices(request_dict, response) elif operation == 'task-devices': # 添加任务-查询设备 return self.task_devices(request_dict, user_id, response) elif operation == 'create': # 创建智能场景 return self.create_smart_scene(request_dict, user_id, response) elif operation == 'scene-list': # 查询智能场景列表 return self.scene_list(request_dict, user_id, response) elif operation == 'smart-button-scene-list': # 查询智能按钮场景列表 return self.smart_button_scene_list(request_dict, user_id, response) elif operation == 'update-status': # 更新智能场景状态 return self.update_status(request_dict, response) elif operation == 'detail': # 查询智能场景详情 return self.scene_detail(request_dict, response) elif operation == 'edit': # 编辑智能场景 return self.edit_smart_scene(request_dict, user_id, response) elif operation == 'delete': # 删除智能场景 return self.delete_smart_scene(request_dict, response) elif operation == 'log': # 查询智能场景日志 return self.scene_log(request_dict, response) elif operation == 'log-date': # 查询智能场景日志日期 return self.scene_log_date(request_dict, response) elif operation == 'voice-audition': # 智能场景音频试听 return self.voice_audition(request_dict, response) else: return response.json(414) @classmethod def condition_devices(cls, request_dict, response): """ 添加条件-查询设备 @param request_dict: 请求参数 @request_dict deviceId: 网关设备id @request_dict subDeviceId: 子设备id @param response: 响应对象 @return: response """ device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) if not any([device_id, sub_device_id]): return response.json(444, {'error param': 'deviceId or subDeviceId'}) try: if sub_device_id: device_id = GatewaySubDevice.objects.get(id=sub_device_id).device_id gateway_sub_device_qs = GatewaySubDevice.objects.filter(device_id=device_id) if not gateway_sub_device_qs.exists(): return response.json(173) res = cls.get_sub_device_room_name(gateway_sub_device_qs) return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def task_devices(cls, request_dict, user_id, response): """ 添加任务-查询设备 @param request_dict: 请求参数 @request_dict deviceId: 网关设备id @param user_id: 用户id @param response: 响应对象 @return: response """ sub_device_id = request_dict.get('subDeviceId', None) device_id = request_dict.get('deviceId', None) if not any([device_id, sub_device_id]): return response.json(444, {'error param': 'deviceId or subDeviceId'}) try: if device_id: res = [cls.get_gateway_data(device_id)] else: sub_device_qs = GatewaySubDevice.objects.filter(id=sub_device_id).values('device_id', 'device_type') device_id = sub_device_qs[0]['device_id'] device_type = sub_device_qs[0]['device_type'] if device_type != SENSOR_TYPE['smart_button']: # 非智能按钮只返回网关 res = [cls.get_gateway_data(device_id)] else: # 智能按钮返回网关,门磁和人体传感器(如果存在) gateway_data = cls.get_gateway_data(device_id) sub_device_qs = GatewaySubDevice.objects.filter( Q(Q(device_id=device_id) & Q(device_type=SENSOR_TYPE['door_magnet'])) | Q(Q(device_id=device_id) & Q(device_type=SENSOR_TYPE['body_sensor'])) ).values('id', 'nickname', 'status', 'device_type') if sub_device_qs.exists(): res = cls.get_sub_device_room_name(sub_device_qs, gateway_data) else: res = [gateway_data] # 添加插座和开关数据 res = cls.append_plug_and_switch_data(res, user_id) return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_sub_device_room_name(sub_device_qs, gateway_data=None): """ 获取房间名称 @param sub_device_qs: 子设备信息 @param gateway_data: 网关参数 @return: sub_device_list """ sub_device_list = [] if gateway_data: sub_device_list.append(gateway_data) sub_device_qs = sub_device_qs.annotate(gatewaySubId=F('id'), deviceType=F('device_type'), deviceNickName=F('nickname')). \ values('gatewaySubId', 'deviceType', 'deviceNickName', 'status') for sub_device in sub_device_qs: family_room_device_qs = FamilyRoomDevice.objects.filter(sub_device=sub_device['gatewaySubId']). \ values('room_id') if not family_room_device_qs.exists(): sub_device['roomName'] = '' else: room_id = family_room_device_qs[0]['room_id'] try: sub_device['roomName'] = FamilyRoom.objects.get(id=room_id).name except ObjectDoesNotExist: sub_device['roomName'] = '' sub_device_list.append(sub_device) return sub_device_list @staticmethod def get_gateway_data(device_id): """ 获取网关数据 @param device_id: 网关设备id @return: res """ device_info_qs = Device_Info.objects.filter(id=device_id).values('NickName', 'Type', 'serial_number') nickname = device_info_qs[0]['NickName'] device_type = device_info_qs[0]['Type'] serial_number = device_info_qs[0]['serial_number'] room_id = FamilyRoomDevice.objects.filter(device_id=device_id).values('room_id')[0]['room_id'] room_id_qs = FamilyRoom.objects.filter(id=room_id).values('name') room_name = room_id_qs.first()['name'] if room_id_qs.exists() else '' res = { 'serialNumber': serial_number, 'deviceNickName': nickname, 'deviceType': device_type, 'roomName': room_name, 'status': 1, } return res @staticmethod def append_plug_and_switch_data(res, user_id): """ 添加插座和开关数据 @param res: 网关等设备数据 @param user_id: 用户id @return: res """ device_info_qs = Device_Info.objects.filter(userID_id=user_id, Type__in=[DEVICE_TYPE['socket']]).\ values('id', 'NickName', 'Type', 'serial_number') if device_info_qs.exists(): for device_info in device_info_qs: nickname = device_info['NickName'] device_type = device_info['Type'] serial_number = device_info['serial_number'] device_id = device_info['id'] room_id = FamilyRoomDevice.objects.filter(device_id=device_id).values('room_id')[0]['room_id'] room_id_qs = FamilyRoom.objects.filter(id=room_id).values('name') room_name = room_id_qs.first()['name'] if room_id_qs.exists() else '' data = { 'serialNumber': serial_number, 'deviceNickName': nickname, 'deviceType': device_type, 'roomName': room_name, 'status': 1, } res.append(data) return res @classmethod def create_smart_scene(cls, request_dict, user_id, response): """ 创建智能场景 @param request_dict: 请求参数 @param user_id: 用户id @request_dict deviceId: 网关设备id @request_dict subDeviceId: 子设备id @request_dict sceneName: 场景名称 @request_dict conditions: 条件 @request_dict tasks: 任务 @request_dict isAllDay: 是否全天执行 @request_dict startTime: 开始时间 @request_dict endTime: 结束时间 @request_dict repeat: 重复周期 @param response: 响应对象 @return: response """ device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) scene_name = request_dict.get('sceneName', None) conditions = request_dict.get('conditions', None) tasks = request_dict.get('tasks', None) is_all_day = request_dict.get('isAllDay', None) if not all([scene_name, conditions, tasks]): return response.json(444, {'error param': 'scene_name and conditions and tasks'}) now_time = int(time.time()) conditions_dict = eval(conditions) tasks_list = eval(tasks) try: # 判断是否已存在该场景名 smart_scene_qs = SmartScene.objects.filter(user_id=user_id, scene_name=scene_name) if smart_scene_qs.exists(): return response.json(179) tz = CommonService.get_user_tz(user_id) smart_scene_dict = { 'user_id': user_id, 'scene_name': scene_name, 'conditions': conditions, 'tasks': tasks, 'tz': tz, 'created_time': now_time, 'updated_time': now_time, } msg = { 'scene_event': SCENE_EVENT_CREATE, 'scene_status': SCENE_STATUS_ON } # 处理设置时间 if is_all_day is not None: is_all_day = int(is_all_day) smart_scene_dict['is_all_day'] = is_all_day # 处理传网关设备id和子设备id的情况 if conditions_dict['type'] == 1: # 网关设置时间 if not device_id: return response.json(444, {'error param': 'deviceId'}) smart_scene_dict['device_id'] = device_id device_info_qs = Device_Info.objects.filter(id=device_id).values('serial_number') if not device_info_qs.exists(): return response.json(173) serial_number = device_info_qs[0]['serial_number'] # 网关数据 msg['sensor_type'] = DEVICE_TYPE['gateway'] msg['sensor_status'] = 2002 msg['sensor_ieee_addr'] = 'FFFFFFFFFFFFFFFF' else: # 子设备设置场景 if not sub_device_id: return response.json(444, {'error param': 'subDeviceId'}) # 查询数据 sub_device_qs = GatewaySubDevice.objects.filter(id=sub_device_id).\ values('device__serial_number', 'ieee_addr', 'is_tampered') if not sub_device_qs.exists(): return response.json(173) if cls.time_conflict(sub_device_id, conditions, is_all_day, request_dict): return response.json(182) device_type = int(conditions_dict['sensor']['device_type']) # 智能按钮不能创建触发条件相同的场景 if device_type == SENSOR_TYPE['smart_button']: event_type = conditions_dict['sensor']['eventValues'][0]['event_type'] smart_scene_qs = SmartScene.objects.filter(sub_device_id=sub_device_id, conditions__contains=event_type) if smart_scene_qs.exists(): return response.json(180) # 紧急按钮打开时,创建的场景状态默认为关闭 if sub_device_qs[0]['is_tampered'] == 1: smart_scene_dict['is_enable'] = False # 温湿度传感器返回温湿度 elif device_type == SENSOR_TYPE['tem_hum_sensor']: event_values = conditions_dict['sensor']['eventValues'][0] if '≥' in event_values['value']: replace_str = '≥ ' msg['sensor_symbol'] = 2 else: replace_str = '≤ ' msg['sensor_symbol'] = 1 value = event_values['value'].replace(replace_str, '') msg['sensor_data'] = float(value) smart_scene_dict['sub_device_id'] = sub_device_id serial_number = sub_device_qs[0]['device__serial_number'] msg['sensor_type'] = int(conditions_dict['sensor']['device_type']) msg['sensor_ieee_addr'] = sub_device_qs[0]['ieee_addr'] msg['sensor_status'] = int(conditions_dict['sensor']['eventValues'][0]['event_type']) with transaction.atomic(): if is_all_day is None: # 不设置时间 smart_scene_qs = SmartScene.objects.create(**smart_scene_dict) # 设备的time数据,分钟转为秒 time_dict = { 'start_time': conditions_dict['time']['minutes'] * 60, 'repeat': conditions_dict['time']['repeat'] } elif is_all_day == 1: # 全天 smart_scene_qs = SmartScene.objects.create(**smart_scene_dict) # 设备的time数据 time_dict = { 'is_all_day': is_all_day } elif is_all_day == 2: # 非全天 start_time = int(request_dict.get('startTime', None)) end_time = int(request_dict.get('endTime', None)) repeat = int(request_dict.get('repeat', None)) effective_time_qs = EffectiveTime.objects.filter(start_time=start_time, end_time=end_time, repeat=repeat).values('id') if effective_time_qs.exists(): effective_time_id = effective_time_qs[0]['id'] else: effective_time_id = EffectiveTime.objects.create(start_time=start_time, end_time=end_time, repeat=repeat).id smart_scene_dict['effective_time_id'] = effective_time_id smart_scene_qs = SmartScene.objects.create(**smart_scene_dict) # 设备的time数据,分钟转为秒 time_dict = { 'is_all_day': is_all_day, 'start_time': start_time * 60, 'end_time': end_time * 60, 'repeat': repeat } else: return response.json(444, {'error param': 'invalid isAllDay'}) msg['time'] = time_dict msg['scene_id'] = smart_scene_qs.id # 获取设备任务数据 msg['task'], mqtt_tasks = cls.get_msg_task_list(tasks_list, conditions_dict, tz, now_time) smart_scene_qs.device_data = json.dumps(msg) if mqtt_tasks: smart_scene_qs.mqtt_tasks = json.dumps(mqtt_tasks) smart_scene_qs.save() # 发布MQTT消息通知网关设备 thing_name = serial_number topic_name = SMART_SCENE_TOPIC.format(serial_number) success = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg) try: assert success except AssertionError: return response.json(10044) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def scene_list(request_dict, user_id, response): """ 查询智能场景列表 @param request_dict: 请求参数 @param user_id: 用户id @request_dict deviceId: 网关设备id @request_dict subDeviceId: 子设备id @param response: 响应对象 @return: response """ device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) if not any([device_id, sub_device_id]): return response.json(444, {'error param': 'deviceId or subDeviceId'}) try: if device_id: sub_device_id = GatewaySubDevice.objects.filter(device_id=device_id).values('id') smart_scene_qs = SmartScene.objects.filter( Q(user_id=user_id) & Q(device_id=device_id) | Q(sub_device_id__in=sub_device_id)) else: smart_scene_qs = SmartScene.objects.filter(user_id=user_id, sub_device_id=sub_device_id) if not smart_scene_qs.exists(): return response.json(173) smart_scene_qs = smart_scene_qs.values('id', 'scene_name', 'is_enable', 'device_id', 'sub_device_id') smart_scene_list = [] for item in smart_scene_qs: smart_scene_dict = { 'id': item['id'], 'scene_name': item['scene_name'], 'is_enable': item['is_enable'] } if item['device_id']: device_qs = Device_Info.objects.filter(id=item['device_id'], userID=user_id).values('NickName', 'Type') smart_scene_dict['device_type'] = device_qs[0]['Type'] if device_qs.exists() else '' smart_scene_dict['device_nickname'] = device_qs[0]['NickName'] if device_qs.exists() else '' else: device_qs = GatewaySubDevice.objects.filter(id=item['sub_device_id']).values('device_type', 'nickname') smart_scene_dict['device_type'] = device_qs[0]['device_type'] if device_qs.exists() else '' smart_scene_dict['device_nickname'] = device_qs[0]['nickname'] if device_qs.exists() else '' smart_scene_list.append(smart_scene_dict) return response.json(0, smart_scene_list) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def smart_button_scene_list(request_dict, user_id, response): """ 查询智能按钮场景列表 @param request_dict: 请求参数 @param user_id: 用户id @request_dict subDeviceId: 子设备id @param response: 响应对象 @return: response """ sub_device_id = request_dict.get('subDeviceId', None) if not sub_device_id: return response.json(444, {'error param': 'subDeviceId'}) try: click_scene_qs = SmartScene.objects.filter(user_id=user_id, sub_device_id=sub_device_id, conditions__contains=str( EVENT_TYPE['smart_button_click'])).values('id', 'scene_name', 'is_enable') double_click_scene_qs = SmartScene.objects.filter(user_id=user_id, sub_device_id=sub_device_id, conditions__contains=str( EVENT_TYPE['smart_button_double_click'])). \ values('id', 'scene_name', 'is_enable') three_click_scene_qs = SmartScene.objects.filter(user_id=user_id, sub_device_id=sub_device_id, conditions__contains=str( EVENT_TYPE['smart_button_three_click'])). \ values('id', 'scene_name', 'is_enable') scene_list = [] if click_scene_qs.exists(): scene_list.append({ 'trigger_type': 1, 'id': click_scene_qs[0]['id'], 'scene_name': click_scene_qs[0]['scene_name'], 'is_enable': click_scene_qs[0]['is_enable'] }) if double_click_scene_qs.exists(): scene_list.append({ 'trigger_type': 2, 'id': double_click_scene_qs[0]['id'], 'scene_name': double_click_scene_qs[0]['scene_name'], 'is_enable': double_click_scene_qs[0]['is_enable'] }) if three_click_scene_qs.exists(): scene_list.append({ 'trigger_type': 3, 'id': three_click_scene_qs[0]['id'], 'scene_name': three_click_scene_qs[0]['scene_name'], 'is_enable': three_click_scene_qs[0]['is_enable'] }) return response.json(0, scene_list) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def update_status(request_dict, response): """ 更新智能场景状态 @param request_dict: 请求参数 @request_dict smartSceneId: 智能场景id @request_dict isEnable: 状态,True or False @param response: 响应对象 @return: response """ smart_scene_id = request_dict.get('smartSceneId', None) is_enable = request_dict.get('isEnable', None) if not all([smart_scene_id, is_enable]): return response.json(444, {'error param': 'smartSceneId and status'}) try: smart_scene_id = int(smart_scene_id) scene_status = SCENE_STATUS_ON if is_enable == 'True' else SCENE_STATUS_OFF msg = { 'scene_event': SCENE_EVENT_EDIT_STATUS, 'scene_id': smart_scene_id, 'scene_status': scene_status } # 查询序列号 smart_scene_qs = SmartScene.objects.filter(id=smart_scene_id).values('device_id', 'sub_device_id') device_id = smart_scene_qs[0]['device_id'] if device_id: serial_number = Device_Info.objects.filter(id=device_id).values('serial_number')[0]['serial_number'] else: sub_device_id = smart_scene_qs[0]['sub_device_id'] serial_number = GatewaySubDevice.objects.filter(id=sub_device_id).values('device__serial_number')[0][ 'device__serial_number'] topic_name = SMART_SCENE_TOPIC.format(serial_number) with transaction.atomic(): SmartScene.objects.filter(id=smart_scene_id).update(is_enable=is_enable) # 通过mqtt发送设备数据 success = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) try: assert success except AssertionError: return response.json(10044) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def scene_detail(request_dict, response): """ 查询智能场景详情 @param request_dict: 请求参数 @request_dict smartSceneId: 智能场景id @param response: 响应对象 @return: response """ smart_scene_id = request_dict.get('smartSceneId', None) if not smart_scene_id: return response.json(444, {'error param': 'smartSceneId'}) try: smart_scene_qs = SmartScene.objects.filter(id=smart_scene_id).values('id', 'scene_name', 'conditions', 'tasks', 'effective_time_id', 'is_all_day') if not smart_scene_qs.exists(): return response.json(173) res = { 'scene_name': smart_scene_qs[0]['scene_name'], 'condition': eval(smart_scene_qs[0]['conditions']), 'task': eval(smart_scene_qs[0]['tasks']), } # 如果存在关联的时间数据,组织时间数据 is_all_day = smart_scene_qs[0]['is_all_day'] effectiveTime = {} if is_all_day != 0: effectiveTime['isAllDay'] = is_all_day if is_all_day == 2: try: effective_time_qs = EffectiveTime.objects.get(id=smart_scene_qs[0]['effective_time_id']) effectiveTime['startTime'] = effective_time_qs.start_time effectiveTime['endTime'] = effective_time_qs.end_time effectiveTime['repeat'] = effective_time_qs.repeat except ObjectDoesNotExist: return response.json(0, res) res['effectiveTime'] = effectiveTime return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def edit_smart_scene(cls, request_dict, user_id, response): """ 编辑智能场景 @param request_dict: 请求参数 @param user_id: 用户id @request_dict smartSceneId: 智能场景id @param response: 响应对象 @return: response """ smart_scene_id = int(request_dict.get('smartSceneId', None)) device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) scene_name = request_dict.get('sceneName', None) conditions = request_dict.get('conditions', None) tasks = request_dict.get('tasks', None) is_all_day = request_dict.get('isAllDay', None) conditions_dict = eval(conditions) tasks_list = eval(tasks) now_time = int(time.time()) smart_scene_qs = SmartScene.objects.filter(user_id=user_id, scene_name=scene_name).filter(~Q(id=smart_scene_id)) if smart_scene_qs.exists(): return response.json(179) res = { 'scene_name': scene_name, 'conditions': conditions_dict, 'tasks': tasks_list } effective_time = {} if is_all_day: is_all_day = int(is_all_day) effective_time['is_all_day'] = is_all_day if not all([smart_scene_id, scene_name, conditions, tasks]): return response.json(444, {'error param': 'smartSceneId,sceneName,conditions or tasks'}) try: smart_scene_qs = SmartScene.objects.filter(id=smart_scene_id) if not smart_scene_qs.exists(): return response.json(173) tz = smart_scene_qs[0].tz mqtt_tasks_list = eval(smart_scene_qs[0].mqtt_tasks) scene_status = 1 if smart_scene_qs[0].is_enable else 0 msg = { 'scene_id': smart_scene_id, 'scene_event': SCENE_EVENT_EDIT, 'scene_status': scene_status } if conditions_dict['type'] == 2: # 条件为选择子设备 if not sub_device_id: return response.json(444, {'error param': 'subDeviceId'}) if cls.time_conflict(sub_device_id, conditions, is_all_day, request_dict, smart_scene_id): return response.json(182) device_type = int(conditions_dict['sensor']['device_type']) # 智能按钮不能创建触发条件相同的场景 if device_type == SENSOR_TYPE['smart_button']: event_type = conditions_dict['sensor']['eventValues'][0]['event_type'] smart_scene_temp_qs = SmartScene.objects.filter(Q(sub_device_id=sub_device_id), ~Q(id=smart_scene_id), conditions__contains=event_type) if smart_scene_temp_qs.exists(): return response.json(180) # 温湿度传感器返回温湿度 elif device_type == SENSOR_TYPE['tem_hum_sensor']: event_values = conditions_dict['sensor']['eventValues'][0] if '≥' in event_values['value']: replace_str = '≥ ' msg['sensor_symbol'] = 2 else: replace_str = '≤ ' msg['sensor_symbol'] = 1 value = event_values['value'].replace(replace_str, '') msg['sensor_data'] = float(value) device_id = '' sub_device_qs = GatewaySubDevice.objects.filter(id=sub_device_id).values('ieee_addr', 'device__serial_number') if not sub_device_qs.exists(): return response.json(173) serial_number = sub_device_qs[0]['device__serial_number'] msg['sensor_type'] = int(conditions_dict['sensor']['device_type']) msg['sensor_ieee_addr'] = sub_device_qs[0]['ieee_addr'] msg['sensor_status'] = int(conditions_dict['sensor']['eventValues'][0]['event_type']) else: if not device_id: return response.json(444, {'error param': 'deviceId'}) sub_device_id = 0 device_qs = Device_Info.objects.filter(id=device_id).values('serial_number') if not device_qs.exists(): return response.json(173) serial_number = device_qs[0]['serial_number'] # 网关数据 msg['sensor_type'] = DEVICE_TYPE['gateway'] msg['sensor_status'] = 2002 msg['sensor_ieee_addr'] = 'FFFFFFFFFFFFFFFF' # 获取设备任务数据 msg['task'], mqtt_tasks = cls.get_msg_task_list(tasks_list, conditions_dict, tz, now_time, mqtt_tasks_list) if mqtt_tasks: mqtt_tasks = json.dumps(mqtt_tasks) with transaction.atomic(): smart_scene_qs.update(scene_name=scene_name, conditions=conditions, tasks=tasks, mqtt_tasks=mqtt_tasks, device_data=json.dumps(msg), updated_time=now_time, device_id=device_id, sub_device_id=sub_device_id) if is_all_day is None: # 不设置时间或全天 smart_scene_qs.update(effective_time_id=0, is_all_day=0) time_dict = { 'start_time': conditions_dict['time']['minutes'] * 60, 'repeat': conditions_dict['time']['repeat'] } elif is_all_day == 1: smart_scene_qs.update(effective_time_id=0, is_all_day=is_all_day) time_dict = { 'is_all_day': is_all_day } else: start_time = int(request_dict.get('startTime', None)) end_time = int(request_dict.get('endTime', None)) repeat = int(request_dict.get('repeat', None)) effective_time_qs = EffectiveTime.objects.filter(start_time=start_time, end_time=end_time, repeat=repeat).values('id') if effective_time_qs.exists(): effective_time_id = effective_time_qs[0]['id'] else: effective_time_id = EffectiveTime.objects.create(start_time=start_time, end_time=end_time, repeat=repeat).id smart_scene_qs.update(effective_time_id=effective_time_id, is_all_day=is_all_day) time_dict = { 'is_all_day': is_all_day, 'start_time': start_time * 60, 'end_time': end_time * 60, 'repeat': repeat } effective_time = { 'isAllDay': is_all_day, 'startTime': start_time, 'endTime': end_time, 'repeat': repeat } msg['time'] = time_dict # 通过mqtt发送设备数据 thing_name = serial_number topic_name = SMART_SCENE_TOPIC.format(serial_number) success = CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg) try: assert success except AssertionError: return response.json(10044) res['effectiveTime'] = effective_time return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def delete_smart_scene(request_dict, response): """ 删除智能场景 @param request_dict: 请求参数 @request_dict smartSceneIds: 智能场景id @param response: 响应对象 @return: response """ smart_scene_ids = request_dict.get('smartSceneIds', None) if not smart_scene_ids: return response.json(444, {'error param': 'smartSceneIds'}) try: smart_scene_id_list = smart_scene_ids.split(',') # 获取序列号 smart_scene_id = smart_scene_id_list[0] smart_scene_qs = SmartScene.objects.filter(id=smart_scene_id).values('device_id', 'sub_device_id') device_id = smart_scene_qs[0]['device_id'] if device_id: serial_number = Device_Info.objects.filter(id=device_id).values('serial_number')[0]['serial_number'] else: serial_number = GatewaySubDevice.objects.filter(id=smart_scene_qs[0]['sub_device_id']). \ values('device__serial_number')[0]['device__serial_number'] topic_name = SMART_SCENE_TOPIC.format(serial_number) with transaction.atomic(): SmartScene.objects.filter(id__in=smart_scene_id_list).delete() for smart_scene_id in smart_scene_id_list: # 通知设备删除场景id msg = { 'scene_event': SCENE_EVENT_DELETE, 'scene_id': int(smart_scene_id) } success = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) try: assert success except AssertionError: return response.json(10044) time.sleep(0.3) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def scene_log(request_dict, response): """ 查询场景日志 @param request_dict: 请求参数 @request_dict deviceId: 网关id @request_dict subDeviceId: 子设备id @request_dict page: 页数 @request_dict size: 条数 @request_dict startTime: 开始时间 @request_dict endTime: 结束时间 @param response: 响应对象 @return: response """ family_id = request_dict.get('familyId', None) device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) page = request_dict.get('page', None) size = request_dict.get('size', None) start_time = request_dict.get('startTime', None) end_time = request_dict.get('endTime', None) if not all([family_id, page, size]): return response.json(444, {'error param': 'familyId or page or size'}) device_list = [] sub_device_list = [] if not device_id and not sub_device_id: family_room_device_qs = FamilyRoomDevice.objects.filter(family_id=family_id).values('device', 'sub_device') for device in family_room_device_qs: if device['device'] not in device_list: device_list.append(device['device']) if device['sub_device']: sub_device_list.append(device['sub_device']) elif sub_device_id: # 查询子设备 family_room_device_qs = FamilyRoomDevice.objects.filter(family_id=family_id, sub_device=sub_device_id).values('device_id') for device in family_room_device_qs: device_list.append(device['device_id']) sub_device_list.append(sub_device_id) else: # 查询网关 family_room_device_qs = FamilyRoomDevice.objects.filter(Q(family_id=family_id) & Q(device=device_id) & ~Q(sub_device=0)).values( 'sub_device') device_list.append(device_id) for device in family_room_device_qs: sub_device_list.append(device['sub_device']) try: page, size = int(page), int(size) scene_log_qs = SceneLog.objects.filter(Q(device_id__in=device_list) | Q(sub_device_id__in=sub_device_list)) if start_time and end_time: scene_log_qs = scene_log_qs.filter(created_time__range=(start_time, end_time)).values( 'status', 'created_time', 'device_id', 'sub_device_id', 'scene_name', 'tasks').order_by( '-created_time')[(page - 1) * size:page * size] else: scene_log_qs = scene_log_qs.values('status', 'created_time', 'device_id', 'sub_device_id', 'scene_name', 'tasks').order_by( '-created_time')[(page - 1) * size:page * size] if not scene_log_qs.exists(): return response.json(0, []) for item in scene_log_qs: if not device_id and not sub_device_id: if not item['sub_device_id']: device_qs = Device_Info.objects.filter(id=item['device_id']).values('Type') item['device_type'] = device_qs[0]['Type'] if device_qs.exists() else '' else: device_qs = GatewaySubDevice.objects.filter(id=item['sub_device_id']).values('device_type') item['device_type'] = device_qs[0]['device_type'] if device_qs.exists() else '' elif sub_device_id: device_qs = GatewaySubDevice.objects.filter(id=item['sub_device_id']).values('device_type') item['device_type'] = device_qs[0]['device_type'] if device_qs.exists() else '' else: device_qs = Device_Info.objects.filter(id=item['device_id']).values('Type') item['device_type'] = device_qs[0]['Type'] if device_qs.exists() else '' if item['tasks'] != '': item['tasks'] = eval(item['tasks']) scene_log_list = list(scene_log_qs) return response.json(0, scene_log_list) except Exception as e: print('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def scene_log_date(request_dict, response): """ 查询场景日志日期 @param request_dict: 请求参数 @request_dict deviceId: 网关id @request_dict subDeviceId: 子设备id @param response: 响应对象 @return: response """ family_id = request_dict.get('familyId', None) device_id = request_dict.get('deviceId', None) sub_device_id = request_dict.get('subDeviceId', None) if not family_id: return response.json(444, {'error param': 'familyId'}) device_list = [] sub_device_list = [] if not device_id and not sub_device_id: family_room_device_qs = FamilyRoomDevice.objects.filter(family_id=family_id).values('device', 'sub_device') for device in family_room_device_qs: if device['device'] not in device_list: device_list.append(device['device']) if device['sub_device']: sub_device_list.append(device['sub_device']) elif sub_device_id: family_room_device_qs = FamilyRoomDevice.objects.filter(family_id=family_id, sub_device=sub_device_id) sub_device_list.append(sub_device_id) else: family_room_device_qs = FamilyRoomDevice.objects.filter(family_id=family_id, device=device_id).values( 'sub_device') device_list.append(device_id) for device in family_room_device_qs: sub_device_list.append(device['sub_device']) if not family_room_device_qs.exists(): return response.json(173) try: scene_log_qs = SceneLog.objects.extra( select={'date': "FROM_UNIXTIME(created_time,'%%Y-%%m-%%d')"}).values('date'). \ filter(Q(device_id__in=device_list) | Q(sub_device_id__in=sub_device_list)). \ annotate(count=Count('created_time')). \ order_by('-date')[:31] log_date_list = [] for scene_log in scene_log_qs: log_date_list.append({ 'timestamp': CommonService.str_to_timestamp(scene_log['date'], '%Y-%m-%d'), 'count': scene_log['count'], 'format': scene_log['date'], }) return response.json(0, log_date_list) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_scene_data(request_dict, response): """ 设备获取智能场景数据 @param request_dict: 请求参数 @request_dict serial_number: 序列号 @param response: 响应对象 @return: response """ serial_number = request_dict.get('serial_number', None) if not serial_number: return response.json(444, {'error param': 'serial_number'}) try: device_info_qs = Device_Info.objects.filter(serial_number=serial_number).values('id') if not device_info_qs.exists(): return response.json(173) device_id = device_info_qs[0]['id'] sub_device_id_list = GatewaySubDevice.objects.filter(device_id=device_id).values_list('id', flat=True) if sub_device_id_list: smart_scene_qs = SmartScene.objects.filter( Q(device_id=device_id) | Q(sub_device_id__in=sub_device_id_list)) else: smart_scene_qs = SmartScene.objects.filter(device_id=device_id) if not smart_scene_qs.exists(): return response.json(173) # 下发智能场景数据 smart_scene_qs = smart_scene_qs.values('device_data') topic_name = SMART_SCENE_TOPIC.format(serial_number) for smart_scene in smart_scene_qs: msg = eval(smart_scene['device_data']) success = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) try: assert success except AssertionError: return response.json(10044) time.sleep(2) # 下发智能按钮数据 smart_button_qs = GatewaySubDevice.objects.filter(device_id=device_id, device_type=SENSOR_TYPE['smart_button']).values( 'ieee_addr', 'is_tampered') if smart_button_qs.exists(): sos_count = smart_button_qs.count() for index, smart_button in enumerate(smart_button_qs): msg = { 'sos_count': sos_count, # 该网关下的智能按钮数量 'sos_num': index + 1, # 第几个按钮 'sensor_ieee_addr': smart_button['ieee_addr'], 'sos_select': smart_button['is_tampered'] } success = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) try: assert success except AssertionError: return response.json(10044) time.sleep(2) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def voice_audition(request_dict, response): """ 智能场景音频试听 @param request_dict: 请求参数 @request_dict serial_number: 序列号 @request_dict voiceId: 音频id @param response: 响应对象 @return: response """ serial_number = request_dict.get('serial_number', None) voice_id = request_dict.get('voiceId', None) if not all([serial_number, voice_id]): return response.json(444) try: topic_name = VOICE_AUDITION_TOPIC.format(serial_number) msg = {'voice_id': int(voice_id)} success = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) try: assert success except AssertionError: return response.json(10044) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def get_msg_task_list(cls, tasks_list, conditions_dict, tz, now_time, mqtt_tasks_list=None): """ 获取设备任务数据 @param tasks_list: app任务列表 @param conditions_dict: 条件 @param tz: 时区 @param now_time: 当前时间 @param mqtt_tasks_list: mqtt定时任务时列表 @return: task_list, mqtt_task_list """ # 传入任务列表,删除旧任务 if mqtt_tasks_list is not None: cls.del_aps_job(mqtt_tasks_list) task_list = [] mqtt_task_list = [] for task in tasks_list: sensor_type = int(task['device_type']) # 处理插座数据 # 不用添加到设备的任务列表,添加到mqtt任务列表 if sensor_type == DEVICE_TYPE['socket']: serial_number = task['serial_number'] event_type = int(task['event_type']) delay_time = task['delay_time'] task_temp = { 'device_type': sensor_type, 'event_type': event_type, 'delay_time': delay_time, 'serial_number': serial_number } # 如果条件为设置时间,创建或修改定时任务 if conditions_dict['type'] == 1: minutes = conditions_dict['time']['minutes'] repeat = conditions_dict['time']['repeat'] task_temp['task_id'], task_temp['time_dict'] = cls.create_aps_job( minutes, delay_time, tz, now_time, repeat, sensor_type, event_type, serial_number) mqtt_task_list.append(task_temp) else: task_temp = { 'sensor_type': sensor_type, 'sensor_delay': 0 } # 延时 if 'delay_time' in task and task['delay_time'] != 0: task_temp['sensor_delay'] = task['delay_time'] # 不为-1时需要其他数据 if sensor_type != -1: task_temp['sensor_action'] = int(task['event_type']) # 子设备返回长地址 sub_device_id = task.get('subDeviceId', None) if sub_device_id: sub_device_qs = GatewaySubDevice.objects.filter(id=sub_device_id).values('ieee_addr').first() task_temp['sensor_ieee_addr'] = sub_device_qs['ieee_addr'] # 网关添加报警类型数据 else: task_temp['voice_type'] = task.get('voice_type') task_temp['voice_id'] = task.get('voice_id') task_temp['count'] = task.get('count') task_temp['delay_time'] = task.get('delay_time') task_temp['duration'] = task.get('duration') task_temp['value_type'] = task.get('value_type') task_list.append(task_temp) # 列表为空保存为空字符串 if not mqtt_task_list: mqtt_task_list = '' return task_list, mqtt_task_list @staticmethod def time_conflict(sub_device_id, conditions, is_all_day, request_dict, smart_scene_id=None): """ 判断传感器是否创建过条件相同且生效时间冲突的场景 @param sub_device_id: 传感器设备id @param conditions: 场景条件 @param is_all_day: 全天标识 @param request_dict: @param smart_scene_id: 场景id,编辑场景时传 @return: bool, True: 冲突, False: 不冲突 """ # 不设置时间不会冲突 if is_all_day is None: return False # 查询设置过时间的数据 if smart_scene_id is None: # 创建场景 smart_scene_qs = SmartScene.objects.filter( ~Q(is_all_day=0), sub_device_id=sub_device_id, conditions=conditions).values('effective_time_id') else: # 编辑场景,过滤本身场景数据 smart_scene_qs = SmartScene.objects.filter( ~Q(id=smart_scene_id), ~Q(is_all_day=0), sub_device_id=sub_device_id, conditions=conditions).values('effective_time_id') if not smart_scene_qs.exists(): return False # 再设置全天必冲突 if is_all_day == 1: return True # 非全天 elif is_all_day == 2: start_time = int(request_dict.get('startTime')) end_time = int(request_dict.get('endTime')) repeat = int(request_dict.get('repeat')) for smart_scene in smart_scene_qs: effective_time_id = smart_scene['effective_time_id'] effective_time_qs = EffectiveTime.objects.filter(id=effective_time_id).\ values('start_time', 'end_time', 'repeat') if effective_time_qs.exists(): old_start_time = effective_time_qs[0]['start_time'] old_end_time = effective_time_qs[0]['end_time'] old_repeat = effective_time_qs[0]['repeat'] # 每天重复 if repeat == 127: # 判断时间是否在已设置过的时间范围之内 if old_start_time <= start_time <= old_end_time or \ old_start_time <= end_time <= old_end_time: return True else: # 有相同的重复天 if repeat & old_repeat != 0: # 判断时间是否在已设置过的时间范围之内 if old_start_time <= start_time <= old_end_time or \ old_start_time <= end_time <= old_end_time: return True return False @classmethod def create_aps_job(cls, minutes, delay_time, tz, now_time, repeat, device_type, event_type, serial_number): """ 创建定时任务 返回任务id和时间 @param minutes: 分钟时间 @param delay_time: 延迟时间 @param tz: 时区 @param now_time: 当前时间 @param repeat: 星期周期的十进制数 @param device_type: 设备类型 @param event_type: 事件类型 @param serial_number: 序列号 @return: task_id, time_dict """ task_id = serial_number + '_' apscheduler_obj = ApschedulerObject() # 一次性任务 if repeat == 0: # 根据时间戳和时区获取年月日,拼接由分钟转换出来的时间 time_string = CommonService.get_date_from_timestamp(now_time, tz) hour, minute = divmod(minutes, 60) time_string += ' {:02d}:{:02d}:00'.format(hour, minute) time_stamp = CommonService.convert_to_timestamp(tz, time_string) # 加上延时,如果执行时间小于当前时间,延迟24小时执行 time_stamp += delay_time if time_stamp < now_time: time_stamp += 24 * 60 * 60 task_id += str(time_stamp) apscheduler_obj.create_date_job(func=cls.pub_mqtt, task_id=task_id, time_stamp=time_stamp, args=(device_type, event_type, serial_number)) time_dict = {'time_stamp': time_stamp} # 周期任务 else: hour, minute, second, is_next_day = cls.handle_delay_time(minutes, delay_time) # 加上延时,如果执行时间超过23:59,隔天执行 weeks = cls.int_to_weeks(repeat, is_next_day) time_str = weeks + '_{:02d}{:02d}{:02d}'.format(hour, minute, second) task_id += time_str apscheduler_obj.create_cron_job(func=cls.pub_mqtt, task_id=task_id, day_of_week=weeks, hour=hour, minute=minute, args=(device_type, event_type, serial_number)) time_dict = {'weeks': weeks, 'hour': hour, 'minute': minute, 'second': second} return task_id, time_dict @staticmethod def handle_delay_time(minutes, delay_time): """ 处理延迟时间 @param minutes: 时间分钟数,如1439代表23:59 @param delay_time: 延迟时间,单位:秒 @return: hour, minute, second, is_next_day """ is_next_day = False hour, minute = divmod(minutes, 60) # 延迟时间转为分钟,如果加上时间分钟数大于1439,隔天执行 minute, second = divmod(delay_time, 60) total_minutes = minutes + minute ex_min = total_minutes - 1439 if ex_min <= 0: hour, minute = divmod(total_minutes, 60) else: hour, minute = divmod(ex_min, 60) is_next_day = True return hour, minute, second, is_next_day @staticmethod def int_to_weeks(repeat, is_next_day): """ 十进制转星期周期 @param repeat: 星期周期的十进制数,如127 -> 0,1,2,3,4,5,6 @param is_next_day: 是否隔天 @return: weeks """ # 十进制转为7位的二进制,低六位倒序 bin_str = bin(repeat)[2:].zfill(7) bin_str = bin_str[:1] + bin_str[-6:][::-1] # 生成星期周期字符串 weeks = '' next_day = 1 if is_next_day else 0 for i, bit in enumerate(bin_str): if bit == '1': weeks += str(i+next_day) + ',' # 删除最后一个逗号并返回结果 return weeks[:-1] @staticmethod def pub_mqtt(device_type, event_type, serial_number): """ 发布mqtt消息 @param device_type: 设备类型 @param event_type: 事件类型 @param serial_number: 序列号 @return: """ if device_type == DEVICE_TYPE['socket']: topic_name = SMART_SOCKET_TOPIC.format(serial_number) status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0 msg = { 'type': 1, 'data': {'deviceSwitch': status} } CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) @staticmethod def del_aps_job(mqtt_tasks_list): """ 删除定时任务 @param mqtt_tasks_list: mqtt任务列表 @return: """ apscheduler_obj = ApschedulerObject() for mqtt_task in mqtt_tasks_list: task_id = mqtt_task['task_id'] apscheduler_obj.del_job(task_id) # # ___====-_ _-====___ # _--^^^#####// \\#####^^^--_ # _-^##########// ( ) \\##########^-_ # -############// |\^^/| \\############- # _/############// (@::@) \\############\_ # /#############(( \\// ))#############\ # -###############\\ (oo) //###############- # -#################\\ / VV \ //#################- # -###################\\/ \//###################- # _#/|##########/\######( /\ )######/\##########|\#_ # |/ |#/\#/\#/\/ \#/\##\ | | /##/\#/ \/\#/\#/\#| \| # ` |/ V V ` V \#\| | | |/#/ V ' V V \| ' # ` ` ` ` / | | | | \ ' ' ' ' # ( | | | | ) # __\ | | | | /__ # (vvv(VVV)(VVV)vvv) # 神兽保佑 # 代码无BUG! #