# -*- coding: utf-8 -*-
import base64
import time
from pathlib import Path
from random import Random

import redis
import simplejson as json
from django.core import serializers
from django.utils import timezone
from django_global_request.middleware import get_request
from pyipip import IPIPDatabase

from Ansjer.settings import BASE_DIR
from Ansjer.settings import SERVER_HOST


# Shared, highly reusable helper code lives here.
class CommonService:
    """Namespace of static helper functions shared across the project."""

    # Lazily created connection pool shared by the redis helpers below, so
    # that each call does not build (and throw away) its own pool.
    _redis_pool = None

    @staticmethod
    def _redis_conn():
        """Return a redis client bound to the shared connection pool."""
        if CommonService._redis_pool is None:
            CommonService._redis_pool = redis.ConnectionPool(
                host=SERVER_HOST, port=6379)
        return redis.Redis(connection_pool=CommonService._redis_pool)

    # Format response data.
    @staticmethod
    def response_formal(data=None):
        """Serialize a response dict to a JSON string.

        :param data: dict with the keys ``code``, ``reason`` and ``result``;
            when omitted, empty values are used for all three.
        :return: JSON text with the keys ``result_code``, ``reason``,
            ``result`` and ``error_code`` (``code`` is emitted twice).
        :raises KeyError: if ``data`` lacks one of the three keys.
        """
        if data is None:
            data = {'code': '', 'reason': '', 'result': {}}
        return json.dumps(
            {
                "result_code": data['code'],
                "reason": data['reason'],
                "result": data['result'],
                "error_code": data['code'],
            },
            ensure_ascii=False)

    # Build fuzzy-search filters.
    @staticmethod
    def get_kwargs(data=None):
        """Turn ``{field: value}`` into ``{field + '__icontains': value}``.

        Entries whose value is None or the empty string are skipped.
        """
        if data is None:
            return {}
        return {
            field + '__icontains': value
            for field, value in data.items()
            if value is not None and value != u''
        }

    # Convert a query_set to a dict.
    @staticmethod
    def qs_to_dict(query_set):
        """Serialize ``query_set`` with Django's JSON serializer.

        :return: ``{"datas": [...]}`` where the list is the parsed
            serializer output.
        """
        serialized = serializers.serialize('json', query_set)
        return {"datas": json.loads(serialized)}

    @staticmethod
    def set_redis_data(key, val, expire=0):
        """Store ``val`` under ``key``; set a TTL when ``expire`` > 0.

        :return: always True.
        """
        conn = CommonService._redis_conn()
        conn.set(key, val)
        if expire > 0:
            conn.expire(key, expire)
        return True

    @staticmethod
    def get_redis_data(key):
        """Fetch the value stored under ``key``.

        :return: the stored value, or False when the lookup raises, the
            value is missing/empty, or the stored value is ``b'False'``.
        """
        conn = CommonService._redis_conn()
        try:
            val = conn.get(key)
        except Exception:
            # Best effort: any redis failure is reported as "no value".
            return False
        if not val or val == b'False':
            return False
        return val

    @staticmethod
    def del_redis_data(key):
        """Delete ``key``; return True if redis reports a non-zero result."""
        conn = CommonService._redis_conn()
        return bool(conn.delete(key))

    # Get file size.
    @staticmethod
    def get_file_size(file_path='', suffix_type='', decimal_point=0):
        """Return the size of ``file_path`` in megabytes.

        :param file_path: path of the file to stat.
        :param suffix_type: only ``'MB'`` is recognised; for any other value
            the result is 0.0.
        :param decimal_point: number of decimals to round to; 0 disables
            rounding.
        :return: size as a float.
        """
        size = (Path() / file_path).stat().st_size
        mb_size = 0.0
        if suffix_type == 'MB':
            mb_size = size / 1024.0 / 1024.0
        if decimal_point != 0:
            mb_size = round(mb_size, decimal_point)
        return mb_size

    @staticmethod
    def get_param_flag(data=None):
        """Return True when no element of ``data`` is None.

        An omitted or empty ``data`` yields True.
        """
        if data is None:
            return True
        return all(item is not None for item in data)

    @staticmethod
    def get_ip_address(request):
        """Get the client IP address from a request.

        Reads ``HTTP_X_FORWARDED_FOR`` first and falls back to
        ``REMOTE_ADDR``. When the header holds a comma-separated list, the
        LAST entry is returned.

        :param request: object exposing a ``META`` mapping.
        :return: the IP string, or "" when neither key is set.
        """
        ip = request.META.get("HTTP_X_FORWARDED_FOR", "")
        if not ip:
            ip = request.META.get('REMOTE_ADDR', "")
        return ip.split(",")[-1].strip() if ip else ""

    # Get the datetime for every hour of a day.
    @staticmethod
    def getTimeDict(times):
        """Map each hour index 0..23 to the datetime of that hour.

        :param times: object with ``strftime``; only its date part is used.
        :return: dict ``{hour: datetime}`` at ``HH:00:00`` of that date.
        """
        day = times.strftime("%Y-%m-%d")
        return {
            hour: timezone.datetime.strptime(
                "%s %02d:00:00" % (day, hour), '%Y-%m-%d %H:%M:%S')
            for hour in range(24)
        }

    # Look up an address by IP.
    @staticmethod
    def getAddr(ip):
        """Look ``ip`` up in the IPIP database file under BASE_DIR.

        :return: the first tab-separated field of the lookup result.
        """
        # IP database
        db = IPIPDatabase(BASE_DIR + '/DB/17monipdb.dat')
        addr = db.lookup(ip)
        return addr.split('\t')[0]

    # getMoreID
    @staticmethod
    def getMoreID(uPhone='13800138000', μs='uID'):
        """Build an ID string around a microsecond timestamp.

        :param uPhone: phone number; only used when ``μs == 'uID'``.
        :param μs: ID kind. ``'uID'`` gives timestamp + phone; the other
            kinds wrap the timestamp in fixed digits; unknown kinds use the
            prefix ``'10000100001'``.
        :return: the ID as a string.
        """
        stamp = str(round(time.time() * 1000000))
        if μs == 'uID':
            return stamp + uPhone
        # kind -> (prefix, suffix) placed around the timestamp
        wrappers = {
            'otaID': ('13800138000', ''),
            'eID': ('13800', '138000'),
            'ID': ('13800', '138000'),
            'gID': ('10000', '100001'),
            'rID': ('10010', '100011'),
        }
        prefix, suffix = wrappers.get(μs, ('10000100001', ''))
        return prefix + stamp + suffix

    # getMoreID
    @staticmethod
    def getUserID(userPhone='13800138000', getUser=True, setOTAID=False,
                  μs=True):
        """Build an ID from the current time and ``userPhone``.

        :param userPhone: phone number string.
        :param getUser: when equal to True, return timestamp + phone.
        :param setOTAID: consulted only when ``getUser`` is not True; when
            equal to False return phone + timestamp, otherwise return
            ``'13800' + timestamp + '138000'``.
        :param μs: when equal to True the timestamp is in microseconds,
            otherwise in milliseconds.
        :return: the ID as a string.
        """
        # NOTE: the flags are compared with ``==`` (not truthiness) on
        # purpose, so non-bool arguments keep selecting the same branches.
        scale = 1000000 if μs == True else 1000  # noqa: E712
        stamp = str(round(time.time() * scale))
        if getUser == True:  # noqa: E712
            return stamp + userPhone
        if setOTAID == False:  # noqa: E712
            return userPhone + stamp
        return '13800' + stamp + '138000'

    @staticmethod
    def get_request_val(key):
        """Read ``key`` from the current request's GET or POST data.

        :return: the value, or None when the key is absent, the request
            method is neither GET nor POST, or no request is available.
        """
        request = get_request()
        # presumably get_request() yields None outside a request cycle;
        # guard so that case returns None instead of raising.
        if request is None:
            return None
        if request.method == 'GET':
            request_dict = request.GET
        elif request.method == 'POST':
            request_dict = request.POST
        else:
            # Previously fell through with request_dict unbound.
            return None
        return request_dict.get(key, None)

    @staticmethod
    def get_userID_byT(token):
        """Extract the userID embedded in an access token.

        The token is reversed, characters 12..17 of the reversed text are
        dropped, and the remainder is URL-safe base64-decoded into
        ``<json>&<something>``; ``userID`` is read from the JSON part.

        :param token: token string; ``'stest'`` and ``'sformal'`` map to
            fixed user IDs.
        :return: the userID, or None if the token cannot be decoded.
        """
        if token == 'stest':
            return '151547867345163613800138001'
        if token == 'sformal':
            return '151564262337939513800138001'
        try:
            access_token = token[::-1]
            if len(access_token) < 18:
                return None
            atoken = access_token[:12] + access_token[18:]
            try:
                token_str = base64.urlsafe_b64decode(atoken).decode('utf-8')
            except Exception as e:
                print('base64 decode error: %s' % repr(e))
                # Previously execution continued to ``return userID`` with
                # userID unbound, raising UnboundLocalError.
                return None
            token_list = token_str.split('&')
            if len(token_list) != 2:
                return None
            return json.loads(token_list[0]).get('userID', None)
        except Exception:
            return None

    @staticmethod
    def req_path(request, ssl=False):
        """Return ``http://<host>`` (or ``https://`` when ``ssl is True``).

        The host comes from ``request.get_host()``.
        """
        scheme = "https://" if ssl is True else "http://"
        return scheme + request.get_host()

    # Generate a random string.
    @staticmethod
    def RandomStr(randomlength=8, number=True):
        """Return a random string of ``randomlength`` characters.

        :param randomlength: number of characters to generate.
        :param number: digits only unless this equals False, in which case
            ASCII letters and digits are used.
        :return: the generated string.
        """
        # NOTE(review): uses random.Random, which is not cryptographically
        # secure -- confirm callers do not use this for secrets.
        if number == False:  # noqa: E712
            characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
                           'tUuVvWwXxYyZz0123456789'
        else:
            characterSet = '0123456789'
        rng = Random()
        return ''.join(rng.choice(characterSet) for _ in range(randomlength))