# -*- coding: utf-8 -*-
"""
# @Author : cheng
# @Time : 2023/7/10 11:20
# @File: SmartSwitchController.py
"""
import datetime
import json
import os
import threading
import time

from django.views import View

from Model.models import SwitchDimmingSettings, SwitchChronopher, Device_Info, SceneLog, FamilyRoomDevice, \
    SwitchOperateLog
from Object.RedisObject import RedisObject
from Service.CommonService import CommonService
from Object.ApschedulerObject import ApschedulerObject
from django.db import transaction
from Ansjer.config import LOGGER

APSCHEDULER_TOPIC_NAME = 'loocam/switch/time_scheduling/{}'  # MQTT topic used for scheduled tasks
RESET_SWITCH_TOPIC_NAME = 'loocam/smart-switch/{}'  # MQTT topic used to reset a device


class SmartSwitchView(View):
    """Django view exposing smart-switch dimming, scheduling, reset and log operations."""

    def get(self, request, *args, **kwargs):
        """Handle GET requests by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Route the request to the handler matching ``operation``.
        The device-reported operations (log uploads and reset) are served regardless of
        the token result; every other operation requires ``token_code == 0``.
        @param request_dict: request parameters
        @param request: the Django request object
        @param operation: operation name taken from the URL
        @return: response
        """
        token_code, user_id, response = CommonService.verify_token_get_user_id(request_dict, request)
        if operation == 'switch-chronopher-log':  # device reports a schedule log
            return self.create_chronopher_log(request_dict, response)
        elif operation == 'switch-operate-log':  # device reports an operation log
            return self.create_operate_log(request_dict, response)
        elif operation == 'reset':  # device reset
            return self.reset(request_dict, response)
        else:
            if token_code != 0:
                return response.json(token_code)
            if operation == 'get-dimming-setting':  # get the dimming settings
                return self.get_dimming_setting(request_dict, response)
            elif operation == 'get-chronopher-setting':  # get the schedules
                return self.get_chronopher_setting(request_dict, response)
            elif operation == 'add-or-edit-chronopher':  # add/edit a schedule
                return self.add_or_edit_chronopher(request_dict, response)
            elif operation == 'delete-chronopher':  # delete a schedule
                return self.delete_chronopher(request_dict, response)
            elif operation == 'edit-dimming-correction':  # set the dimming correction
                return self.edit_dimming_correction(request_dict, response)
            elif operation == 'edit-dimming-setting':  # edit the dimming settings
                return self.edit_dimming_setting(request_dict, response)
            elif operation == 'get-chronopher-log':  # query schedule logs
                return self.get_chronopher_log(request_dict, response)
            elif operation == 'get-operate-log':  # query ordinary operation logs
                return self.get_operate_log(request_dict, response)
            else:
                return response.json(414)

    @staticmethod
    def _server_error(response, exc):
        """Print the exception and build the 500 response carrying its line number and repr."""
        print(exc)
        return response.json(500, 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)))

    @staticmethod
    def _delete_chronopher_jobs(apscheduler_obj, chronopher_id):
        """Delete the three scheduler jobs (single, begin, end) that may exist for a schedule."""
        for job_id_template in ('switchchronopher_{}', 'switchchronopher_{}_1', 'switchchronopher_{}_2'):
            apscheduler_obj.del_job(job_id_template.format(chronopher_id))

    @staticmethod
    def _purge_switch_data(device_id):
        """Delete the dimming settings, schedules (with their scheduler jobs) and scene logs of a device."""
        SwitchDimmingSettings.objects.filter(device_id=device_id).delete()
        chronopher_qs = SwitchChronopher.objects.filter(device_id=device_id)
        if chronopher_qs.exists():
            apscheduler_obj = ApschedulerObject()
            for chronopher in chronopher_qs:
                SmartSwitchView._delete_chronopher_jobs(apscheduler_obj, chronopher.id)
            chronopher_qs.delete()
        SceneLog.objects.filter(device_id=device_id).delete()

    @staticmethod
    def get_dimming_setting(request_dict, response):
        """
        Get the dimming settings of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response (444 if deviceId is missing, 173 if no settings row exists)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        try:
            # Fetch the row once instead of re-querying for every field.
            setting = SwitchDimmingSettings.objects.filter(device_id=device_id).values().first()
            if setting is None:
                return response.json(173)
            res = {
                'clickTurnOnSpeed': setting['click_turn_on_speed'],
                'clickTurnOffSpeed': setting['click_turn_off_speed'],
                'doubleClick': setting['double_click'],
                'press': setting['press'],
                'doublePressClickTurnOnSpeed': setting['double_press_click_turn_on_speed'],
                'doublePressClickTurnOffSpeed': setting['double_press_click_turn_off_speed'],
                'dimmingCorrection': setting['dimming_correction'],
            }
            return response.json(0, res)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def edit_dimming_correction(request_dict, response):
        """
        Update the dimming correction of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict dimmingCorrection: dimming correction
        @param response: response object
        @return: response
        """
        device_id = request_dict.get('deviceId', None)
        dimming_correction = request_dict.get('dimmingCorrection', None)
        if not device_id:
            return response.json(444)
        try:
            # NOTE(review): dimmingCorrection is written as-is, even when absent (None) - confirm intended.
            SwitchDimmingSettings.objects.filter(device_id=device_id).update(dimming_correction=dimming_correction)
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def edit_dimming_setting(request_dict, response):
        """
        Update the dimming settings of a smart switch.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict clickTurnOnSpeed: single-click turn-on speed
        @request_dict clickTurnOffSpeed: single-click turn-off speed
        @request_dict doubleClick: double click
        @request_dict press: long press
        @request_dict doublePressClickTurnOnSpeed: double-click/long-press turn-on speed
        @request_dict doublePressClickTurnOffSpeed: double-click/long-press turn-off speed
        @param response: response object
        @return: response
        """
        device_id = request_dict.get('deviceId', None)
        click_turn_on_speed = request_dict.get('clickTurnOnSpeed', None)
        click_turn_off_speed = request_dict.get('clickTurnOffSpeed', None)
        double_click = request_dict.get('doubleClick', None)
        press = request_dict.get('press', None)
        double_press_click_turn_on_speed = request_dict.get('doublePressClickTurnOnSpeed', None)
        double_press_click_turn_off_speed = request_dict.get('doublePressClickTurnOffSpeed', None)
        if not device_id:
            return response.json(444)
        try:
            # NOTE(review): every field is written, including ones absent from the request (None).
            dimming_setting_data = {
                'device_id': device_id,
                'click_turn_on_speed': click_turn_on_speed,
                'click_turn_off_speed': click_turn_off_speed,
                'double_click': double_click,
                'press': press,
                'double_press_click_turn_on_speed': double_press_click_turn_on_speed,
                'double_press_click_turn_off_speed': double_press_click_turn_off_speed
            }
            SwitchDimmingSettings.objects.filter(device_id=device_id).update(**dimming_setting_data)
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def get_chronopher_setting(request_dict, response):
        """
        Get the schedules configured for a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response (444 if deviceId is missing, 173 if the device has no schedule)
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444)
        try:
            switch_chronopher_list = [
                {
                    'chronopherId': item['id'],
                    'timeTypeRadio': item['time_type_radio'],
                    'timePoint': item['time_point'],
                    'timeQuantumStartTime': item['time_quantum_start_time'],
                    'timeQuantumEndTime': item['time_quantum_end_time'],
                    'timePointDeviceWillDoing': item['time_point_device_will_doing'],
                    'timeQuantumDeviceWillDoing': item['time_quantum_device_will_doing'],
                    'slowOpenOrCloseSpeed': item['slow_open_or_close_speed'],
                    'repeat': item['repeat'],
                }
                for item in SwitchChronopher.objects.filter(device_id=device_id).values()
            ]
            if not switch_chronopher_list:
                return response.json(173)
            return response.json(0, {'list': switch_chronopher_list})
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def _schedule_time_point_job(apscheduler_obj, chronopher_id, serial_number, topic_name, repeat,
                                 time_point, time_point_device_will_doing, slow_open_or_close_speed):
        """Create the single cron job of a time-point schedule."""
        task_id = 'switchchronopher_{}'.format(chronopher_id)
        if time_point_device_will_doing in ['0', '1']:  # turn on or turn off
            msg = {
                "taskId": chronopher_id,
                "deviceSwitch": int(time_point_device_will_doing),  # -1: toggle, 0: off, 1: on, 2: preset brightness
                "slowTime": slow_open_or_close_speed
            }
        else:  # turn on with the given brightness
            msg = {
                "taskId": chronopher_id,
                "deviceSwitch": 2,
                "pwmControl": int(time_point_device_will_doing),
                'slowTime': slow_open_or_close_speed
            }
        # Only the hour and minute of the timestamp are used for the cron trigger.
        time_str = datetime.datetime.fromtimestamp(int(time_point))
        apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, task_id, repeat, time_str.hour,
                                        time_str.minute, [serial_number, topic_name, msg, task_id])

    @staticmethod
    def _schedule_time_quantum_jobs(apscheduler_obj, chronopher_id, serial_number, topic_name, repeat,
                                    time_quantum_start_time, time_quantum_end_time,
                                    time_quantum_device_will_doing):
        """Create the cron job(s) of a time-range schedule: a begin/end pair or one interval job."""
        # The start/end values are seconds; split them into hour and minute.
        start_hour, start_minute = divmod(time_quantum_start_time // 60, 60)
        end_hour, end_minute = divmod(time_quantum_end_time // 60, 60)
        if time_quantum_device_will_doing in ['0', '1']:
            begin_task_id = 'switchchronopher_{}_1'.format(chronopher_id)  # begin job id
            end_task_id = 'switchchronopher_{}_2'.format(chronopher_id)  # end job id
            msg = {"taskId": chronopher_id,
                   "deviceSwitch": int(time_quantum_device_will_doing)}
            apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, begin_task_id, repeat, start_hour,
                                            start_minute, [serial_number, topic_name, msg, begin_task_id])
            # The end job sends the opposite switch state of the begin job.
            msg = {"taskId": chronopher_id,
                   "deviceSwitch": 0 if int(time_quantum_device_will_doing) == 1 else 1}
            apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, end_task_id, repeat, end_hour,
                                            end_minute, [serial_number, topic_name, msg, end_task_id])
        else:  # interval job: the value is the interval in minutes
            minute = int(time_quantum_device_will_doing)
            task_id = 'switchchronopher_{}'.format(chronopher_id)
            msg = {"taskId": chronopher_id,
                   "deviceSwitch": -1}
            if minute >= 60:
                hour = '{}-{}/{}'.format(start_hour, end_hour, minute // 60)
                minute = start_minute
            else:
                hour = '{}-{}'.format(start_hour, end_hour)
                minute = '{}/{}'.format(start_minute, minute)
            apscheduler_obj.create_cron_job(SmartSwitchView.send_mqtt, task_id, repeat, hour, minute,
                                            [serial_number, topic_name, msg, task_id])

    @staticmethod
    def add_or_edit_chronopher(request_dict, response):
        """
        Add or edit a schedule and (re)create its scheduler jobs.
        @param request_dict: request parameters
        @request_dict isEdit: when truthy an existing schedule is edited, otherwise one is created
        @request_dict deviceId: device id
        @request_dict chronopherId: schedule id (required when editing)
        @request_dict timeTypeRadio: 1 = time point, 2 = time range
        @request_dict timePoint: time point (timestamp)
        @request_dict timeQuantumStartTime: start of the time range, in seconds
        @request_dict timeQuantumEndTime: end of the time range, in seconds
        @request_dict timePointDeviceWillDoing: what the device will do at the time point
        @request_dict timeQuantumDeviceWillDoing: what the device will do during the time range
        @request_dict slowOpenOrCloseSpeed: slow turn-on/turn-off speed
        @request_dict repeat: repeat cycle
        @param response: response object
        @return: response (444 with the offending parameter names on invalid input, 173 if the
                 device or the schedule to edit does not exist)
        """
        # NOTE(review): isEdit is only tested for truthiness, so any non-empty string enables edit mode.
        is_edit = request_dict.get('isEdit', None)
        device_id = request_dict.get('deviceId', None)
        chronopher_id = request_dict.get('chronopherId', None)
        try:
            time_type_radio = int(request_dict.get('timeTypeRadio', 0))
        except (TypeError, ValueError):
            return response.json(444, {'param': 'timeTypeRadio'})
        time_point = request_dict.get('timePoint', None)
        time_quantum_start_time = request_dict.get('timeQuantumStartTime', None)
        time_quantum_end_time = request_dict.get('timeQuantumEndTime', None)
        time_point_device_will_doing = request_dict.get('timePointDeviceWillDoing', None)
        time_quantum_device_will_doing = request_dict.get('timeQuantumDeviceWillDoing', None)
        slow_open_or_close_speed = request_dict.get('slowOpenOrCloseSpeed', None)
        repeat = request_dict.get('repeat', None)
        if not all([device_id, repeat]):
            return response.json(444, {'param': 'deviceId,repeat'})
        device = Device_Info.objects.filter(id=device_id).values('serial_number').first()
        if device is None:
            return response.json(173)
        if time_type_radio == 1:  # time point
            if not all([time_point, slow_open_or_close_speed]):
                return response.json(444, {'param': 'timePoint,slowOpenOrCloseSpeed'})
            chronopher_data = {
                'device_id': device_id,
                'time_type_radio': time_type_radio,
                'time_point': time_point,
                'time_point_device_will_doing': time_point_device_will_doing,
                'slow_open_or_close_speed': slow_open_or_close_speed,
                'repeat': repeat
            }
        elif time_type_radio == 2:  # time range
            if not all([time_quantum_start_time, time_quantum_end_time]):
                return response.json(444, {'param': 'timeQuantumStartTime,timeQuantumEndTime'})
            try:
                time_quantum_start_time = int(time_quantum_start_time)
                time_quantum_end_time = int(time_quantum_end_time)
            except (TypeError, ValueError):
                return response.json(444, {'param': 'timeQuantumStartTime,timeQuantumEndTime'})
            chronopher_data = {
                'device_id': device_id,
                'time_type_radio': time_type_radio,
                'time_quantum_start_time': time_quantum_start_time,
                'time_quantum_end_time': time_quantum_end_time,
                'time_quantum_device_will_doing': time_quantum_device_will_doing,
                'repeat': repeat
            }
        else:
            return response.json(444, {'param': 'timeTypeRadio'})
        if is_edit and not chronopher_id:
            return response.json(444, {'param': 'chronopherId'})
        try:
            with transaction.atomic():
                apscheduler_obj = ApschedulerObject()
                if is_edit:
                    update_flag = SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).update(
                        **chronopher_data)
                    if not update_flag:
                        return response.json(173)
                    # Drop the jobs of the previous configuration before re-creating them.
                    SmartSwitchView._delete_chronopher_jobs(apscheduler_obj, chronopher_id)
                else:
                    switch_qs = SwitchChronopher.objects.create(**chronopher_data)
                    chronopher_id = switch_qs.id
                # Set up the scheduled job(s).
                serial_number = device['serial_number']
                topic_name = APSCHEDULER_TOPIC_NAME.format(serial_number)
                if time_type_radio == 1:
                    SmartSwitchView._schedule_time_point_job(
                        apscheduler_obj, chronopher_id, serial_number, topic_name, repeat,
                        time_point, time_point_device_will_doing, slow_open_or_close_speed)
                else:
                    SmartSwitchView._schedule_time_quantum_jobs(
                        apscheduler_obj, chronopher_id, serial_number, topic_name, repeat,
                        time_quantum_start_time, time_quantum_end_time, time_quantum_device_will_doing)
                return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def delete_chronopher(request_dict, response):
        """
        Delete a schedule and its scheduler jobs.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @request_dict chronopherId: schedule id
        @param response: response object
        @return: response (444 if a parameter is missing, 173 if nothing was deleted)
        """
        device_id = request_dict.get('deviceId', None)
        chronopher_id = request_dict.get('chronopherId', None)
        if not all([device_id, chronopher_id]):
            return response.json(444, {'error param': 'deviceId or chronopherId'})
        try:
            delete_flag = SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).delete()
            if not delete_flag[0]:
                return response.json(173)
            SmartSwitchView._delete_chronopher_jobs(ApschedulerObject(), chronopher_id)  # delete the scheduled jobs
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def send_mqtt(serial_number, topic_name, msg, task_id):
        """
        Publish a scheduled MQTT message (do not change this casually, otherwise the
        scheduled jobs will not run).
        @param serial_number: device serial number
        @param topic_name: topic
        @param msg: message; ``implementTime`` is added to it in place
        @param task_id: job id, also used to build the Redis lock key
        @return: None
        """
        now_time = int(time.time())
        msg['implementTime'] = now_time
        redis_obj = RedisObject()
        # Take a per-task lock key with a 60 second expiry; if the key already exists, skip publishing.
        is_lock = redis_obj.CONN.setnx(task_id + 'do_notify', 1)
        redis_obj.CONN.expire(task_id + 'do_notify', 60)
        if not is_lock:
            return
        result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
        LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
                                                                         now_time, task_id,
                                                                         threading.get_ident(), os.getpid()))
        redis_obj.del_data(key=task_id + 'do_notify')

    @staticmethod
    def create_chronopher_log(request_dict, response):
        """
        Record a schedule execution log (and the matching operation log) reported by a device.
        @param request_dict: request parameters
        @request_dict serialNumber: device serial number
        @request_dict taskId: schedule id
        @request_dict status: execution status
        @request_dict switchStatus: switch status
        @request_dict implementTime: execution time
        @param response: response object
        @return: response (444 if a parameter is missing, 173 if the device or schedule is unknown)
        """
        serial_number = request_dict.get('serialNumber', None)
        chronopher_id = request_dict.get('taskId', None)
        operate_status = request_dict.get('status', None)
        switch_status = request_dict.get('switchStatus', None)
        implement_time = request_dict.get('implementTime', None)
        if not all([serial_number, chronopher_id, operate_status, switch_status, implement_time]):
            return response.json(
                444, {'error param': 'serialNumber, taskId, status, switchStatus or implementTime'})
        device = Device_Info.objects.filter(serial_number=serial_number).values('id').first()
        if device is None:
            return response.json(173)
        device_id = device['id']
        chronopher = SwitchChronopher.objects.filter(device_id=device_id, id=chronopher_id).values(
            'time_type_radio', 'time_point', 'time_quantum_start_time', 'time_quantum_end_time',
            'time_point_device_will_doing', 'time_quantum_device_will_doing', 'slow_open_or_close_speed',
            'repeat').first()
        if chronopher is None:
            return response.json(173)
        try:
            scene_log = {
                'scene_id': chronopher_id,
                'device_id': device_id,
                'tasks': json.dumps(chronopher),
                'status': operate_status,
                'created_time': implement_time,
            }
            # Skip the insert when a log for the same schedule, device and time already exists.
            scene_qs = SceneLog.objects.filter(created_time=implement_time, device_id=device_id,
                                               scene_id=chronopher_id)
            if not scene_qs.exists():
                SceneLog.objects.create(**scene_log)
            operate_log = {
                'device_id': device_id,
                'status': switch_status,
                'operate_type': 2,
                'created_time': implement_time
            }
            operate_qs = SwitchOperateLog.objects.filter(**operate_log)
            if not operate_qs.exists():
                SwitchOperateLog.objects.create(**operate_log)
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def get_chronopher_log(request_dict, response):
        """
        Query the schedule execution logs of a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444, {'error param': 'deviceId'})
        try:
            scene_qs = SceneLog.objects.filter(device_id=device_id).values('tasks', 'status', 'created_time', 'id')
            res = [
                {
                    'id': item['id'],
                    'tasks': json.loads(item['tasks']),
                    'status': item['status'],
                    'created_time': item['created_time']
                }
                for item in scene_qs
            ]
            return response.json(0, res)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def reset(request_dict, response):
        """
        Reset a device: delete its switch data, room binding and device record.
        @param request_dict: request parameters
        @request_dict serialNumber: device serial number
        @param response: response object
        @return: response (444 if serialNumber is missing, 173 if the device is unknown)
        """
        serial_number = request_dict.get('serialNumber', None)
        if not serial_number:
            return response.json(444, {'error param': 'serialNumber'})
        device = Device_Info.objects.filter(serial_number=serial_number).values('id').first()
        if device is None:
            return response.json(173)
        device_id = device['id']
        try:
            # Delete the smart-switch data.
            SmartSwitchView._purge_switch_data(device_id)
            FamilyRoomDevice.objects.filter(device_id=device_id).delete()
            Device_Info.objects.filter(id=device_id).delete()
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def del_switch(device_id, serial_number):
        """
        Delete a switch's data and publish a reset message to the device.
        Errors are logged, not raised.
        @param device_id: device id
        @param serial_number: device serial number
        @return: None
        """
        try:
            SmartSwitchView._purge_switch_data(device_id)
            msg = {
                "device_reset": 1  # reset the smart switch
            }
            topic_name = RESET_SWITCH_TOPIC_NAME.format(serial_number)
            result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
            LOGGER.info('执行重置开关mqtt结果:{}'.format(result))
        except Exception as e:
            print(e)
            LOGGER.info('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def create_operate_log(request_dict, response):
        """
        Record an operation log reported by a device.
        @param request_dict: request parameters
        @request_dict serialNumber: device serial number
        @request_dict switchStatus: switch status
        @request_dict implementTime: execution time
        @param response: response object
        @return: response (444 if a parameter is missing, 173 if the device is unknown)
        """
        serial_number = request_dict.get('serialNumber', None)
        status = request_dict.get('switchStatus', None)
        implement_time = request_dict.get('implementTime', None)
        if not all([serial_number, status, implement_time]):
            return response.json(444, {'error param': 'serialNumber, switchStatus or implementTime'})
        device = Device_Info.objects.filter(serial_number=serial_number).values('id').first()
        if device is None:
            return response.json(173)
        device_id = device['id']
        try:
            operate_log = {
                'device_id': device_id,
                'status': status,
                'created_time': implement_time,
            }
            # Skip the insert when an identical log row already exists.
            operate_qs = SwitchOperateLog.objects.filter(**operate_log)
            if not operate_qs.exists():
                SwitchOperateLog.objects.create(**operate_log)
            return response.json(0)
        except Exception as e:
            return SmartSwitchView._server_error(response, e)

    @staticmethod
    def get_operate_log(request_dict, response):
        """
        Query the ordinary operation logs of a device.
        @param request_dict: request parameters
        @request_dict deviceId: device id
        @param response: response object
        @return: response
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            return response.json(444, {'error param': 'deviceId'})
        try:
            scene_qs = SwitchOperateLog.objects.filter(device_id=device_id).values('status', 'created_time', 'id',
                                                                                   'operate_type')
            return response.json(0, list(scene_qs))
        except Exception as e:
            return SmartSwitchView._server_error(response, e)