import base64
import calendar
import datetime
import re
import secrets
import time
from base64 import encodebytes
from distutils.version import LooseVersion
from pathlib import Path
from random import Random

import OpenSSL.crypto as ct
import ipdb
import requests
import simplejson as json
from dateutil.relativedelta import relativedelta
from django.core import serializers
from django.db.models import F
from django.utils import timezone
from django.utils.crypto import constant_time_compare
from pyipip import IPIPDatabase

from Ansjer.config import BASE_DIR, SERVER_DOMAIN_SSL, CONFIG_INFO, CONFIG_TEST, CONFIG_CN, SERVER_DOMAIN_TEST, \
    SERVER_DOMAIN_CN, SERVER_DOMAIN_US, CONFIG_US, CONFIG_EUR, SERVER_DOMAIN_LIST, SERVER_DOMAIN_EUR, ALEXA_DOMAIN, \
    ME_COUNTRY_ID_LIST, EA_COUNTRY_ID_LIST
from Controller.CheckUserData import RandomStr
from Model.models import iotdeviceInfoModel, Device_Info, UIDModel, AppDeviceType, UIDCompanySerialModel, GatewayPush, \
    Device_User, UserExModel
from Object.AWS.S3Email import S3Email
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject


class CommonService:
    """Collection of widely reused helper functions (all static/class methods)."""

    # Canonical date-time layout used throughout this class.
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    _DIGITS = '0123456789'
    _ALPHANUMERIC = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
                    'tUuVvWwXxYyZz0123456789'

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _random_string(length, digits_only):
        """
        Build a random string of `length` characters.
        @param length: number of characters; values <= 0 give ''
        @param digits_only: truthy -> digits only, falsy -> letters and digits
        @return: the random string
        """
        alphabet = CommonService._DIGITS if digits_only else CommonService._ALPHANUMERIC
        return ''.join(secrets.choice(alphabet) for _ in range(randomlength_guard(length)))

    @staticmethod
    def _datetime_to_timestamp(moment):
        """Convert a datetime to a local-time unix timestamp via its formatted string."""
        return CommonService.str_to_timestamp(moment.strftime(CommonService._DATETIME_FORMAT))

    # ------------------------------------------------------------------
    # request / query helpers
    # ------------------------------------------------------------------
    @staticmethod
    def get_kwargs(data=None):
        """
        Build fuzzy-search filter kwargs.
        @param data: mapping of field name -> search value
        @return: dict of '<field>__icontains' -> value for every value that is
                 neither None nor the empty string
        """
        if data is None:
            data = {}
        return {k + '__icontains': v for k, v in data.items() if v is not None and v != u''}

    @staticmethod
    def qs_to_dict(query_set):
        """
        Serialize a query set to a dict.
        @param query_set: query set accepted by Django's JSON serializer
        @return: {'datas': [<serialized objects>]}
        """
        return {'datas': json.loads(serializers.serialize('json', query_set))}

    @staticmethod
    def request_dict_to_dict(request_dict):
        """
        Normalize request parameters.
        Keys containing 'meta' are replaced by the text between '[' and ']';
        the string values 'true' / 'false' become True / False.
        @param request_dict: mapping of request parameters
        @return: normalized dict
        """
        data_dict = {}
        for k, v in request_dict.items():
            key = k[k.index('[') + 1:k.index(']')] if 'meta' in k else k
            if v == 'true':
                v = True
            elif v == 'false':
                v = False
            data_dict[key] = v
        print(data_dict)
        return data_dict

    @staticmethod
    def get_file_size(file_path='', suffix_type='', decimal_point=0):
        """
        Get the size of a file.
        @param file_path: path of the file (the file must exist)
        @param suffix_type: only 'MB' is supported; any other value yields 0.0
        @param decimal_point: number of decimals to round to, 0 means no rounding
        @return: size in MB as float, or 0.0 for an unsupported suffix_type
        """
        size = (Path() / file_path).stat().st_size
        mb_size = 0.0
        if suffix_type == 'MB':
            mb_size = size / 1024.0 / 1024.0
        if decimal_point != 0:
            mb_size = round(mb_size, decimal_point)
        return mb_size

    @staticmethod
    def get_param_flag(data=None):
        """
        Check that no element is None.
        @param data: iterable of parameter values
        @return: True when every element is not None (also for empty input)
        """
        if data is None:
            data = []
        return all(v is not None for v in data)

    @staticmethod
    def get_ip_address(request):
        """
        Get the client IP address.
        @param request: request object exposing a META mapping
        @return: first entry of HTTP_X_FORWARDED_FOR, else REMOTE_ADDR, else ''
        """
        try:
            return request.META['HTTP_X_FORWARDED_FOR'].split(",")[0]
        except (KeyError, AttributeError, TypeError):
            pass
        try:
            return request.META['REMOTE_ADDR']
        except (KeyError, AttributeError, TypeError):
            return ''

    @staticmethod
    def getTimeDict(times):
        """
        Get a datetime for every hour of one day.
        @param times: date/datetime whose year, month and day are used
        @return: dict {0..23: naive datetime at that hour, minute 0, second 0}
        """
        return {hour: datetime.datetime(times.year, times.month, times.day, hour) for hour in range(24)}

    @staticmethod
    def getAddr(ip):
        """
        Look up an address by IP in the 17mon database.
        @param ip: IP address
        @return: first tab-separated field of the lookup result
        """
        print('start_time=' + str(time.time()))
        db = IPIPDatabase(BASE_DIR + '/DB/17monipdb.dat')
        addr = db.lookup(ip)
        ts = addr.split('\t')[0]
        print('end_time=' + str(time.time()))
        return ts

    @staticmethod
    def getIpIpInfo(ip, lang, update=False):
        """
        Look up IP information in the ipip database.
        @param ip: IP address
        @param lang: 'CN' or 'EN'
        @param update: when True, reload the database file before the lookup
        @return: result of db.find_map(ip, lang)
        """
        ipbd_dir = BASE_DIR + "/DB/mydata4vipday2.ipdb"
        db = ipdb.City(ipbd_dir)
        if update:
            db.reload(ipbd_dir)
        return db.find_map(ip, lang)

    # ------------------------------------------------------------------
    # id / random helpers
    # ------------------------------------------------------------------
    @staticmethod
    def getUserID(userPhone='13800138000', getUser=True, setOTAID=False, μs=True):
        """
        Generate an ID from the current time.
        @param userPhone: phone number mixed into the ID
        @param getUser: truthy -> '<time><phone>'
        @param setOTAID: used when getUser is falsy; falsy -> '<phone><time>',
                         truthy -> '13800<time>138000'
        @param μs: truthy -> microsecond resolution, falsy -> millisecond resolution
        @return: generated ID string
        """
        scale = 1000000 if μs else 1000
        time_id = str(round(time.time() * scale))
        if getUser:
            return time_id + userPhone
        if not setOTAID:
            return userPhone + time_id
        return '13800' + time_id + '138000'

    @staticmethod
    def get_username(userID):
        """
        Get the user name / email / phone for a user id.
        @param userID: user id
        @return: first non-empty value of username, userEmail, phone; '' otherwise
        """
        if userID:
            device_user_qs = Device_User.objects.filter(userID=userID).values('username', 'userEmail', 'phone')
            if device_user_qs.exists():
                # Index once: every qs[0] on an unevaluated queryset is a separate query.
                user = device_user_qs[0]
                return user['username'] or user['userEmail'] or user['phone'] or ''
        return ''

    @staticmethod
    def RandomStr(randomlength=8, number=True):
        """
        Generate a random string.
        @param randomlength: length of the result
        @param number: truthy -> digits only (default), falsy -> letters and digits
        @return: random string
        """
        return CommonService._random_string(randomlength, number)

    @staticmethod
    def createOrderID():
        """
        Generate an order id.
        @return: current local time as YYYYmmddHHMMSS followed by 6 random digits
        """
        random_id = CommonService.RandomStr(6, True)
        order_id = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + str(random_id)
        print('orderID:')
        print(order_id)
        return order_id

    @staticmethod
    def qs_to_list(qs):
        """
        Convert rows to a list, formatting known datetime fields as strings.
        'time', 'add_time', 'update_time' and 'end_time' are formatted when
        possible; 'data_joined' and 'userID__data_joined' become '' when empty.
        Each field is handled independently, so one unformattable value no
        longer prevents the remaining fields of the row from being formatted.
        @param qs: iterable of dict-like rows (rows are modified in place)
        @return: list of the rows
        """
        plain_fields = ('time', 'add_time', 'update_time', 'end_time')
        nullable_fields = ('data_joined', 'userID__data_joined')
        res = []
        for ps in qs:
            for field in plain_fields + nullable_fields:
                try:
                    if field not in ps:
                        continue
                    value = ps[field]
                    if field in nullable_fields and not value:
                        ps[field] = ''
                    else:
                        ps[field] = value.strftime(CommonService._DATETIME_FORMAT)
                except (AttributeError, TypeError, KeyError, ValueError):
                    # Best effort: leave a value that cannot be formatted untouched.
                    continue
            res.append(ps)
        return res

    @staticmethod
    def get_now_time_str(n_time, tz, lang):
        """
        Format a timestamp shifted by a time-zone offset.
        @param n_time: unix timestamp (anything int() accepts)
        @param tz: offset in hours (anything float() accepts)
        @param lang: 'cn' -> 'YYYY-mm-dd HH:MM:SS', otherwise 'mm-dd-YYYY HH:MM:SS'
        @return: formatted time string
        """
        print(n_time)
        print(tz)
        print(lang)
        shifted = int(int(n_time) + 3600 * float(tz))
        layout = '%Y-%m-%d %H:%M:%S' if lang == 'cn' else '%m-%d-%Y %H:%M:%S'
        return time.strftime(layout, time.gmtime(shifted))

    @staticmethod
    def encrypt_data(randomlength=8, number=False):
        """
        Generate a random string (despite the name, nothing is encrypted).
        @param randomlength: length of the result
        @param number: falsy -> letters and digits (default), truthy -> digits only
        @return: random string
        """
        return CommonService._random_string(randomlength, number)

    @staticmethod
    def encode_data(content, start=1, end=4):
        """
        Obfuscate data: for every round, wrap the content in random salt on
        both sides and base64-encode it.
        @param content: data content
        @param start: first round index
        @param end: last round index (exclusive); salt length of a round is end - i
        @return content: encoded data, '' for empty input
        """
        if not content:
            return ''
        for i in range(start, end):
            length = end - i
            content = RandomStr(length, False) + content + RandomStr(length, False)
            content = base64.b64encode(str(content).encode('utf-8')).decode('utf8')
        return content

    @staticmethod
    def decode_data(content, start=1, end=4):
        """
        Reverse encode_data: for every round, base64-decode and strip i
        characters from both ends.
        @param content: data content
        @param start: first round index
        @param end: last round index (exclusive)
        @return content: decoded data, '' for empty input
        """
        if not content:
            return ''
        for i in range(start, end):
            content = base64.b64decode(content)
            content = content.decode('utf-8')
            content = content[i:-i]
        return content

    # ------------------------------------------------------------------
    # time helpers
    # ------------------------------------------------------------------
    @staticmethod
    def str_to_timestamp(str_time=None, format='%Y-%m-%d %H:%M:%S'):
        """
        Convert a formatted local time string to a unix timestamp.
        @param str_time: time string; when empty the current time is used
        @param format: strptime layout of str_time
        @return: int timestamp
        """
        if str_time:
            time_tuple = time.strptime(str_time, format)
            return int(time.mktime(time_tuple))
        return int(time.time())

    @staticmethod
    def timestamp_to_str(timestamp=None, format='%Y-%m-%d %H:%M:%S'):
        """
        Convert a unix timestamp to a formatted local time string.
        @param timestamp: unix timestamp; when empty the current time is used
        @param format: strftime layout of the result
        @return: formatted time string
        """
        if timestamp:
            return time.strftime(format, time.localtime(timestamp))
        # Previously time.strptime(format) was called here, which always raised.
        return time.strftime(format)

    @staticmethod
    def get_date_from_timestamp(timestamp, timezone_offset):
        """
        Format a timestamp in a fixed-offset time zone.
        @param timestamp: unix timestamp
        @param timezone_offset: offset from UTC in hours
        @return: 'YYYY-mm-dd HH:MM:SS' string in that zone
        """
        tz = datetime.timezone(datetime.timedelta(hours=timezone_offset))
        return datetime.datetime.fromtimestamp(timestamp, tz).strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def calcMonthLater(addMonth, unix_timestamp=None):
        """
        Compute the timestamp N months after a point in time.
        When the day does not exist in the target month it is clamped to the
        last day of that month.
        @param addMonth: number of months to add; values <= 0 add nothing
        @param unix_timestamp: base timestamp; when empty the current local time is used
        @return: int timestamp, or 0 when the resulting date cannot be converted
        """
        # Read the clock once so all fields belong to the same instant.
        if unix_timestamp:
            base = time.localtime(unix_timestamp)
            year, month, day = base.tm_year, base.tm_mon, base.tm_mday
            hour, minute, second = base.tm_hour, base.tm_min, base.tm_sec
        else:
            now = datetime.datetime.now()
            year, month, day = now.year, now.month, now.day
            hour, minute, second = now.hour, now.minute, now.second

        month_index = year * 12 + (month - 1) + max(int(addMonth), 0)
        year, month = divmod(month_index, 12)
        month += 1
        try:
            day = min(day, calendar.monthrange(year, month)[1])
            date_format = '{now_year}-{now_month}-{now_day} {now_hour}:{now_min}:{now_second}' \
                .format(now_year=year, now_month=month, now_day=day, now_hour=hour,
                        now_min=minute, now_second=second)
            return CommonService.str_to_timestamp(date_format)
        except (ValueError, OverflowError):
            return 0

    @staticmethod
    def updateMac(mac: str):
        """
        Increment a MAC address by one.
        Only the last three octets are treated as a counter; the first three
        are kept.
        @param mac: colon separated MAC address with six hex octets
        @return: incremented MAC in upper case, or None when the last three
                 octets are already FF:FF:FF
        """
        octets = [int(part, 16) for part in mac.split(':')]
        counter = (octets[3] << 16) | (octets[4] << 8) | octets[5]
        if counter == 0xFFFFFF:
            return None
        counter += 1
        octets[3:6] = [(counter >> 16) & 0xFF, (counter >> 8) & 0xFF, counter & 0xFF]
        return ':'.join('%02x' % octet for octet in octets).upper()

    @staticmethod
    def encode_data_without_salt(content):
        """Base64-encode str(content) without adding any salt."""
        return base64.b64encode(str(content).encode("utf-8")).decode('utf8')

    # ------------------------------------------------------------------
    # token / MQTT helpers
    # ------------------------------------------------------------------
    @staticmethod
    def check_time_stamp_token(token, time_stamp):
        """
        Verify a timestamp token.
        @param token: token produced by encode_data from the timestamp
        @param time_stamp: timestamp
        @return: True when the decoded token equals the timestamp and the
                 timestamp is within 60000 seconds of the current time
        """
        if not all([token, time_stamp]):
            return False
        try:
            token = int(CommonService.decode_data(token))
            time_stamp = int(time_stamp)
            distance = int(time.time()) - time_stamp
            # Wide window (60000 s, about 16.7 h) to tolerate global clock differences.
            if token != time_stamp or abs(distance) > 60000:
                return False
            return True
        except Exception as e:
            print(e)
            return False

    @staticmethod
    def check_time_stamp_token_without_distance(time_stamp_token, time_stamp):
        """
        Verify a timestamp token for devices without RTC (no time-window check).
        @param time_stamp: timestamp
        @param time_stamp_token: timestamp token
        @return: boolean True/False
        """
        if not all([time_stamp_token, time_stamp]):
            return False
        try:
            token = CommonService.decode_data(time_stamp_token)
            # Compare as text so an int timestamp matches its decoded string form.
            return str(token) == str(time_stamp)
        except Exception as e:
            print(e)
            return False

    @staticmethod
    def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
        """
        Publish an MQTT message through the AWS IoT HTTPS endpoint.
        @param identification_code: identification code
        @param topic_name: topic name
        @param msg: message content (sent as JSON)
        @param qos: mqtt qos level
        @return: boolean, True only when the endpoint answers 200 with message 'OK'
        """
        if not all([identification_code, topic_name]):
            return False

        if identification_code.endswith('11L'):
            thing_name = 'LC_' + identification_code
        else:
            thing_name = 'Ansjer_Device_' + identification_code

        try:
            iot = iotdeviceInfoModel.objects.filter(
                thing_name=thing_name).values(
                'endpoint',
                'token_iot_number')
            if not iot.exists():
                return False
            iot_info = iot[0]
            endpoint = iot_info['endpoint']
            Token = iot_info['token_iot_number']

            # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
            # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
            url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
            headers = {
                'x-amz-customauthorizer-name': 'Ansjer_Iot_Auth',
                'Token': Token,
                'x-amz-customauthorizer-signature': CommonService.rsa_sign(Token)}
            r = requests.post(url=url, headers=headers, json=msg, timeout=2)
            if r.status_code != 200:
                return False
            return r.json()['message'] == 'OK'
        except Exception as e:
            return False

    @staticmethod
    def rsa_sign(Token):
        """
        Sign a token with the private key (SHA-256).
        @param Token: token string
        @return: base64 signature without line breaks, '' for an empty token
        """
        if not Token:
            return ''
        # NOTE(security): the private key is hard-coded in source. It should be
        # loaded from a secret store or a protected key file and rotated.
        private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
/nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
+syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
-----END RSA PRIVATE KEY-----'''
        private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
        signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
        return encodebytes(signature).decode('utf8').replace('\n', '')

    # ------------------------------------------------------------------
    # device / region helpers
    # ------------------------------------------------------------------
    @staticmethod
    def get_payment_status_url(lang, payment_status):
        """
        Get the result page url for a payment status.
        @param lang: 'cn' selects the Chinese pages, anything else the English ones
        @param payment_status: 'success' selects the success page, anything else the failure page
        @return: page url
        """
        page = 'success.html' if payment_status == 'success' else 'fail.html'
        if lang != 'cn':
            page = 'en_' + page
        return "{}web/paid2/{}".format(SERVER_DOMAIN_SSL, page)

    @staticmethod
    def query_serial_with_uid(uid):
        """
        Look up the serial number for a uid.
        @param uid: uid
        @return: serial number when one is stored, otherwise the uid itself
        """
        device_info_qs = Device_Info.objects.filter(UID=uid).values('serial_number')
        if device_info_qs.exists():
            serial_number = device_info_qs[0]['serial_number']
            if serial_number:
                return serial_number
        return uid

    @staticmethod
    def query_uid_with_serial(serial_number):
        """
        Look up the uid for a serial number.
        @param serial_number: serial number
        @return: uid when one is stored, otherwise the serial number itself
        """
        device_info_qs = Device_Info.objects.filter(serial_number=serial_number).values('UID')
        if device_info_qs.exists():
            uid = device_info_qs[0]['UID']
            if uid:
                return uid
        return serial_number

    @staticmethod
    def get_full_serial_number(uid, serial_number, device_type):
        """
        Build the full serial number for a uid.
        @param uid: uid (a matching UIDModel row must exist, otherwise IndexError)
        @param serial_number: 9-character serial number
        @param device_type: device type as int
        @return: serial_number + p2p_type + device type as hex padded to 4 digits
        """
        p2p_type = str(UIDModel.objects.filter(uid=uid).values('p2p_type')[0]['p2p_type'])
        return serial_number + p2p_type + format(device_type, '04x')

    @staticmethod
    def get_thing_name(company_mark, thing_name_suffix):
        """
        Get the thing name for a company mark.
        @param company_mark: '11A' -> 'Ansjer_Device_' prefix, '11L' -> 'LC_' prefix
        @param thing_name_suffix: suffix of the thing name
        @return: thing name; the bare suffix for any other mark
        """
        prefixes = {'11A': 'Ansjer_Device_', '11L': 'LC_'}
        return prefixes.get(company_mark, '') + thing_name_suffix

    @staticmethod
    def confirm_region_id(region_country=0):
        """
        Determine the region id from the server configuration and the country.
        @param region_country: user country id
        @return: region_id
        """
        if CONFIG_INFO == CONFIG_US:
            if region_country in ME_COUNTRY_ID_LIST:  # Middle East
                return 6
            if region_country in EA_COUNTRY_ID_LIST:  # East Asia
                return 2
            return 3
        if CONFIG_INFO == CONFIG_EUR:
            return 4
        if CONFIG_INFO == CONFIG_CN:
            return 1
        if CONFIG_INFO == CONFIG_TEST:
            return 5
        return 3

    @staticmethod
    def verify_token_get_user_id(request_dict, request):
        """
        Authenticate the token and get the user id.
        @param request_dict: request parameters
        @param request: request object
        @return: token_obj.code, token_obj.userID, response; (309, None, None) on any error
        """
        try:
            token_obj = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
            lang = request_dict.get('lang', None)
            response = ResponseObject(lang if lang else token_obj.lang)
            return token_obj.code, token_obj.userID, response
        except Exception as e:
            print(e)
            return 309, None, None

    # ------------------------------------------------------------------
    # time range helpers
    # ------------------------------------------------------------------
    @staticmethod
    def cutting_time(start_time, end_time, time_unit):
        """
        Cut a time range into pieces of one time unit.
        @param start_time: start datetime
        @param end_time: end datetime
        @param time_unit: 'day', 'week', 'month', 'quarter' or 'year'; any other
                          value yields the whole range as a single piece
        @return: time_list list of (start_timestamp, end_timestamp) tuples
        """
        steps = {
            'day': relativedelta(days=1),
            'week': relativedelta(days=7),
            'month': relativedelta(months=1),
            'quarter': relativedelta(months=3),
            'year': relativedelta(years=1),
        }
        to_ts = CommonService._datetime_to_timestamp
        step = steps.get(time_unit)
        time_list = []
        while step is not None:
            temp_time = start_time + step
            if temp_time < end_time:
                time_list.append((to_ts(start_time), to_ts(temp_time)))
                start_time = temp_time
                continue
            last_piece = (to_ts(start_time), to_ts(end_time))
            if last_piece not in time_list:
                time_list.append(last_piece)
            break
        if not time_list:
            time_list = [(to_ts(start_time), to_ts(end_time))]
        return time_list

    @staticmethod
    def cutting_time_stamp(start_time, end_time):
        """
        Cut a time range at every midnight.
        @param start_time: start datetime
        @param end_time: end datetime
        @return: time_list list of (start_timestamp, end_timestamp) tuples
        """
        to_ts = CommonService._datetime_to_timestamp
        time_list = []
        while True:
            # Midnight at the start of the day after start_time.
            mid_time = datetime.datetime(start_time.year, start_time.month, start_time.day) + relativedelta(days=1)
            if mid_time < end_time:
                time_list.append((to_ts(start_time), to_ts(mid_time)))
                start_time = mid_time
                continue
            last_piece = (to_ts(start_time), to_ts(end_time))
            if last_piece not in time_list:
                time_list.append(last_piece)
            break
        if not time_list:
            time_list = [(to_ts(start_time), to_ts(end_time))]
        return time_list

    # ------------------------------------------------------------------
    # domain helpers
    # ------------------------------------------------------------------
    @staticmethod
    def get_domain_name():
        """
        Get the domain names (each configured value with its last character removed).
        @return: domain_name_list list of domain names
        """
        if CONFIG_INFO == CONFIG_TEST:
            return [SERVER_DOMAIN_TEST[:-1]]
        if CONFIG_INFO in (CONFIG_CN, CONFIG_US, CONFIG_EUR):
            return [SERVER_DOMAIN_US[:-1], SERVER_DOMAIN_CN[:-1], SERVER_DOMAIN_EUR[:-1]]
        return []

    @staticmethod
    def get_orders_domain_name_list():
        """
        Get the domains of the other servers.
        @return: orders_domain_name_list list of the other servers' domains;
                 SERVER_DOMAIN_LIST when the configuration is not recognized
        """
        if CONFIG_INFO == CONFIG_TEST:
            return [SERVER_DOMAIN_CN, SERVER_DOMAIN_US, SERVER_DOMAIN_EUR]
        if CONFIG_INFO == CONFIG_CN:
            return [SERVER_DOMAIN_TEST, SERVER_DOMAIN_US, SERVER_DOMAIN_EUR]
        if CONFIG_INFO == CONFIG_US:
            return [SERVER_DOMAIN_TEST, SERVER_DOMAIN_CN, SERVER_DOMAIN_EUR]
        if CONFIG_INFO == CONFIG_EUR:
            return [SERVER_DOMAIN_TEST, SERVER_DOMAIN_CN, SERVER_DOMAIN_US]
        return SERVER_DOMAIN_LIST

    # ------------------------------------------------------------------
    # misc helpers
    # ------------------------------------------------------------------
    @staticmethod
    def list_sort(e):
        """
        Sort a list by 'count', descending.
        @param e: list of items with a numeric 'count' entry
        @return: new sorted list
        """
        return sorted(e, key=lambda item: -item['count'])

    @staticmethod
    def list_sort_v2(e, order_by):
        """
        Sort a list by a given key, descending.
        @param e: list of items
        @param order_by: key to sort by
        @return: new sorted list
        """
        return sorted(e, key=lambda item: item[order_by], reverse=True)

    @staticmethod
    def Package_Type(order_type, content):
        """
        Append the package type label to the content.
        @param order_type: 0 cloud storage, 1 AI, 4 cloud disk
        @param content: text to extend
        @return: content with the label in parentheses
        """
        # NOTE(review): order_type 2 and unknown values return None, exactly as
        # before; confirm with callers whether `content` should be returned instead.
        labels = {0: '云存', 1: 'AI', 4: '云盘'}
        label = labels.get(order_type)
        if label is None:
            return None
        return content + '(' + label + ')'

    @staticmethod
    def is_cloud_device(ucode, device_type):
        """
        Check whether a device supports cloud storage.
        @param ucode: device version string
        @param device_type: device type
        @return: True when the device model is 2 and the 4th character from the
                 end of ucode is '4' or '5'
        """
        if len(ucode) <= 4:
            return False
        number = ucode[-4]
        device_type_qs = AppDeviceType.objects.filter(type=device_type).values('model')
        model = device_type_qs[0]['model'] if device_type_qs.exists() else ''
        return model == 2 and number in ['4', '5']

    @staticmethod
    def negative_number_judgment(number_list):
        """
        Check that a list contains no negative number.
        @param number_list: list of float or int
        @return: False when any element is negative, True otherwise
        """
        return not any(i < 0 for i in number_list)

    @staticmethod
    def check_password(password1, password2):
        """
        Compare two passwords in constant time.
        @return: True or False
        """
        return constant_time_compare(password1, password2)

    @staticmethod
    def compare_version_number(version_number, version_number_list):
        """
        Select the versions that are not newer than a given version.
        @param version_number: version number
        @param version_number_list: list of version numbers
        @return: list of LooseVersion objects <= version_number, in input order
        """
        # NOTE(review): distutils was removed from the standard library in
        # Python 3.12; this import needs a replacement before upgrading.
        input_version = LooseVersion(version_number)
        candidates = (LooseVersion(version) for version in version_number_list)
        return [version for version in candidates if input_version >= version]

    @staticmethod
    def convert_to_timestamp(timezone_offset, time_string):
        """
        Convert a local time string of a given time zone to a unix timestamp.
        @param timezone_offset: offset from UTC in hours
        @param time_string: time string 'YYYY-mm-dd HH:MM:SS'
        @return: timestamp
        """
        local_time = datetime.datetime.strptime(time_string, '%Y-%m-%d %H:%M:%S')
        utc_time = local_time - datetime.timedelta(hours=timezone_offset)
        return calendar.timegm(utc_time.timetuple())

    @staticmethod
    def get_uid_by_serial_number(serial_number):
        """
        Get the uid bound to a serial number.
        @param serial_number: 9-character serial number (the first 6 characters are matched)
        @return: bound uid, or the serial number itself when nothing is bound
        """
        c_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number=serial_number[0:6])
        if not c_serial_qs.exists():
            return serial_number
        return c_serial_qs.values('uid__uid')[0]['uid__uid']

    @staticmethod
    def get_uids_by_serial_numbers(serial_numbers):
        """
        Get the uids bound to several serial numbers.
        @param serial_numbers: list of serial numbers (the first 6 characters are matched)
        @return: list of uids; [] for empty input; the original list when nothing matches
        """
        if not serial_numbers:
            return []
        serial_list = [serial[:6] for serial in serial_numbers if serial]
        if not serial_list:
            return []
        c_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_list)
        if not c_serial_qs.exists():
            return serial_numbers
        return list(c_serial_qs.values_list('uid__uid', flat=True))

    @staticmethod
    def get_serial_number_by_uid(uid):
        """
        Get the serial number bound to a uid.
        @param uid: uid
        @return: serial number followed by the company mark, or the uid itself
                 when nothing is bound
        """
        c_serial_qs = UIDCompanySerialModel.objects.filter(uid__uid=uid)
        if not c_serial_qs.exists():
            return uid
        c_serial_qs = c_serial_qs.annotate(mark=F('company_serial__company__mark'),
                                           serial_number=F('company_serial__serial_number'))
        c_serial_info = c_serial_qs.values('mark', 'serial_number')[0]
        return c_serial_info['serial_number'] + c_serial_info['mark']

    @staticmethod
    def get_user_tz(user_id):
        """
        Get the time zone of a user from the newest gateway push record.
        @param user_id: user id
        @return: tz as float, 0.00 when no record exists
        """
        gateway_push = GatewayPush.objects.filter(user_id=user_id).order_by('-id').first()
        if gateway_push is None:
            return 0.00
        # Drop the last three characters (e.g. '.00') before converting.
        return float(gateway_push.tz[:-3])

    @staticmethod
    def update_alexa_events(data_list):
        """
        Ask the Alexa server to update the event gateway.
        Any exception is reported by email instead of being raised.
        @param data_list: data list
        @return:
        """
        try:
            data = {'data_list': json.dumps(data_list)}
            url = ALEXA_DOMAIN + 'deviceStatus/addOrUpdateV2'
            requests.post(url, data=data, timeout=30)
        except Exception as e:
            S3Email().faEmail(
                '请求Alexa服务器更新事件网关异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)),
                'servers@ansjer.com')

    @classmethod
    def confirm_msg_sign_name(cls, sign_name, phone):
        """
        Determine the SMS signature.
        @param sign_name: app signature identifier
        @param phone: phone number
        @return: SMS signature
        """
        if sign_name == 'zosi':
            return '周视'
        if sign_name == 'vsees':
            # China Mobile numbers use the Ansjer signature for this app.
            return 'Ansjer' if cls.is_china_mobile(phone) else '微瞳'
        return 'Ansjer'

    @staticmethod
    def is_china_mobile(phone):
        """
        Check whether a phone number belongs to China Mobile.
        @param phone: phone number string (11 digits)
        @return: True for a China Mobile number, otherwise False (also for non-string input)
        """
        if not isinstance(phone, str):
            return False
        pattern = r'^1(3[4-9]|4[7]|5[0-27-9]|7[28]|8[2-47-8]|9[58])\d{8}$'
        return bool(re.fullmatch(pattern, phone))

    @staticmethod
    def confirm_msg_sign_name_with_phone(phone):
        """
        Determine the SMS signature from the app bundle id of the user
        registered with a phone number.
        @param phone: phone number
        @return: SMS signature
        """
        sign_name = '周视'
        device_user_qs = Device_User.objects.filter(username=phone).values('userID')
        if device_user_qs.exists():
            user_id = device_user_qs[0]['userID']
            user_ex_qs = UserExModel.objects.filter(userID=user_id).values('appBundleId')
            if user_ex_qs.exists():
                if user_ex_qs[0]['appBundleId'] in ['com.cloudlife.commissionf', 'com.cloudlife.commissionf_a']:
                    sign_name = '微瞳'
        return sign_name


def randomlength_guard(length):
    """Return `length` unchanged; range() treats values <= 0 as an empty sequence."""
    return length