import json
import time

import requests
from django.db.models import Q

from Ansjer.config import BASE_DIR
# NOTE(review): the star import presumably supplies UserModel, Role,
# Device_User, Device_Info and Access_Log used below -- confirm against
# Model.models.
from Model.models import *
from Service.EquipmentInfoService import EquipmentInfoService


def _timestamp():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def _append_log_line(relative_path, line):
    """Append ``line`` plus a newline to the file ``BASE_DIR/relative_path``.

    The file is opened in ``'a+'`` mode inside a ``with`` block so the handle
    is closed even when the write raises.
    """
    file_path = '/'.join((BASE_DIR, relative_path))
    with open(file_path, 'a+') as log_file:
        log_file.write(line + '\n')


# Reusable helper code wrapped around the models.
class ModelService:
    """Collection of static helpers for user, permission, device and log access."""

    # UID Manage permission check.
    @staticmethod
    def check_perm_uid_manage(userID, permID):
        """Return True when the ``UserModel`` row with ``id=userID`` has permission 0.

        Returns False when the user does not exist, when ``int(permission)``
        is non-zero, or when any exception occurs (the exception is printed).
        ``permID`` is accepted for interface compatibility but is not used.
        """
        try:
            users = list(UserModel.objects.filter(id=userID)[:1])
            if not users:
                return False
            return int(users[0].permission) == 0
        except Exception as e:
            print(repr(e))
            # Previously fell through and returned None; make the falsy
            # result explicit.
            return False

    # Get the role name for a role id.
    @staticmethod
    def getRole(rid):
        """Return ``roleName`` of the ``Role`` with the given ``rid``.

        Exceptions raised by ``Role.objects.get`` propagate to the caller.
        """
        return Role.objects.get(rid=rid).roleName

    # Get all permissions of a user.
    @staticmethod
    def own_permission(userID):
        """Return a list of the ``permission`` values of the user's roles.

        Returns an empty list when the user has no roles. Exceptions raised
        by ``Device_User.objects.get`` propagate to the caller.
        """
        user = Device_User.objects.get(userID=userID)
        return list(user.role.values_list('permission', flat=True))

    # Get the user's role information.
    @staticmethod
    def own_role(userID):
        """Return ``{'rid': ..., 'roleName': ...}`` for the user's first role.

        Both values are empty strings when the user has no role or when any
        exception occurs during the lookup.
        """
        try:
            role_qs = Device_User.objects.get(userID=userID).role.values('rid', 'roleName')
            # A single LIMIT 1 fetch replaces exists() plus two indexed reads.
            rows = list(role_qs[:1])
            if rows:
                return {'rid': rows[0]['rid'], 'roleName': rows[0]['roleName']}
        except Exception:
            # Deliberate best effort: fall back to the empty role below.
            pass
        return {'rid': '', 'roleName': ''}

    # Check whether the user has a permission.
    @staticmethod
    def check_perm(userID, permID):
        """Return True when ``permID`` is among the user's role permissions.

        Any exception during the lookup yields False.
        """
        try:
            perm_list = Device_User.objects.get(userID=userID).role.values_list('permission', flat=True)
            return permID in perm_list
        except Exception:
            return False

    # Check by device primary key whether the user owns the device.
    @staticmethod
    def check_user_own_device(userID, deviceID):
        """Return True when ``deviceID`` is among the ids of the user's devices.

        Any exception during the lookup yields False.
        """
        try:
            device_ids = Device_Info.objects.filter(userID_id=userID).values_list('id', flat=True)
            return deviceID in device_ids
        except Exception:
            return False

    # Check by device UID whether the user owns the device.
    @staticmethod
    def check_own_device(userID, UID):
        """Return True when a ``Device_Info`` row matches ``userID`` and ``UID``."""
        return Device_Info.objects.filter(userID_id=userID, UID=UID).exists()

    # Get the username for a userID.
    @staticmethod
    def get_user_name(userID):
        """Return the username of the given user, or ``''``.

        ``''`` is returned when ``userID`` is falsy or the lookup raises.
        """
        try:
            if not userID:
                return ''
            return Device_User.objects.get(userID=userID).username
        except Exception:
            return ''

    @staticmethod
    def get_user_mark(userID):
        """Return the first non-empty of username, userEmail, phone.

        Returns ``''`` when ``userID`` is falsy, when no matching user exists,
        or when all three fields are empty.
        """
        if not userID:
            return ''
        rows = list(
            Device_User.objects.filter(userID=userID).values('username', 'userEmail', 'phone')[:1]
        )
        if not rows:
            # Previously indexing the empty queryset raised IndexError.
            return ''
        row = rows[0]
        return row['username'] or row['userEmail'] or row['phone'] or ''

    # Get the userID for a username.
    @staticmethod
    def get_userID_byname(username):
        """Return the userID whose username, userEmail or phone equals ``username``.

        Returns None when the lookup raises (for example no match or more
        than one match).
        """
        try:
            device_user = Device_User.objects.get(
                Q(username=username) | Q(userEmail=username) | Q(phone=username)
            )
        except Exception:
            return None
        return device_user.userID

    # Bulk-add access logs.
    @staticmethod
    def add_batch_log(data_list):
        """Bulk-create ``Access_Log`` rows from UTF-8 encoded JSON byte strings.

        Returns True on success, False when any exception occurs (it is
        printed), and None when ``data_list`` is empty.
        """
        try:
            if not data_list:
                return None
            entries = [
                Access_Log(**json.loads(raw.decode('utf-8')))
                for raw in data_list
            ]
            Access_Log.objects.bulk_create(entries)
        except Exception as e:
            print('ggga')
            print(repr(e))
            return False
        return True

    # Get the list of userIDs for a username.
    @staticmethod
    def get_user_list_by_username(username):
        """Return a flat queryset of userIDs matching username, userEmail or phone."""
        match = Q(username=username) | Q(userEmail=username) | Q(phone=username)
        return Device_User.objects.filter(match).values_list('userID', flat=True)

    @staticmethod
    def del_eq_info(userID, uid):
        """Notify the Alexa service of the deletion, then delete equipment info.

        Calls ``notify_alexa_delete`` followed by
        ``EquipmentInfoService.delete_all_equipment_info`` for this user and uid.
        """
        notify_alexa_delete(userID, uid)
        EquipmentInfoService.delete_all_equipment_info(device_user_id=userID, device_uid=uid)

    @staticmethod
    def del_user_list_eq_info(user_id_list, user_id, uid):
        """
        Delete device push messages for a list of user ids.
        @param user_id_list: list of user ids
        @param user_id: id of the main user
        @param uid: device UID
        @return: None
        """
        notify_alexa_delete(user_id, uid)
        EquipmentInfoService.delete_all_equipment_info(device_user_id__in=user_id_list, device_uid=uid)

    # Get the list of devices bound to a user.
    @staticmethod
    def get_uid_list(userID):
        """Return a list of the UIDs of the user's ``Device_Info`` rows."""
        return list(Device_Info.objects.filter(userID_id=userID).values_list('UID', flat=True))

    @staticmethod
    def notify_alexa_add(uid, userID, nickname, encrypt_pwd):
        """POST the device data to the remote ``addOrUpdate`` endpoint.

        Uses a 5 second timeout. The response is ignored and any exception
        is printed rather than raised.
        """
        url = 'https://www.zositech.xyz/deviceStatus/addOrUpdate'
        data = {
            'UID': uid,
            'userID': userID,
            'uid_nick': nickname,
            'password': encrypt_pwd,
        }
        try:
            requests.post(url, data=data, timeout=5)
        except Exception as e:
            print(repr(e))

    @staticmethod
    def add_log(ip, userID, operation):
        """Append an entry to ``static/delete_device.log``.

        ``ip``, ``userID`` and ``operation`` are concatenated as strings.
        """
        _append_log_line(
            'static/delete_device.log',
            ip + "; username:" + userID + "; time:" + _timestamp() + "; " + operation,
        )

    @staticmethod
    def update_log(ip, userID, operation, content, id):
        """Append an entry to ``static/update_device.log``.

        NOTE: ``content`` is mutated in place -- the key ``'xid'`` is set to
        ``id`` before the mapping is serialised with ``json.dumps``.
        """
        content['xid'] = id
        _append_log_line(
            'static/update_device.log',
            ip + "; username:" + userID + "; time:" + _timestamp()
            + "; content:" + json.dumps(content) + "; " + operation,
        )

    @staticmethod
    def add_ip_log(ip, info):
        """Append an entry with ``ip`` and ``str(info)`` to ``static/get_timezone.log``."""
        _append_log_line(
            'static/get_timezone.log',
            ip + "; info:" + str(info) + "; time:" + _timestamp(),
        )

    @staticmethod
    def add_tmp_log(info):
        """Append ``str(info)`` to ``static/tmp_test.log``."""
        _append_log_line('static/tmp_test.log', "info:" + str(info))

    @staticmethod
    def app_log_log(userID, UID):
        """Append an entry with ``userID`` and ``UID`` to ``static/app_log.log``."""
        # The doubled "; " separator is kept as-is so the log format is unchanged.
        _append_log_line(
            'static/app_log.log',
            "username:" + userID + "; time:" + _timestamp() + "; " + "; uid:" + UID,
        )


def notify_alexa_delete(userID, UID):
    """POST a delete notification for the user/device pair to the remote endpoint.

    Uses a 5 second timeout. Any exception is printed rather than raised.
    """
    url = 'https://www.zositech.xyz/deviceStatus/delete'
    data = {
        'userID': userID,
        'UID': UID
    }
    try:
        requests.post(url=url, data=data, timeout=5)
    except Exception as e:
        print(repr(e))