# -*- encoding: utf-8 -*-
"""
@File : SmartSocketController.py
@Time : 2023/3/17 11:52
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import datetime
import logging
import time
from decimal import Decimal

from django.db import transaction
from django.http import QueryDict
from django.views import View

from Model.models import SocketInfo, SocketSchedule, Device_Info, UidSetModel, SocketPowerStatistics
from Object.ResponseObject import ResponseObject
from Object.utils import LocalDateTimeUtil
from Service.CommonService import CommonService

LOGGER = logging.getLogger('info')
# Topic used to publish messages to a socket (the current device firmware
# can only subscribe to a single topic). Formatted with the serial number.
SOCKET_TOPIC_NAME = 'loocam/smart-socket/{}'


class SmartSocketView(View):
    """Django view that dispatches smart-socket operations.

    The operation name comes from the URL keyword argument ``operation``.
    ``savePowerStatistics`` is handled without token verification; every
    other operation requires a token accepted by
    ``CommonService.verify_token_get_user_id``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET requests using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests using the form-encoded body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        """Handle DELETE requests; fall back to the query string when the body is empty."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        delete = QueryDict(request.body)
        if not delete:
            delete = request.GET
        return self.validation(delete, request, operation)

    def put(self, request, *args, **kwargs):
        """Handle PUT requests using the form-encoded body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        put = QueryDict(request.body)
        return self.validation(put, request, operation)

    def validation(self, request_dict, request, operation):
        """Authenticate the request (where required) and route it to a handler.

        @param request_dict: request parameters
        @param request: the Django request object
        @param operation: operation name taken from the URL
        @return: the handler's response, or code 404 for an unknown operation
        """
        if operation == 'savePowerStatistics':  # device power report, no token required
            return self.save_power_statistics(request_dict, ResponseObject('cn'))
        token_code, user_id, response = CommonService \
            .verify_token_get_user_id(request_dict, request)
        if token_code != 0:
            return response.json(token_code)
        if operation == 'saveSwitch':  # set the socket switch
            return self.save_switch(request_dict, response)
        elif operation == 'saveCountDown':  # set the socket countdown
            return self.save_count_down(request_dict, response)
        elif operation == 'saveSchedule':  # add or edit a socket schedule
            return self.save_socket_schedule(request_dict, response)
        elif operation == 'get-today-scene':  # query today's power data
            return self.get_today_scene(request_dict, response)
        elif operation == 'get-all-scene':  # power statistics over a period
            return self.get_all_scene(request_dict, response)
        elif operation == 'get-socket-schedule':  # list schedules
            return self.get_socket_schedule(request_dict, response)
        elif operation == 'get-log':  # switch log query
            return self.get_log(request_dict, response, user_id)
        elif operation == 'editor-socket-device':  # edit device information
            return self.editor_socket_device(request_dict, response, user_id)
        return response.json(404)

    @classmethod
    def save_power_statistics(cls, request_dict, response):
        """Store the power statistics reported by a device.

        A row is created the first time a serial number reports within the
        current day (as returned by ``LocalDateTimeUtil.get_today_date``);
        later reports on the same day update that row.

        @param request_dict: request parameters (serialNumber, electricity,
            power, accumulatedTime, deviceTime)
        @param response: response object
        @return: response (444 missing parameter, 173 unknown socket,
            177 on any exception, 0 on success)
        """
        try:
            serial_number = request_dict.get('serialNumber', None)
            electricity = request_dict.get('electricity', 0.00)
            power = request_dict.get('power', 0.00)
            accumulated_time = request_dict.get('accumulatedTime', None)
            device_time = request_dict.get('deviceTime', None)
            LOGGER.info('{}上报电量统计data:{}'.format(serial_number, request_dict))
            if not all([serial_number, electricity, power, accumulated_time, device_time]):
                return response.json(444)
            device_time = int(device_time)
            electricity = Decimal(electricity).quantize(Decimal("0.00"))
            power = Decimal(power).quantize(Decimal("0.00"))
            accumulated_time = int(accumulated_time)
            now_time = int(time.time())
            start_time, end_time = LocalDateTimeUtil.get_today_date(True)
            # Has this serial number already reported statistics today?
            power_qs = SocketPowerStatistics.objects.filter(serial_number=serial_number,
                                                            created_time__gt=start_time,
                                                            created_time__lte=end_time)
            data = {
                'electricity': electricity,
                'power': power,
                'accumulated_time': accumulated_time,
                'updated_time': now_time
            }
            if not power_qs.exists():
                socket_info = SocketInfo.objects.filter(serial_number=serial_number) \
                    .values('device_id').first()
                if socket_info is None:
                    return response.json(173)
                data['device_id'] = socket_info['device_id']
                data['created_time'] = device_time
                data['serial_number'] = serial_number
                SocketPowerStatistics.objects.create(**data)
                return response.json(0)
            power_qs.update(**data)
            return response.json(0)
        except Exception as e:
            LOGGER.info('智能插座电量存库异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(177)

    @staticmethod
    def get_serial_number_by_device_id(deviceId):
        """Return the serial number of the device with the given ID.

        @param deviceId: primary key of the ``Device_Info`` row
        @return: the device's ``serial_number``
        @raise Device_Info.DoesNotExist: when no such device exists
        """
        device_info = Device_Info.objects.get(id=deviceId)
        return device_info.serial_number

    @classmethod
    def save_switch(cls, request_dict, response):
        """Set the main switch of a socket.

        @param request_dict: request parameters (deviceId, status)
        @param response: response object
        @return: response (444 invalid parameter, 173 unknown device,
            177 save failure, 0 on success)
        """
        device_id = request_dict.get('deviceId', None)
        status = request_dict.get('status', None)
        if not all([device_id, status]):
            return response.json(444)
        try:
            status = int(status)
        except (TypeError, ValueError):
            # A non-numeric status is a client error, not a server crash.
            return response.json(444)
        try:
            serial_number = cls.get_serial_number_by_device_id(device_id)
        except Device_Info.DoesNotExist:
            return response.json(173)
        # Persist the state and push the MQTT message to the socket.
        result = cls.save_socket_switch(device_id, serial_number, status)
        if not result:
            return response.json(177)
        return response.json(0)

    @staticmethod
    def save_socket_switch(device_id, serial_number, status, type_switch=0):
        """Persist the switch state and notify the device over MQTT.

        No MQTT message is published when the row is first created or when
        the stored status already equals ``status``.

        @param device_id: device ID
        @param serial_number: serial number
        @param status: 0 off, 1 on
        @param type_switch: 0 main switch, 1 countdown switch
        @return: True | False
        """
        if not device_id:
            return False
        socket_info_qs = SocketInfo.objects.filter(device_id=device_id, type_switch=type_switch)
        now_time = int(time.time())
        try:
            with transaction.atomic():
                socket_info = socket_info_qs.first()
                if socket_info is None:
                    # First time this switch is seen: create its record.
                    socket_dict = {"device_id": device_id,
                                   "serial_number": serial_number,
                                   "status": status,
                                   "type_switch": type_switch,
                                   "created_time": now_time,
                                   "updated_time": now_time,
                                   "online": True}
                    SocketInfo.objects.create(**socket_dict)
                    return True
                if socket_info.status == status:
                    return True
                socket_info_qs.update(status=status, updated_time=now_time)
                topic_name = SOCKET_TOPIC_NAME.format(serial_number)
                msg = {'type': 1, 'data': {'deviceSwitch': status}}
                result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
                LOGGER.info('智能插座开关设置发布MQTT消息结果{}'.format(result))
                return True
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False

    @classmethod
    def save_count_down(cls, request_dict, response):
        """Set the countdown of a socket.

        @param request_dict: request parameters (deviceId, status, start,
            countDownTime)
        @param response: response object
        @return: response (444 invalid parameter, 173 unknown device,
            177 save failure, 0 on success)
        """
        device_id = request_dict.get('deviceId', None)
        status = request_dict.get('status', None)
        start = request_dict.get('start', None)
        count_down_time = request_dict.get('countDownTime', None)
        # `start` is converted with int() below, so it must be present too.
        if not all([device_id, status, start, count_down_time]):
            return response.json(444)
        try:
            status, start, count_down_time = int(status), int(start), int(count_down_time)
        except (TypeError, ValueError):
            return response.json(444)
        try:
            serial_number = cls.get_serial_number_by_device_id(device_id)
        except Device_Info.DoesNotExist:
            return response.json(173)
        # Persist the countdown and push the MQTT message to the socket.
        result = cls.save_socket_count_down(device_id, serial_number, status, start, count_down_time)
        if not result:
            return response.json(177)
        return response.json(0)

    @staticmethod
    def save_socket_count_down(device_id, serial_number, status, start, count_down_time, type_switch=1):
        """Persist the countdown settings and notify the device over MQTT.

        @param device_id: device ID
        @param serial_number: serial number
        @param status: power state applied by the countdown, 0 off, 1 on
        @param start: whether the countdown is started, 0 stop, 1 start
        @param count_down_time: countdown timestamp
        @param type_switch: 0 main switch, 1 countdown switch
        @return: True | False
        """
        if not device_id:
            return False
        socket_info_qs = SocketInfo.objects.filter(device_id=device_id, type_switch=type_switch)
        now_time = int(time.time())
        try:
            with transaction.atomic():
                socket_info = socket_info_qs.first()
                if socket_info is None:
                    # No countdown record yet: create one.
                    socket_dict = {"device_id": device_id,
                                   "serial_number": serial_number,
                                   "status": status,
                                   "type_switch": type_switch,
                                   "created_time": now_time,
                                   "updated_time": now_time,
                                   "online": True,
                                   "count_down_time": count_down_time}
                    socket_info = SocketInfo.objects.create(**socket_dict)
                else:
                    socket_info_qs.update(status=status, count_down_time=count_down_time,
                                          updated_time=now_time)
                count_down_id = socket_info.id
                topic_name = SOCKET_TOPIC_NAME.format(serial_number)
                msg = {'type': 2,
                       'data': {'powerType': status,
                                'countDownId': count_down_id,
                                'time': count_down_time,
                                'start': start}}
                result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
                LOGGER.info('智能插座倒计时发布MQTT消息结果{}'.format(result))
                return True
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return False

    @classmethod
    def save_socket_schedule(cls, request_dict, response):
        """Create or update a socket schedule and push it to the device.

        When ``taskId`` is supplied the matching schedule is updated,
        otherwise a new one is created.

        @param request_dict: request parameters (deviceId, timeType,
            startTime, endTime, repeat, taskId, deviceSwitch, taskSwitch)
        @param response: response object
        @return: response (444 missing parameter, 173 unknown device,
            174 unknown task, 177 on any other exception, 0 on success)
        """
        try:
            device_id = request_dict.get('deviceId', None)
            task_type = request_dict.get('timeType', None)
            start_time = request_dict.get('startTime', None)
            end_time = request_dict.get('endTime', None)
            repeat = request_dict.get('repeat', None)
            task_id = request_dict.get('taskId', None)
            device_switch = request_dict.get('deviceSwitch', None)
            task_switch = request_dict.get('taskSwitch', None)
            # deviceId is needed below to resolve the serial number.
            if not all([device_id, task_type, start_time, repeat, device_switch, task_switch]):
                return response.json(444)
            task_type = int(task_type)
            start_time = int(start_time)
            repeat = int(repeat)
            device_switch = int(device_switch)
            task_switch = int(task_switch)
            # endTime is optional; 0 is sent to the device when it is absent
            # instead of failing on int(None).
            end_time_value = int(end_time) if end_time else 0
            now_time = int(time.time())
            data = {'time_type': task_type,
                    'start_time': start_time,
                    'repeat': repeat,
                    'switch_status': device_switch == 1,
                    'task_status': task_switch == 1,
                    'updated_time': now_time}
            if end_time:
                # Stored for both create and update so the saved schedule
                # matches what is sent to the device.
                data['end_time'] = end_time_value
            serial_number = cls.get_serial_number_by_device_id(device_id)
            if task_id:  # update an existing schedule
                task_id = int(task_id)
                socket_schedule_qs = SocketSchedule.objects.filter(id=task_id)
                if not socket_schedule_qs.exists():
                    return response.json(174)
                socket_schedule_qs.update(**data)
            else:  # create a new schedule
                data['device_id'] = device_id
                data['serial_number'] = serial_number
                data['created_time'] = now_time
                socket_schedule = SocketSchedule.objects.create(**data)
                task_id = socket_schedule.id
            # Push the schedule to the device.
            cls.send_socket_schedule(serial_number, task_id, task_type, start_time,
                                     end_time_value, repeat, device_switch, task_switch)
            return response.json(0)
        except Device_Info.DoesNotExist:
            return response.json(173)
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            # A view must return a response object, never a bare bool.
            return response.json(177)

    @staticmethod
    def send_socket_schedule(serial_number, task_id, time_type, start_time, end_time, repeat, device_switch,
                             task_switch):
        """Publish a schedule to the device over MQTT.

        @param serial_number: serial number
        @param task_id: ID of the schedule
        @param time_type: 0 point in time, 1 time range
        @param start_time: start time
        @param end_time: end time
        @param repeat: repeat days
        @param device_switch: device state expected after the task runs, 0 off, 1 on
        @param task_switch: whether the task is enabled, 0 disabled, 1 enabled
        @return: result of ``CommonService.req_publish_mqtt_msg``
        """
        msg = {
            'type': 3,
            'data': {'taskId': task_id, 'timeType': time_type,
                     'startTime': start_time, 'endTime': end_time,
                     'repeat': repeat,
                     'deviceSwitch': device_switch,
                     'taskSwitch': task_switch}
        }
        topic_name = SOCKET_TOPIC_NAME.format(serial_number)
        result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
        LOGGER.info('智能插座排程任务发布MQTT消息结果{}'.format(result))
        return result

    # Query endpoints for the smart socket follow.
    @staticmethod
    def get_today_scene(request_dict, response):
        """Return the most recent power figures and state of a socket.

        @request_dict serialNumber: serial number
        @param request_dict: request parameters
        @param response: response object
        @return: response (444 missing parameter, 173 no data, 500 on error)
        """
        serial_number = request_dict.get('serialNumber', None)
        if not all([serial_number]):
            return response.json(444)
        try:
            # Fetch each row once; newest statistics first so the latest
            # report is returned rather than an arbitrary row.
            power = SocketPowerStatistics.objects.filter(serial_number=serial_number) \
                .order_by('-created_time') \
                .values('power', 'accumulated_time', 'electricity').first()
            if power is None:
                return response.json(173)
            socket_info = SocketInfo.objects.filter(serial_number=serial_number) \
                .values('status', 'online', 'count_down_time').first()
            if socket_info is None:
                return response.json(173)
            data = {
                'serialNumber': serial_number,
                'power': power['power'] or 0.00,
                'electricity': power['electricity'] or 0.00,
                'accumulatedTime': power['accumulated_time'] or '0:00',
                'status': socket_info['status'] or False,
                'online': socket_info['online'] or False,
                'count_down_time': socket_info['count_down_time'] or '00:00:00',
            }
            return response.json(0, data)
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500)

    @classmethod
    def get_all_scene(cls, request_dict, response):
        """Return power statistics of a socket split into time buckets.

        ``unit`` selects the window and bucket size: ``week`` (7 days, per
        day), ``month`` (30 days, per day) or ``year`` (365 days, per
        month). Any other unit yields an empty bucket list.

        @request_dict serialNumber: serial number
        @request_dict unit: time unit
        @request_dict startTime: optional window start
            (NOTE(review): assumed to be a Unix timestamp like endTime — confirm)
        @request_dict endTime: optional window end as a Unix timestamp;
            defaults to the current time
        @param request_dict: request parameters
        @param response: response object
        @return: response (444 missing parameter, 173 no data, 500 on error)
        """
        serial_number = request_dict.get('serialNumber', None)
        unit = request_dict.get('unit', None)
        start_time = request_dict.get('startTime', None)
        end_time = request_dict.get('endTime', None)
        if not all([unit, serial_number]):
            return response.json(444)
        try:
            power_qs = SocketPowerStatistics.objects.filter(serial_number=serial_number)
            latest = power_qs.order_by('-created_time') \
                .values('electricity', 'accumulated_time', 'power').first()
            if latest is None:
                return response.json(173)
            data = {
                'electricityToday': '',
                'accumulated_time': latest['accumulated_time'] or 0.00,
                'power': latest['power'] or 0.00,
                'electricityYesterday': '',
                'electricityMonth': '',
                'allElectricity': '',
                'accumulatedTime': '',
                'accumulatedPower': '',
                'electricity': latest['electricity'] or 0.00,
            }
            # Default the window end to the current time.
            end_dt = datetime.datetime.fromtimestamp(int(end_time) if end_time else int(time.time()))
            # unit -> (default window length in days, bucket size)
            windows = {'week': (7, 'day'), 'month': (30, 'day'), 'year': (365, 'month')}
            time_list = []
            if unit in windows:
                days, bucket_unit = windows[unit]
                if start_time:
                    # Same type as the default below, so cutting_time always
                    # receives datetime objects.
                    start_dt = datetime.datetime.fromtimestamp(int(start_time))
                else:
                    start_dt = end_dt - datetime.timedelta(days=days)
                time_list = CommonService.cutting_time(start_dt, end_dt, time_unit=bucket_unit)
            new_list = []
            for item in time_list:
                # Filter from the base queryset every time so the buckets
                # stay independent of each other.
                values = [value for value in
                          power_qs.filter(created_time__gte=item[0], created_time__lt=item[1])
                          .values_list('electricity', flat=True)
                          if value is not None]
                new_list.append({
                    'time': time.strftime('%Y-%m-%d', time.localtime(item[-1])),
                    'electricity': sum(values) if values else 0.00
                })
            data['week_or_month_or_year'] = new_list
            return response.json(0, data)
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500)

    @staticmethod
    def get_socket_schedule(request_dict, response):
        """Return one page of the schedules of a socket.

        @param request_dict: request parameters
        @request_dict pageNo: 1-based page number
        @request_dict pageSize: page size
        @request_dict serialNumber: device serial number
        @param response: response object
        @return: response (444 invalid parameter, 173 empty page, 500 on error)
        """
        page = request_dict.get('pageNo', None)
        size = request_dict.get('pageSize', None)
        serial_number = request_dict.get('serialNumber', None)
        if not all([page, size, serial_number]):
            return response.json(444)
        try:
            page, size = int(page), int(size)
        except (TypeError, ValueError):
            return response.json(444)
        if page < 1 or size < 1:
            # Querysets do not support negative slice bounds.
            return response.json(444)
        try:
            # Explicit ordering keeps pages stable between requests.
            socket_schedule_qs = SocketSchedule.objects.filter(serial_number=serial_number) \
                .order_by('id') \
                .values('switch_status', 'start_time', 'end_time', 'repeat', 'task_status')
            count = socket_schedule_qs.count()
            page_rows = list(socket_schedule_qs[(page - 1) * size:page * size])
            if not page_rows:
                return response.json(173)
            schedule_list = [{
                'start_time': row['start_time'],
                'end_time': row['end_time'],
                'switch_status': row['switch_status'] or False,
                'task_status': row['task_status'] or False,
                # TODO: the original notes that this value needs a base conversion.
                'repeat': row['repeat']
            } for row in page_rows]
            return response.json(0, {'list': schedule_list, 'total': count})
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500)

    @staticmethod
    def get_log(request_dict, response, user_id):
        """Query the switch log of a socket (not implemented yet).

        @param request_dict: request parameters
        @param user_id: user ID
        @request_dict pageNo: page number
        @request_dict pageSize: page size
        @request_dict serialNumber: device serial number
        @request_dict startTime: start time
        @request_dict endTime: end time
        @param response: response object
        @return: response (444 missing parameter, otherwise 404)
        """
        page = request_dict.get('pageNo', None)
        size = request_dict.get('pageSize', None)
        serial_number = request_dict.get('serialNumber', None)
        if not all([page, size, serial_number]):
            return response.json(444)
        # TODO: implement the log query. Until then answer like an unknown
        # operation; returning None would make Django raise because the
        # view produced no response.
        return response.json(404)

    @staticmethod
    def editor_socket_device(request_dict, response, user_id):
        """Edit the device nickname, socket settings and schedule settings.

        Every supplied field is applied; omitted or empty fields are left
        untouched. All updates run in one transaction.

        NOTE(review): ``user_id`` is not used to check that the socket
        belongs to the caller — confirm whether that check is required.

        @param request_dict: request parameters (serialNumber, NickName,
            typeSwitch, status, countDownTime, timeType, switchStatus,
            startTime, endTime, repeat)
        @param response: response object
        @param user_id: user ID
        @return: response (444 missing serial number, 173 unknown socket,
            500 on error, 0 on success)
        """
        serial_number = request_dict.get('serialNumber', None)
        NickName = request_dict.get('NickName', None)
        if not all([serial_number]):
            return response.json(444)
        socket_info_qs = SocketInfo.objects.filter(serial_number=serial_number)
        if not socket_info_qs.exists():
            return response.json(173)

        # Request parameter -> model field, for the socket itself ...
        socket_params = (('typeSwitch', 'type_switch'),  # 1: countdown switch
                         ('status', 'status'),  # 0 off, 1 on
                         ('countDownTime', 'count_down_time'))  # countdown timestamp
        # ... and for its schedules.
        schedule_params = (('timeType', 'time_type'),  # 0 point in time, 1 time range
                           ('switchStatus', 'switch_status'),  # 0 off, 1 on
                           ('startTime', 'start_time'),
                           ('endTime', 'end_time'),
                           ('repeat', 'repeat'))  # repeat cycle as a number
        socket_fields = {field: request_dict.get(param) for param, field in socket_params
                         if request_dict.get(param)}
        schedule_fields = {field: request_dict.get(param) for param, field in schedule_params
                           if request_dict.get(param)}
        try:
            with transaction.atomic():
                if NickName:
                    # Rename only the devices behind this socket, never the
                    # whole table.
                    device_ids = list(socket_info_qs.values_list('device_id', flat=True))
                    device_qs = Device_Info.objects.filter(id__in=device_ids)
                    # NOTE(review): assumes Device_Info.UID and
                    # UidSetModel.uid link the two tables — confirm against
                    # the model definitions.
                    uid_list = list(device_qs.values_list('UID', flat=True))
                    device_qs.update(NickName=NickName)
                    UidSetModel.objects.filter(uid__in=uid_list).update(nickname=NickName)
                if socket_fields:
                    socket_info_qs.update(**socket_fields)
                if schedule_fields:
                    # QuerySet.update() writes immediately; there is no save().
                    SocketSchedule.objects.filter(serial_number=serial_number).update(**schedule_fields)
            return response.json(0)
        except Exception as e:
            LOGGER.info('智能插座异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500)