# -*- coding: utf-8 -*-
"""
# @Author : cheng
# @Time : 2023/7/10 11:20
# @File: SmartSwitchController.py
"""
import datetime
import json
import time

from django.views import View

from Model.models import SwitchDimmingSettings, SwitchScheduler, Device_Info, SceneLog, FamilyRoomDevice
from Object.RedisObject import RedisObject
from Service.CommonService import CommonService
from Object.CeleryBeatObject import CeleryBeatObj
from django.db import transaction
from Ansjer.config import LOGGER

APSCHEDULER_TOPIC_NAME = 'loocam/switch/time_scheduling/{}'  # schedule topic
RESET_SWITCH_TOPIC_NAME = 'loocam/smart-switch/{}'  # device reset topic
TIMER_TOPIC_NAME = 'loocam/switch/count_down/{}'  # countdown timer topic
MQTT_TASK = 'Controller.CeleryTasks.tasks.send_mqtt'


def _scheduler_task_names(scheduler_id):
    """Return the three periodic-task names that may exist for one schedule.

    A time-point or interval schedule uses the plain name; a time-period
    schedule uses the ``_1`` (begin) and ``_2`` (end) names.
    """
    return (
        'switchscheduler_{}'.format(scheduler_id),
        'switchscheduler_{}_1'.format(scheduler_id),
        'switchscheduler_{}_2'.format(scheduler_id),
    )


def _server_error(response, exc):
    """Print ``exc`` and build the 500 response carrying its line and repr."""
    print(exc)
    return response.json(500, 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)))


def _hour_minute(seconds):
    """Split a second-of-day offset into ``(hour, minute)``."""
    hour, remainder = divmod(seconds, 3600)
    return hour, remainder // 60


def _purge_switch_data(device_id):
    """Delete dimming settings, schedules (with their tasks) and logs of a device."""
    SwitchDimmingSettings.objects.filter(device_id=device_id).delete()
    scheduler_qs = SwitchScheduler.objects.filter(device_id=device_id)
    scheduler_ids = list(scheduler_qs.values_list('id', flat=True))
    if scheduler_ids:
        celery_obj = CeleryBeatObj()
        for scheduler_id in scheduler_ids:
            for task_name in _scheduler_task_names(scheduler_id):
                celery_obj.del_task(task_name)  # remove the scheduled task
        scheduler_qs.delete()
    SceneLog.objects.filter(device_id=device_id).delete()


