import json
import time
import requests
from django.db.models import Q
from Ansjer.config import BASE_DIR
from Model.models import *
from Service.EquipmentInfoService import EquipmentInfoService


def _first_or_none(queryset):
    """
    Return the first row of a queryset without changing its ordering.
    @param queryset: any sliceable Django queryset
    @return: the first row, or None when the queryset is empty
    """
    # Slicing (rather than .first()) keeps whatever ordering the queryset
    # already has, so the row returned is the same one ``queryset[0]`` gives.
    rows = list(queryset[:1])
    return rows[0] if rows else None


def _append_log_line(relative_path, line):
    """
    Append one line of text to a log file located under BASE_DIR.
    @param relative_path: path of the log file relative to BASE_DIR
    @param line: text to write; a trailing newline is added
    @return: None
    """
    file_path = '/'.join((BASE_DIR, relative_path))
    # The context manager closes the handle even when the write fails.
    with open(file_path, 'a+') as log_file:
        log_file.write(line)
        log_file.write('\n')


def _timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


# Reusable helpers wrapped around the project's models
class ModelService:

    @staticmethod
    def check_perm_uid_manage(userID, permID):
        """
        Permission check for UID management.
        @param userID: value matched against UserModel.id
        @param permID: not used by this check; kept for interface compatibility
        @return: True only when the user exists and its ``permission`` equals 0;
                 False otherwise, including when any error occurs
        """
        try:
            user = _first_or_none(UserModel.objects.filter(id=userID))
            if user is None:
                return False
            return int(user.permission) == 0
        except Exception as e:
            print(repr(e))
            # Be explicit: an error means "no permission", not an implicit None.
            return False

    @staticmethod
    def getRole(rid):
        """
        Get the name of a role.
        @param rid: role id looked up with ``Role.objects.get``
        @return: the role's ``roleName``; lookup errors propagate to the caller
        """
        return Role.objects.get(rid=rid).roleName

    @staticmethod
    def own_permission(userID):
        """
        Get all permissions attached to a user's roles.
        @param userID: Device_User.userID
        @return: list of ``permission`` values (empty list when there are none);
                 errors from the user lookup propagate to the caller
        """
        permission = Device_User.objects.get(userID=userID).role.values_list('permission', flat=True)
        return list(permission)

    @staticmethod
    def own_role(userID):
        """
        Get the role information of a user.
        @param userID: Device_User.userID
        @return: dict with keys ``rid`` and ``roleName``; both values are empty
                 strings when the user has no role or the lookup fails
        """
        try:
            role_qs = Device_User.objects.get(userID=userID).role.values('rid', 'roleName')
            role = _first_or_none(role_qs)
        except Exception:
            # Deliberate best effort: any lookup failure yields the empty role.
            role = None
        if role is None:
            return {'rid': '', 'roleName': ''}
        return {'rid': role['rid'], 'roleName': role['roleName']}

    @staticmethod
    def check_perm(userID, permID):
        """
        Check whether a user holds a permission.
        @param userID: Device_User.userID
        @param permID: permission value to look for
        @return: True when permID is among the permissions of the user's roles;
                 False otherwise, including when the lookup fails
        """
        try:
            perm_list = Device_User.objects.get(userID=userID).role.values_list('permission', flat=True)
            return permID in perm_list
        except Exception:
            return False

    @staticmethod
    def check_user_own_device(userID, deviceID):
        """
        Check device ownership by the device's primary-key id.
        @param userID: owner id matched against Device_Info.userID_id
        @param deviceID: Device_Info.id to look for; compared by equality, so
                         its type must match the stored id type
        @return: True when the user owns the device; False otherwise or on error
        """
        try:
            dvqs = Device_Info.objects.filter(userID_id=userID).values_list('id', flat=True)
            return deviceID in dvqs
        except Exception:
            return False

    @staticmethod
    def check_own_device(userID, UID):
        """
        Check device ownership by the device's unique UID.
        @param userID: owner id matched against Device_Info.userID_id
        @param UID: device UID
        @return: True when a matching Device_Info row exists, else False
        """
        return Device_Info.objects.filter(userID_id=userID, UID=UID).exists()

    @staticmethod
    def get_user_name(userID):
        """
        Get the username for a userID.
        @param userID: Device_User.userID
        @return: the username, or '' when userID is empty or the lookup fails
        """
        if not userID:
            return ''
        try:
            return Device_User.objects.get(userID=userID).username
        except Exception:
            return ''

    @staticmethod
    def get_user_mark(userID):
        """
        Get a display identifier for a user.
        @param userID: Device_User.userID
        @return: the first non-empty value among username, userEmail and phone;
                 '' when userID is empty, the user does not exist, or all three
                 fields are empty
        """
        if not userID:
            return ''
        user = _first_or_none(
            Device_User.objects.filter(userID=userID).values('username', 'userEmail', 'phone'))
        if user is None:
            # Unknown user: return '' instead of raising IndexError.
            return ''
        return user['username'] or user['userEmail'] or user['phone'] or ''

    @staticmethod
    def get_userID_byname(username):
        """
        Get the userID for a login name.
        @param username: value matched against username, userEmail or phone
        @return: the userID, or None when the lookup fails (no match, more than
                 one match, or any other error)
        """
        try:
            device_user = Device_User.objects.get(
                Q(username=username) | Q(userEmail=username) | Q(phone=username))
        except Exception:
            return None
        return device_user.userID

    @staticmethod
    def add_batch_log(data_list):
        """
        Bulk-insert access log records.
        @param data_list: iterable of UTF-8 encoded JSON objects (each item must
                          support ``.decode``); every object is passed as keyword
                          arguments to Access_Log
        @return: True on success, False when any error occurs, and None when
                 data_list is empty (nothing is written in that case)
        """
        try:
            if not data_list:
                return None
            entries = [Access_Log(**json.loads(raw.decode('utf-8'))) for raw in data_list]
            Access_Log.objects.bulk_create(entries)
        except Exception as e:
            print('ggga')
            print(repr(e))
            return False
        return True

    @staticmethod
    def get_user_list_by_username(username):
        """
        Get the userIDs matching a login name.
        @param username: value matched against username, userEmail or phone
        @return: a flat values_list queryset of userID values
        """
        matches = Device_User.objects.filter(
            Q(username=username) | Q(userEmail=username) | Q(phone=username))
        return matches.values_list('userID', flat=True)

    @staticmethod
    def del_eq_info(userID, uid):
        """
        Delete the equipment info records of one user for one device.
        @param userID: owner of the records
        @param uid: device UID
        @return: None
        """
        notify_alexa_delete(userID, uid)
        Equipment_Info.objects.filter(userID_id=userID, devUid=uid).delete()
        # NOTE(review): indexes 1..7 presumably select the split equipment-info
        # tables; confirm against EquipmentInfoService.get_equipment_info_model.
        for i in range(1, 8):
            eq_list = EquipmentInfoService.get_equipment_info_model('', i)
            eq_list = eq_list.filter(device_user_id=userID, device_uid=uid)
            if eq_list.exists():
                eq_list.delete()

    @staticmethod
    def del_user_list_eq_info(user_id_list, user_id, uid):
        """
        Delete device push messages for a list of users.
        @param user_id_list: list of user ids whose records are deleted
        @param user_id: id of the main user (reported to the Alexa service)
        @param uid: device UID
        @return: None
        """
        notify_alexa_delete(user_id, uid)
        for i in range(1, 8):
            EquipmentInfoService.get_equipment_info_model('', i).\
                filter(device_user_id__in=user_id_list, device_uid=uid).delete()

    @staticmethod
    def get_uid_list(userID):
        """
        Get the UIDs of the devices bound to a user.
        @param userID: owner id matched against Device_Info.userID_id
        @return: list of UID values
        """
        uid_list = Device_Info.objects.filter(userID_id=userID).values_list('UID', flat=True)
        return list(uid_list)

    @staticmethod
    def notify_alexa_add(uid, userID, nickname, encrypt_pwd):
        """
        Report an added or updated device to the remote device-status service.
        @param uid: device UID
        @param userID: owner of the device
        @param nickname: device nickname, sent as ``uid_nick``
        @param encrypt_pwd: value sent as ``password``
        @return: None; request errors are printed and otherwise ignored
        """
        url = 'https://www.zositech.xyz/deviceStatus/addOrUpdate'
        data = {
            'UID': uid,
            'userID': userID,
            'uid_nick': nickname,
            'password': encrypt_pwd,
        }
        try:
            # Best-effort notification; the response is not inspected.
            requests.post(url, data=data, timeout=5)
        except Exception as e:
            print(repr(e))

    @staticmethod
    def add_log(ip, userID, operation):
        """
        Append a record to static/delete_device.log.
        @param ip: client IP (string)
        @param userID: written after the ``username:`` label (string)
        @param operation: description of the operation (string)
        @return: None
        """
        _append_log_line(
            'static/delete_device.log',
            ip + "; username:" + userID + "; time:" + _timestamp() + "; " + operation)

    @staticmethod
    def update_log(ip, userID, operation, content, id):
        """
        Append a record to static/update_device.log.
        @param ip: client IP (string)
        @param userID: written after the ``username:`` label (string)
        @param operation: description of the operation (string)
        @param content: JSON-serialisable dict; NOTE it is modified in place,
                        the key ``xid`` is set to ``id``
        @param id: record id stored under ``xid``
        @return: None
        """
        content['xid'] = id
        _append_log_line(
            'static/update_device.log',
            ip + "; username:" + userID + "; time:" + _timestamp()
            + "; content:" + json.dumps(content) + "; " + operation)

    @staticmethod
    def add_ip_log(ip, info):
        """
        Append a record to static/get_timezone.log.
        @param ip: client IP (string)
        @param info: any value; written via ``str(info)``
        @return: None
        """
        _append_log_line(
            'static/get_timezone.log',
            ip + "; info:" + str(info) + "; time:" + _timestamp())

    @staticmethod
    def add_tmp_log(info):
        """
        Append a record to static/tmp_test.log.
        @param info: any value; written via ``str(info)``
        @return: None
        """
        _append_log_line('static/tmp_test.log', "info:" + str(info))

    @staticmethod
    def app_log_log(userID, UID):
        """
        Append a record to static/app_log.log.
        @param userID: written after the ``username:`` label (string)
        @param UID: device UID (string)
        @return: None
        """
        # The doubled "; " separator is kept so the existing log format is unchanged.
        _append_log_line(
            'static/app_log.log',
            "username:" + userID + "; time:" + _timestamp() + "; " + "; uid:" + UID)


def notify_alexa_delete(userID, UID):
    """
    Report a deleted device to the remote device-status service.
    @param userID: owner of the device
    @param UID: device UID
    @return: None; request errors are printed and otherwise ignored
    """
    url = 'https://www.zositech.xyz/deviceStatus/delete'
    data = {
        'userID': userID,
        'UID': UID
    }
    try:
        requests.post(url=url, data=data, timeout=5)
    except Exception as e:
        print(repr(e))