# -*- coding: utf-8 -*-
# Frequently reused helper functions, collected in the CommonService class.
import base64
import calendar
import datetime
import time
from base64 import encodebytes
from pathlib import Path
from random import Random

import ipdb
import OpenSSL.crypto as ct
import simplejson as json
from django.core import serializers
from django.utils import timezone
from pyipip import IPIPDatabase

from Ansjer.config import BASE_DIR, UNICODE_ASCII_CHARACTER_SET
from Controller.CheckUserData import RandomStr
from Service.ModelService import ModelService


class CommonService:
    """Namespace of stateless helper functions; every method is static."""

    # Format shared by the datetime <-> string helpers below.
    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Alphabets used by RandomStr / encrypt_data.
    _ALNUM_CHARACTERS = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
                        'tUuVvWwXxYyZz0123456789'
    _DIGIT_CHARACTERS = '0123456789'

    @staticmethod
    def get_kwargs(data=None):
        """Build fuzzy-search filter kwargs from a mapping.

        Every entry whose value is neither ``None`` nor the empty string is
        emitted under the key ``<name>__icontains``.

        :param data: mapping of field name -> search value (may be omitted).
        :return: dict of ``<name>__icontains`` -> value.
        """
        if not data:
            return {}
        return {
            key + '__icontains': value
            for key, value in data.items()
            if value is not None and value != ''
        }

    @staticmethod
    def qs_to_dict(query_set):
        """Serialize a query set to JSON and wrap the rows as ``{"datas": [...]}``.

        :param query_set: object accepted by ``serializers.serialize('json', ...)``.
        :return: dict with the single key ``"datas"``.
        """
        rows = json.loads(serializers.serialize('json', query_set))
        return {"datas": rows}

    @staticmethod
    def request_dict_to_dict(request_dict):
        """Normalize request parameters into a plain dict.

        A key containing ``meta`` and a ``[...]`` part is replaced by the text
        between the first ``[`` and the first ``]``. The string values
        ``'true'`` / ``'false'`` become ``True`` / ``False``.

        :param request_dict: mapping of raw request parameters.
        :return: the normalized dict (also printed, as before).
        """
        data_dict = {}
        for key, value in request_dict.items():
            if 'meta' in key:
                left = key.find('[')
                right = key.find(']')
                # Keys such as "metadata" contain "meta" but no brackets;
                # leave them untouched instead of raising ValueError.
                if left != -1 and right != -1:
                    key = key[left + 1:right]
            if value == 'true':
                value = True
            elif value == 'false':
                value = False
            data_dict[key] = value
        print(data_dict)
        return data_dict

    @staticmethod
    def get_file_size(file_path='', suffix_type='', decimal_point=0):
        """Return the size of a file in megabytes.

        Only ``suffix_type == 'MB'`` is supported; any other value yields
        ``0.0``. The file is stat-ed in every case, so a missing file raises.

        :param file_path: path of the file to inspect.
        :param suffix_type: unit selector; only ``'MB'`` produces a size.
        :param decimal_point: digits to round to; ``0`` disables rounding.
        :return: size in MB as a float, or ``0.0`` for unsupported units.
        """
        size_in_bytes = (Path() / file_path).stat().st_size
        if suffix_type != 'MB':
            return 0.0
        size_in_mb = size_in_bytes / 1024.0 / 1024.0
        if decimal_point != 0:
            size_in_mb = round(size_in_mb, decimal_point)
        return size_in_mb

    @staticmethod
    def get_param_flag(data=None):
        """Return ``True`` when no element of ``data`` is ``None``.

        :param data: iterable of parameter values (may be omitted).
        :return: ``False`` as soon as one element is ``None``, else ``True``.
        """
        if data is None:
            return True
        return all(value is not None for value in data)

    @staticmethod
    def get_ip_address(request):
        """Return the client IP address for a request.

        The first entry of ``HTTP_X_FORWARDED_FOR`` is preferred, then
        ``REMOTE_ADDR``; an empty string is returned when neither can be read.

        :param request: object exposing a ``META`` mapping.
        :return: the client IP as a string, or ``''``.
        """
        try:
            return request.META['HTTP_X_FORWARDED_FOR'].split(",")[0]
        except Exception:
            # Header missing or unusable: fall back to the socket address.
            pass
        try:
            return request.META['REMOTE_ADDR']
        except Exception:
            return ''

    @staticmethod
    def getTimeDict(times):
        """Return the 24 hourly datetimes of the day that ``times`` falls on.

        :param times: object with ``strftime`` (date or datetime).
        :return: dict mapping hour index ``0..23`` to the datetime at that
            hour, minute and second zero.
        """
        day = times.strftime("%Y-%m-%d")
        return {
            hour: timezone.datetime.strptime(
                '%s %02d:00:00' % (day, hour), '%Y-%m-%d %H:%M:%S')
            for hour in range(24)
        }

    @staticmethod
    def getAddr(ip):
        """Look up ``ip`` in the bundled IPIP ``.dat`` database.

        :param ip: IP address string.
        :return: the first tab-separated field of the lookup result.
        """
        print('start_time=' + str(time.time()))
        # IP database file shipped under BASE_DIR.
        db = IPIPDatabase(BASE_DIR + '/DB/17monipdb.dat')
        addr = db.lookup(ip)
        first_field = addr.split('\t')[0]
        print('end_time=' + str(time.time()))
        return first_field

    @staticmethod
    def getIpIpInfo(ip, lang, update=False):
        """Query the ``.ipdb`` city database for ``ip``.

        :param ip: IP address string.
        :param lang: language of the result, ``CN`` or ``EN``.
        :param update: when true, reload the database file before the query.
        :return: whatever ``find_map`` returns for the address.
        """
        ipdb_path = BASE_DIR + "/DB/mydata4vipday2.ipdb"
        db = ipdb.City(ipdb_path)
        if update:
            db.reload(ipdb_path)
        return db.find_map(ip, lang)

    @staticmethod
    def getUserID(userPhone='13800138000', getUser=True, setOTAID=False, μs=True):
        """Build an ID from the current time and a phone number.

        :param userPhone: phone number string mixed into the ID.
        :param getUser: when ``True`` the result is ``<time><phone>``.
        :param setOTAID: used only when ``getUser`` is not ``True``; ``False``
            gives ``<phone><time>``, otherwise ``13800<time>138000``.
        :param μs: ``True`` uses microsecond resolution, otherwise milliseconds.
        :return: the generated ID string.
        """
        # The flags are compared with == on purpose: callers may pass values
        # (e.g. 1 / 0) whose equality differs from plain truthiness.
        scale = 1000000 if μs == True else 1000  # noqa: E712
        timeID = str(round(time.time() * scale))
        if getUser == True:  # noqa: E712
            return timeID + userPhone
        if setOTAID == False:  # noqa: E712
            return userPhone + timeID
        return '13800' + timeID + '138000'

    @staticmethod
    def RandomStr(randomlength=8, number=True):
        """Generate a random string.

        :param randomlength: number of characters to produce.
        :param number: ``False`` draws from letters and digits; any other
            value draws from digits only.
        :return: the random string.
        """
        if number == False:  # noqa: E712 - keep the original == semantics
            alphabet = CommonService._ALNUM_CHARACTERS
        else:
            alphabet = CommonService._DIGIT_CHARACTERS
        rng = Random()
        return ''.join(rng.choice(alphabet) for _ in range(randomlength))

    @staticmethod
    def createOrderID():
        """Generate an order ID: ``YYYYmmddHHMMSS`` followed by 6 random digits.

        :return: the order ID string (also printed, as before).
        """
        random_id = CommonService.RandomStr(6, True)
        order_id = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + str(random_id)
        print('orderID:')
        print(order_id)
        return order_id

    @staticmethod
    def qs_to_list(qs):
        """Convert an iterable of row dicts to a list, formatting datetimes.

        The fields ``time``, ``add_time``, ``update_time`` and ``end_time``
        are formatted as ``%Y-%m-%d %H:%M:%S``. ``data_joined`` and
        ``userID__data_joined`` are formatted the same way when truthy and
        replaced by ``''`` otherwise. Rows are modified in place.

        :param qs: iterable of dict-like rows.
        :return: list of the (mutated) rows.
        """
        required_fields = ('time', 'add_time', 'update_time', 'end_time')
        nullable_fields = ('data_joined', 'userID__data_joined')
        res = []
        for row in qs:
            for field in required_fields + nullable_fields:
                # Best effort per field: a value that cannot be formatted is
                # left as-is and must not stop the remaining fields.
                try:
                    if field not in row:
                        continue
                    value = row[field]
                    if field in nullable_fields and not value:
                        row[field] = ''
                    else:
                        row[field] = value.strftime(CommonService._DATETIME_FORMAT)
                except Exception:
                    continue
            res.append(row)
        return res

    @staticmethod
    def get_now_time_str(n_time, tz, lang):
        """Format a UNIX timestamp shifted by a timezone offset.

        :param n_time: UNIX timestamp (anything ``int()`` accepts).
        :param tz: offset in hours (anything ``float()`` accepts).
        :param lang: ``'cn'`` gives ``%Y-%m-%d %H:%M:%S``; any other value
            gives ``%m-%d-%Y %H:%M:%S``.
        :return: the formatted time string.
        """
        print(n_time)
        print(tz)
        print(lang)
        shifted = int(int(n_time) + 3600 * float(tz))
        if lang == 'cn':
            pattern = '%Y-%m-%d %H:%M:%S'
        else:
            pattern = '%m-%d-%Y %H:%M:%S'
        return time.strftime(pattern, time.gmtime(shifted))

    @staticmethod
    def encrypt_data(randomlength=8, number=False):
        """Generate a random string; same as ``RandomStr`` with ``number=False`` by default.

        :param randomlength: number of characters to produce.
        :param number: ``False`` draws from letters and digits; any other
            value draws from digits only.
        :return: the random string.
        """
        return CommonService.RandomStr(randomlength, number)

    @staticmethod
    def str_to_timestamp(str_time=None, format='%Y-%m-%d %H:%M:%S'):
        """Convert a formatted local-time string to a UNIX timestamp.

        :param str_time: time string; when empty the current time is used.
        :param format: ``strptime`` pattern describing ``str_time``.
        :return: integer UNIX timestamp.
        """
        if not str_time:
            return int(time.time())
        time_tuple = time.strptime(str_time, format)
        return int(time.mktime(time_tuple))

    @staticmethod
    def timestamp_to_str(timestamp=None, format='%Y-%m-%d %H:%M:%S'):
        """Convert a UNIX timestamp to a formatted local-time string.

        :param timestamp: UNIX timestamp; when falsy the current time is used.
        :param format: ``strftime`` pattern for the result.
        :return: the formatted time string.
        """
        if timestamp:
            return time.strftime(format, time.localtime(timestamp))
        # strftime without a time tuple formats the current local time.
        return time.strftime(format)

    @staticmethod
    def calcMonthLater(addMonth, unix_timestamp=None):
        """Return the UNIX timestamp ``addMonth`` calendar months later.

        The day of month is clamped to the last day of the target month
        (e.g. Jan 31 + 1 month -> Feb 28/29). The time of day is kept.

        :param addMonth: number of months to add; values <= 0 add nothing.
        :param unix_timestamp: starting point; when falsy the current local
            time is used.
        :return: integer UNIX timestamp.
        """
        # Read the clock exactly once so all fields belong to the same instant.
        if unix_timestamp:
            base = time.localtime(unix_timestamp)
            year, month, day = base.tm_year, base.tm_mon, base.tm_mday
            hour, minute, second = base.tm_hour, base.tm_min, base.tm_sec
        else:
            now = datetime.datetime.now()
            year, month, day = now.year, now.month, now.day
            hour, minute, second = now.hour, now.minute, now.second

        month_index = year * 12 + (month - 1) + max(addMonth, 0)
        year, month = divmod(month_index, 12)
        month += 1

        # Clamp instead of relying on the text of a strptime error message.
        day = min(day, calendar.monthrange(year, month)[1])

        date_string = '{0}-{1}-{2} {3}:{4}:{5}'.format(
            year, month, day, hour, minute, second)
        return CommonService.str_to_timestamp(date_string)

    @staticmethod
    def updateMac(mac: str):
        """Return the MAC address that follows ``mac``.

        The last three octets are treated as a counter and incremented by
        one; the first three octets are kept.

        :param mac: colon-separated MAC address with six hex octets.
        :return: the next MAC in upper case, or ``None`` when the last three
            octets are already ``FF:FF:FF``.
        """
        octets = mac.split(':')
        for position in range(6):
            octets[position] = int(octets[position], 16)

        if octets[3] == 255 and octets[4] == 255 and octets[5] == 255:
            return None

        # Increment the lowest octet and ripple the carry upwards.
        octets[5] += 1
        if octets[5] == 256:
            octets[5] = 0
            octets[4] += 1
        if octets[4] == 256:
            octets[4] = 0
            octets[3] += 1

        return ':'.join("%02x" % octet for octet in octets).upper()

    @staticmethod
    def decode_data(content, start=1, end=4):
        """Reverse ``encode_data``.

        For each round ``i`` in ``range(start, end)`` with ``1 <= i <= 3`` the
        content is base64-decoded and ``i`` characters are stripped from both
        ends.

        :param content: encoded text.
        :param start: first round (inclusive).
        :param end: last round (exclusive).
        :return: decoded text, ``''`` for empty input, ``None`` on any error.
        """
        if not content:
            return ''
        try:
            for round_no in range(start, end):
                if 1 <= round_no <= 3:
                    decoded = base64.b64decode(content).decode('utf-8')
                    content = decoded[round_no:-round_no]
            print(content)
            return content
        except Exception as e:
            print(e)
            return None

    @staticmethod
    def encode_data(content, start=1, end=4):
        """Obfuscate ``content`` with salted, repeated base64 encoding.

        For each round ``i`` in ``range(start, end)`` with ``1 <= i <= 3`` the
        content is wrapped in ``4 - i`` random alphanumeric characters on each
        side and base64-encoded.

        :param content: value to encode; converted with ``str()``.
        :param start: first round (inclusive).
        :param end: last round (exclusive).
        :return: encoded text, or ``''`` for empty input.
        """
        if not content:
            return ''
        for round_no in range(start, end):
            if 1 <= round_no <= 3:
                salt_length = 4 - round_no
                salted = (CommonService.RandomStr(salt_length, False)
                          + str(content)
                          + CommonService.RandomStr(salt_length, False))
                content = base64.b64encode(salted.encode("utf-8")).decode('utf8')
        return content

    @staticmethod
    def encode_data_without_salt(content):
        """Return the base64 encoding of ``str(content)`` with no salt added.

        :param content: value to encode.
        :return: base64 text.
        """
        return base64.b64encode(str(content).encode("utf-8")).decode('utf8')

    @staticmethod
    def check_time_stamp_token(token, time_stamp):
        """Validate a timestamp token.

        The token must decode (via ``decode_data``) to the same integer as
        ``time_stamp``, and ``time_stamp`` must lie within 60000 seconds
        (about 16.7 hours) of the current time in either direction. The wide
        window tolerates clients in different time zones.

        :param token: encoded timestamp.
        :param time_stamp: plain timestamp to compare against.
        :return: ``True`` when valid, otherwise ``False``.
        """
        if not all([token, time_stamp]):
            return False
        try:
            token = int(CommonService.decode_data(token))
            time_stamp = int(time_stamp)
            distance = int(time.time()) - time_stamp
            return token == time_stamp and -60000 <= distance <= 60000
        except Exception as e:
            print(e)
            return False

    @staticmethod
    def rsa_sign(Token):
        """Sign ``Token`` with the RSA private key using SHA-256.

        :param Token: text to sign.
        :return: base64 signature without line breaks, or ``''`` for empty
            input.
        """
        if not Token:
            return ''
        # NOTE(security): this private key is committed to source control and
        # should be treated as compromised. It is kept here only so existing
        # signatures keep verifying; move it to a key file or secret store
        # (e.g. a PEM file under BASE_DIR) and rotate it.
        private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
/nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
+syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
-----END RSA PRIVATE KEY-----'''
        private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
        signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
        # encodebytes wraps its output in lines; strip the breaks so the
        # signature is a single base64 token.
        return encodebytes(signature).decode('utf8').replace('\n', '')