class SmartSwitchView(View):
    """HTTP entry point for smart-switch dimming, schedule, timer and log operations.

    The operation name comes from the ``operation`` URL keyword argument and
    is dispatched by :meth:`validation`.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching on ``operation`` with the query parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching on ``operation`` with the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route the request to the handler for ``operation``.

        ``switch-scheduler-log`` and ``reset`` are served regardless of the
        token check result; every other operation requires ``token_code == 0``.
        Unknown operations respond with code 414.
        """
        token_code, _user_id, response = CommonService.verify_token_get_user_id(request_dict, request)

        # Operations served without a valid token (device-originated reports).
        if operation == 'switch-scheduler-log':  # device reports a schedule log
            return self.create_scheduler_log(request_dict, response)
        if operation == 'reset':  # device reset
            return self.reset(request_dict, response)

        if token_code != 0:
            return response.json(token_code)

        handlers = {
            'get-dimming-setting': self.get_dimming_setting,  # read dimming settings
            'get-scheduler-setting': self.get_scheduler_setting,  # list schedules
            'add-or-edit-scheduler': self.add_or_edit_scheduler,  # add / edit a schedule
            'edit-scheduler-status': self.edit_scheduler_status,  # enable / disable a schedule
            'delete-scheduler': self.delete_scheduler,  # delete a schedule
            'get-timer-setting': self.get_timer_setting,  # read the timer
            'add-or-edit-timer': self.add_or_edit_timer,  # add / edit the timer
            'edit-dimming-correction': self.edit_dimming_correction,  # set dimming correction
            'edit-dimming-setting': self.edit_dimming_setting,  # edit dimming settings
            'get-scheduler-log': self.get_scheduler_log,  # query schedule logs
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    @staticmethod
    def get_dimming_setting(request_dict, response):
        """
        Return the dimming settings of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response (444 when deviceId is missing, 173 when no settings row exists)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        try:
            # Fetch the row once instead of re-querying for every field.
            setting = SwitchDimmingSettings.objects.filter(device_id=device_id).values().first()
            if not setting:
                return response.json(173)
            res = {
                'clickTurnOnSpeed': setting['click_turn_on_speed'],
                'clickTurnOffSpeed': setting['click_turn_off_speed'],
                'doubleClick': setting['double_click'],
                'press': setting['press'],
                'doublePressClickTurnOnSpeed': setting['double_press_click_turn_on_speed'],
                'doublePressClickTurnOffSpeed': setting['double_press_click_turn_off_speed'],
                'dimmingCorrection': setting['dimming_correction'],
            }
            return response.json(0, res)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def edit_dimming_correction(request_dict, response):
        """
        Update the dimming correction of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict dimmingCorrection: dimming correction value
        @param response: response object
        @return: response (444 when deviceId is missing)
        """
        device_id = request_dict.get('deviceId', None)
        dimming_correction = request_dict.get('dimmingCorrection', None)
        if not device_id:
            return response.json(444)
        try:
            SwitchDimmingSettings.objects.filter(device_id=device_id).update(dimming_correction=dimming_correction)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def edit_dimming_setting(request_dict, response):
        """
        Update the dimming settings of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict clickTurnOnSpeed: single-click turn-on speed
        @request_dict clickTurnOffSpeed: single-click turn-off speed
        @request_dict doubleClick: double-click action
        @request_dict press: long-press action
        @request_dict doublePressClickTurnOnSpeed: double-click / long-press turn-on speed
        @request_dict doublePressClickTurnOffSpeed: double-click / long-press turn-off speed
        @param response: response object
        @return: response (444 when deviceId is missing)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        # NOTE(review): parameters omitted by the client are written as None.
        dimming_setting_data = {
            'device_id': device_id,
            'click_turn_on_speed': request_dict.get('clickTurnOnSpeed', None),
            'click_turn_off_speed': request_dict.get('clickTurnOffSpeed', None),
            'double_click': request_dict.get('doubleClick', None),
            'press': request_dict.get('press', None),
            'double_press_click_turn_on_speed': request_dict.get('doublePressClickTurnOnSpeed', None),
            'double_press_click_turn_off_speed': request_dict.get('doublePressClickTurnOffSpeed', None),
        }
        try:
            SwitchDimmingSettings.objects.filter(device_id=device_id).update(**dimming_setting_data)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def get_scheduler_setting(request_dict, response):
        """
        List the schedules configured for a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response (444 when deviceId is missing, 173 when the device has no schedule)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        try:
            switch_scheduler_list = [
                {
                    'schedulerId': item['id'],
                    'timeTypeRadio': item['time_type_radio'],
                    'timePoint': item['time_point'],
                    'startTime': item['start_time'],
                    'endTime': item['end_time'],
                    'actionsType': item['actions_type'],
                    'actions': item['actions'],
                    'slowSpeed': item['slow_speed'],
                    'repeat': item['repeat'],
                    'isExecute': item['is_execute'],
                }
                for item in SwitchScheduler.objects.filter(device_id=device_id).values()
            ]
            if not switch_scheduler_list:
                return response.json(173)
            return response.json(0, {'list': switch_scheduler_list})
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def add_or_edit_scheduler(request_dict, response):
        """
        Create or edit a schedule and (re)register its periodic tasks.
        @param request_dict: request parameters
        @request_dict isEdit: truthy to edit an existing schedule
        @request_dict deviceId: device id
        @request_dict schedulerId: schedule id (required when editing)
        @request_dict timeTypeRadio: 1 = time point, 2 = time period
        @request_dict timePoint: time point (timestamp)
        @request_dict startTime: period start, in seconds
        @request_dict endTime: period end, in seconds
        @request_dict actions: schedule action value
        @request_dict actionsType: action type ('1'/'2' for a time point, '1'/'3' for a period)
        @request_dict slowSpeed: slow on/off speed
        @request_dict repeat: repeat cycle
        @param response: response object
        @return: response (444 with the offending parameter names, 173 when the
                 device or schedule is not found, 500 on unexpected errors)
        """
        is_edit = request_dict.get('isEdit', None)
        device_id = request_dict.get('deviceId', None)
        scheduler_id = request_dict.get('schedulerId', None)
        time_point = request_dict.get('timePoint', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        actions = request_dict.get('actions', None)
        actions_type = request_dict.get('actionsType', None)
        slow_speed = request_dict.get('slowSpeed', None)
        repeat = request_dict.get('repeat', None)
        if not all([device_id, repeat]):
            return response.json(444, {'param': 'deviceId,repeat'})
        try:
            time_type_radio = int(request_dict.get('timeTypeRadio', 0))
        except (TypeError, ValueError):
            return response.json(444, {'param': 'timeTypeRadio'})

        device_info = Device_Info.objects.filter(id=device_id).values('serial_number', 'userID').first()
        if not device_info:
            return response.json(173)

        if time_type_radio == 1:  # time point
            if not all([time_point, slow_speed]):
                return response.json(444, {'param': 'timePoint,slowSpeed'})
            scheduler_data = {
                'device_id': device_id,
                'time_type_radio': time_type_radio,
                'time_point': time_point,
                'actions': actions,
                'actions_type': actions_type,
                'slow_speed': slow_speed,
                'repeat': repeat
            }
            valid_action_types = ('1', '2')
        elif time_type_radio == 2:  # time period
            if not all([start_time, end_time]):
                return response.json(444, {'param': 'startTime,endTime'})
            try:
                start_time = int(start_time)
                end_time = int(end_time)
            except (TypeError, ValueError):
                return response.json(444, {'param': 'startTime,endTime'})
            scheduler_data = {
                'device_id': device_id,
                'time_type_radio': time_type_radio,
                'start_time': start_time,
                'end_time': end_time,
                'actions': actions,
                'actions_type': actions_type,
                'slow_speed': slow_speed,
                'repeat': repeat
            }
            valid_action_types = ('1', '3')
        else:
            return response.json(444, {'param': 'timeTypeRadio'})

        # Reject bad input before any write: returning from inside
        # transaction.atomic() commits, which would otherwise persist a
        # schedule row (and delete its old tasks) without creating new tasks.
        if actions_type not in valid_action_types:
            return response.json(444, {'param': 'actionsType'})
        if is_edit and not scheduler_id:
            return response.json(444, {'param': 'schedulerId'})

        try:
            # Parse everything that can fail before mutating any state.
            action_value = int(actions)
            task_payload = json.dumps(scheduler_data)
            serial_number = device_info['serial_number']
            tz = CommonService.get_user_tz(device_info['userID'])
            topic_name = APSCHEDULER_TOPIC_NAME.format(serial_number)
            if time_type_radio == 1:
                # NOTE(review): fromtimestamp() uses the server's local time while
                # the crontab is registered with the user's tz - confirm intended.
                run_at = datetime.datetime.fromtimestamp(int(time_point))
            else:
                start_hour, start_minute = _hour_minute(start_time)
                end_hour, end_minute = _hour_minute(end_time)

            with transaction.atomic():
                celery_obj = CeleryBeatObj()
                if is_edit:
                    updated = SwitchScheduler.objects.filter(device_id=device_id, id=scheduler_id).update(
                        **scheduler_data)
                    if not updated:
                        return response.json(173)
                    # Drop every task of the previous definition; they are recreated below.
                    for task_name in _scheduler_task_names(scheduler_id):
                        celery_obj.del_task(task_name)
                else:
                    scheduler_id = SwitchScheduler.objects.create(**scheduler_data).id

                def register(task_id, minute, hour, msg):
                    """Create one crontab task that publishes ``msg`` to the device."""
                    celery_obj.creat_crontab_task(tz, task_id, MQTT_TASK, minute, hour, repeat,
                                                  args=[serial_number, topic_name, msg, task_id, 1, device_id,
                                                        task_payload])

                if time_type_radio == 1:  # time-point task
                    if actions_type == '1':  # turn on or off
                        msg = {
                            "task_id": scheduler_id,
                            "device_switch": action_value,  # -1: toggle, 0: off, 1: on, 2: preset brightness
                            "slow_time": slow_speed
                        }
                    else:  # '2': turn on and set brightness
                        msg = {
                            "task_id": scheduler_id,
                            "device_switch": 2,
                            "pwm_control": action_value,
                            'slow_time': slow_speed
                        }
                    register('switchscheduler_{}'.format(scheduler_id), run_at.minute, run_at.hour, msg)
                elif actions_type == '1':  # period: apply at start, invert at end
                    begin_task_id = 'switchscheduler_{}_1'.format(scheduler_id)  # begin task id
                    end_task_id = 'switchscheduler_{}_2'.format(scheduler_id)  # end task id
                    register(begin_task_id, start_minute, start_hour,
                             {"task_id": scheduler_id, "device_switch": action_value})
                    register(end_task_id, end_minute, end_hour,
                             {"task_id": scheduler_id, "device_switch": 0 if action_value == 1 else 1})
                else:  # '3': toggle at a fixed interval (minutes) within the period
                    interval = action_value
                    if interval >= 60:
                        hour = '{}-{}/{}'.format(start_hour, end_hour, interval // 60)
                        minute = start_minute
                    else:
                        hour = '{}-{}'.format(start_hour, end_hour)
                        minute = '*/{}'.format(interval)
                    register('switchscheduler_{}'.format(scheduler_id), minute, hour,
                             {"task_id": scheduler_id, "device_switch": -1})
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def edit_scheduler_status(request_dict, response):
        """
        Enable or disable a schedule and its periodic tasks.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict schedulerId: schedule id
        @request_dict isExecute: new status (non-zero enables, zero disables)
        @param response: response object
        @return: response (444 when a parameter is missing)
        """
        device_id = request_dict.get('deviceId', None)
        scheduler_id = request_dict.get('schedulerId', None)
        is_execute = request_dict.get('isExecute', None)
        if not all([device_id, scheduler_id, is_execute]):
            return response.json(444, {'param': 'deviceId,schedulerId,isExecute'})
        try:
            is_execute = int(is_execute)
            celery_obj = CeleryBeatObj()
            toggle = celery_obj.enable_task if is_execute else celery_obj.disable_task
            for task_name in _scheduler_task_names(scheduler_id):
                toggle(task_name)
            SwitchScheduler.objects.filter(device_id=device_id, id=scheduler_id).update(is_execute=is_execute)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def delete_scheduler(request_dict, response):
        """
        Delete a schedule and its periodic tasks.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict schedulerId: schedule id
        @param response: response object
        @return: response (444 when deviceId or schedulerId is missing, 173 when
                 no matching schedule was deleted)
        """
        device_id = request_dict.get('deviceId', None)
        scheduler_id = request_dict.get('schedulerId', None)
        # Both ids are used in the filter below, so both are required.
        if not all([device_id, scheduler_id]):
            return response.json(444, {'error param': 'deviceId or schedulerId'})
        try:
            deleted_count = SwitchScheduler.objects.filter(device_id=device_id, id=scheduler_id).delete()[0]
            if not deleted_count:
                return response.json(173)
            celery_obj = CeleryBeatObj()
            for task_name in _scheduler_task_names(scheduler_id):
                celery_obj.del_task(task_name)  # remove the scheduled task
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def get_timer_setting(request_dict, response):
        """
        Return the timer stored in Redis for a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response; every field is -1 when no timer hash exists
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        try:
            timer_info = RedisObject().get_all_hash_data('Switch-Timer-' + device_id)
            if not timer_info:
                timer_info = {'timePoint': -1, 'countdownTime': -1, 'actions': -1, 'timerStatus': -1}
            return response.json(0, timer_info)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def add_or_edit_timer(request_dict, response):
        """
        Create or edit the countdown timer of a device.
        @param request_dict: request parameters
        @request_dict isEdit: truthy to edit the existing timer
        @request_dict deviceId: device id
        @request_dict countdownTime: countdown duration (seconds)
        @request_dict actions: what the device will do when the timer fires
        @request_dict timerStatus: timer status (required when editing; 0 pauses the timer)
        @param response: response object
        @return: response (444 with the offending parameter names, 173 when the
                 device is not found)
        """
        is_edit = request_dict.get('isEdit', None)
        device_id = request_dict.get('deviceId', None)
        countdown_time = request_dict.get('countdownTime', None)
        actions = request_dict.get('actions', None)
        timer_status = request_dict.get('timerStatus', None)
        if not all([device_id, countdown_time, actions]):
            return response.json(444, {'param': 'deviceId, countdownTime, actions'})
        device_info = Device_Info.objects.filter(id=device_id).values('serial_number', 'userID').first()
        if not device_info:
            return response.json(173)
        # Validate before deleting the existing task so a bad request cannot
        # destroy the current timer.
        if is_edit and not timer_status:
            return response.json(444, {'param': 'timerStatus'})
        try:
            now_time = int(time.time())
            countdown_time = int(countdown_time)
            pause = bool(is_edit) and int(timer_status) == 0
            serial_number = device_info['serial_number']
            tz = CommonService.get_user_tz(device_info['userID'])
            celery_obj = CeleryBeatObj()
            redis_obj = RedisObject()
            task_id = 'switchtimer_{}'.format(device_id)
            topic_name = TIMER_TOPIC_NAME.format(serial_number)
            key = 'Switch-Timer-' + device_id
            implement_time = now_time + countdown_time
            redis_dict = {'timePoint': implement_time,
                          'countdownTime': countdown_time,
                          'actions': actions,
                          'timerStatus': timer_status}
            with transaction.atomic():
                if is_edit:
                    celery_obj.del_task(task_id)
                    if pause:  # pause the timer: keep the hash, without expiry or task
                        redis_dict['timePoint'] = -1
                        redis_obj.set_hash_data(key, redis_dict)
                        redis_obj.set_persist(key)
                        return response.json(0)
                redis_obj.set_hash_data(key, redis_dict)
                redis_obj.set_expire(key, countdown_time)
                msg = {'device_switch': actions, 'task_id': task_id}
                celery_obj.creat_clocked_task(task_id, MQTT_TASK, implement_time, tz,
                                              args=[serial_number, topic_name, msg, task_id, 2, device_id,
                                                    json.dumps(redis_dict)])
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def create_scheduler_log(request_dict, response):
        """
        Record the execution result reported by a device.
        @param request_dict: request parameters
        @request_dict serial_number: device serial number
        @request_dict event_type: '1' = schedule task, '2' = timer task
        @request_dict task_id: schedule id (event_type '1') or timer task id (event_type '2')
        @request_dict status: execution status
        @request_dict switch_status: switch state
        @request_dict send_time: time the task was sent
        @request_dict implement_time: time the task was executed
        @param response: response object
        @return: response (444 on missing/invalid parameters, 173 when the device
                 or the schedule is not found)
        """
        serial_number = request_dict.get('serial_number', None)
        event_type = request_dict.get('event_type', None)
        scheduler_id = request_dict.get('task_id', None)
        operate_status = request_dict.get('status', None)
        switch_status = request_dict.get('switch_status', None)
        send_time = request_dict.get('send_time', None)
        implement_time = request_dict.get('implement_time', None)
        if not all([serial_number, scheduler_id, operate_status, switch_status, implement_time]):
            return response.json(444, {
                'error param': 'serial_number, task_id, status, switch_status, implement_time'})
        device_info = Device_Info.objects.filter(serial_number=serial_number).values('id').first()
        if not device_info:
            return response.json(173)
        device_id = device_info['id']
        try:
            scene_log = {
                'status': operate_status,
                'created_time': implement_time,
            }
            if event_type == '1':  # schedule task
                # Only schedule events reference a SwitchScheduler row; timer
                # events carry the 'switchtimer_<deviceId>' task id instead.
                scheduler_info = SwitchScheduler.objects.filter(device_id=device_id, id=scheduler_id).values(
                    'time_type_radio', 'time_point', 'start_time', 'end_time', 'actions', 'actions_type',
                    'slow_speed', 'repeat').first()
                if not scheduler_info:
                    return response.json(173)
                scene_qs = SceneLog.objects.filter(created_time=send_time, device_id=device_id,
                                                   scene_id=scheduler_id)
                tasks = json.dumps(scheduler_info)
                scene_id = scheduler_id
            elif event_type == '2':  # timer task
                scene_qs = SceneLog.objects.filter(created_time=send_time, device_id=device_id,
                                                   scene_name=scheduler_id)
                tasks = json.dumps({'timePoint': int(send_time), 'actions': int(switch_status)})
                scene_id = 0
            else:
                return response.json(444, {'error param': 'event_type'})
            # update() returns the number of matched rows; create the log when none matched.
            if not scene_qs.update(**scene_log):
                scene_log['tasks'] = tasks
                scene_log['scene_id'] = scene_id
                scene_log['scene_name'] = scene_id
                scene_log['device_id'] = device_id
                SceneLog.objects.create(**scene_log)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def get_scheduler_log(request_dict, response):
        """
        Return the execution logs of a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response with a list of log entries (444 when deviceId is missing)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444, {'error param': 'deviceId'})
        try:
            scene_qs = SceneLog.objects.filter(device_id=device_id).values('tasks', 'status', 'created_time', 'id')
            res = [
                {
                    'id': item['id'],
                    'tasks': json.loads(item['tasks']),
                    'status': item['status'],
                    'created_time': item['created_time']
                }
                for item in scene_qs
            ]
            return response.json(0, res)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def reset(request_dict, response):
        """
        Reset a device: remove all of its switch data and the device record.
        @param request_dict: request parameters
        @request_dict serial_number: device serial number
        @param response: response object
        @return: response (444 when serial_number is missing, 173 when the device
                 is not found, 0 on success)
        """
        serial_number = request_dict.get('serial_number', None)
        if not serial_number:
            return response.json(444, {'error param': 'serial_number'})
        device_info = Device_Info.objects.filter(serial_number=serial_number).values('id').first()
        if not device_info:
            return response.json(173)
        device_id = device_info['id']  # the queryset only selects 'id'
        try:
            # Remove smart-switch data, then the room binding and the device itself.
            _purge_switch_data(device_id)
            FamilyRoomDevice.objects.filter(device_id=device_id).delete()
            Device_Info.objects.filter(id=device_id).delete()
            return response.json(0)
        except Exception as e:
            return _server_error(response, e)

    @staticmethod
    def del_switch(device_id, serial_number):
        """
        Delete a switch's data and tell the device to reset over MQTT.
        @param device_id: device id
        @param serial_number: device serial number
        @return: None; failures are printed and logged, not raised
        """
        try:
            _purge_switch_data(device_id)
            msg = {
                "device_reset": 1  # reset the smart switch
            }
            topic_name = RESET_SWITCH_TOPIC_NAME.format(serial_number)
            result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
            LOGGER.info('执行重置开关mqtt结果:{}'.format(result))
        except Exception as e:
            print(e)
            LOGGER.info('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))