# -*- coding: utf-8 -*- from datetime import datetime from calendar import timegm from Model.models import AuthToken_Token, Device_User import time, base64, hmac, random, string,simplejson as json import traceback from django.utils.timezone import utc import datetime as utdatetime from Service.ResponseService import ResponseFormal from Ansjer.config import EXPIRATION_DELTA,REFRESH_EXPIRATION_DELTA def SQLManager(content): tokenID_id = content.get('tokenID_id', None) if tokenID_id != None: tokenIDValid = AuthToken_Token.objects.filter(tokenID_id=tokenID_id) if tokenIDValid: try: content.pop('tokenID_id') tokenIDValid.update(**content) tokenIDValid.update(last_update=utdatetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)) except Exception as e: errorInfo = traceback.format_exc() print('更新Token错误: %s ' % errorInfo) return ResponseFormal(302,{'details':repr(e)}) else: return ResponseFormal(0) else: try: authToken = AuthToken_Token(**content) authToken.save() except Exception as e: errorInfo = traceback.format_exc() print('添加Token错误: %s ' % errorInfo) return ResponseFormal(301,{'details':repr(e)}) else: return ResponseFormal(0) else: return ResponseFormal(312) class JSONTokenManager: def __init__(self): self.iCode = '' self.errormsg = '' self.accessDict = {} self. refreshDict = {} def getSalt(self, strLen = 6): """ 获取指定长度strLen的字符串 :param strLen: :return: 返回获取到指定长度的字符串 """ salt = ''.join(random.sample(string.ascii_letters + string.digits, strLen)) return salt def generate_AToken(self, JSON , iCode): """ :param JSON: :param iCode: :return: """ orig_iat = datetime.utcnow().utctimetuple() exp_at = (datetime.utcnow() + EXPIRATION_DELTA).utctimetuple() exp_rt = (datetime.utcnow() + REFRESH_EXPIRATION_DELTA).utctimetuple() userJSON = json.loads(JSON) expDict = {'orig_iat': timegm(orig_iat), 'exp_at': timegm(exp_at), 'exp_rt': timegm(exp_rt)} userJSON.update(expDict) exp_rtJSON = json.dumps(userJSON, ensure_ascii=False) userJSON.pop('exp_rt') exp_atJSON = json.dumps(userJSON, ensure_ascii=False) print(exp_rtJSON, exp_atJSON) hmac_at = hmac.new(exp_atJSON.encode('utf-8'), iCode.encode('utf-8')).hexdigest() hmac_rt = hmac.new(exp_rtJSON.encode('utf-8'), iCode.encode('utf-8')).hexdigest() access_token = exp_atJSON + '&' + hmac_at refresh_token = exp_rtJSON + '&' + hmac_rt b64_access_token = base64.urlsafe_b64encode(access_token.encode("utf-8")) b64_refresh_token = base64.urlsafe_b64encode(refresh_token.encode("utf-8")) tokenDict = {'access_token': b64_access_token.decode('utf-8'), 'refresh_token': b64_refresh_token.decode('utf-8'), \ 'iCode': iCode, 'mCode': userJSON.get('mCode', '')} tokenDict['tokenID_id'] = userJSON.get('userID', None) print(tokenDict) sqlJSON = SQLManager(content=tokenDict) sqlDict = json.loads(sqlJSON) error_code = sqlDict.get('error_code', None) if error_code != None and error_code == 0: b64_alist = list(b64_access_token.decode('utf-8')) b64_alist.insert(12, self.getSalt(strLen=6)) b64_atoken = ''.join(b64_alist) b64_rflist = list(b64_refresh_token.decode('utf-8')) b64_rflist.insert(12, self.getSalt(strLen=6)) b64_rftoken = ''.join(b64_rflist) token = {'access_token': b64_atoken[::-1], 'refresh_token': b64_rftoken[::-1]} dictJSON = {'result_code': 0, 'reason': 'Success', 'result': token, 'error_code': 0} return json.dumps(dictJSON, ensure_ascii=False) else: return sqlJSON def verify_AToken(self, token, isAToken = True): if token == 'stest': self.accessDict['userID'] = '151547867345163613800138001' return 0 if token == 'sformal': self.accessDict['userID'] = '151564262337939513800138001' return 0 """ :param token: :param isAToken: :return: """ access_token = token[::-1] if len(access_token) < 18: return 303 atoken = access_token[:12] + access_token[18:] print(atoken) try: token_str = base64.urlsafe_b64decode(atoken).decode('utf-8') except Exception as e: errorInfo = traceback.format_exc() print('base64 decode error: %s' % errorInfo) self.errormsg = 'base64 decode error: %s' % repr(e) return 304 token_list = token_str.split('&') if len(token_list) != 2: return 303 ts_str = token_list[0] jsonDict = json.loads(ts_str) print(jsonDict) userID = jsonDict.get('userID', None) mCode = jsonDict.get('mCode', None) if userID == None or mCode == None: return 303 try: if isAToken: authToken = AuthToken_Token.objects.filter(tokenID_id=userID, access_token=atoken) else: authToken = AuthToken_Token.objects.filter(tokenID_id=userID, refresh_token=atoken) except Exception as e: errorInfo = traceback.format_exc() print('Database Query error: %s' % errorInfo) self.errormsg = 'Database Query error: %s' % repr(e) return 500 if authToken: self.iCode = authToken[0].iCode hmac_token = token_list[1] hmac_at = hmac.new(ts_str.encode('utf-8'), self.iCode.encode('utf-8')).hexdigest() if hmac_at != hmac_token: return 306 else: try: mCodeToken = AuthToken_Token.objects.filter(tokenID_id=userID) except Exception as e: errorInfo = traceback.format_exc() print('Database Query error: %s' % errorInfo) self.errormsg = 'Database Query error: %s' % repr(e) return 500 if mCodeToken: ''' if mCodeToken[0].mCode != mCode: return 313 else: return 309 ''' pass else: return 309 if isAToken: self.accessDict = jsonDict exp_at = jsonDict.get('exp_at', None) if exp_at == None: return 305 if time.time() - float(exp_at) > 0: return 309 else: self.refreshDict = jsonDict exp_rt = jsonDict.get('exp_rt', None) if exp_rt == None: return 305 if time.time() - float(exp_rt) > 0: try: Device_User.objects.filter(userID = userID).update(online = False) except Exception as e: errorInfo = traceback.format_exc() print('Database Query error: %s' % errorInfo) self.errormsg = 'Database Query error: %s' % repr(e) return 500 return 308 try: device_user = Device_User.objects.get(userID=userID) device_user.online=True device_user.save() except Exception as e: pass return 0 def refresh_AToken(self, Token): """ :param Token: :return: """ print('refresh_AToken') error_code = self.verify_AToken(token = Token, isAToken = False) if error_code == 0: refreshDict = self.refreshDict if refreshDict.get('exp_at', None) != None: exp_at = (datetime.utcnow() + EXPIRATION_DELTA).utctimetuple() refreshDict['exp_at'] = timegm(exp_at) refreshDict.pop('exp_rt') JSON = json.dumps(refreshDict, ensure_ascii=False) hmac_at = hmac.new(JSON.encode('utf-8'), self.iCode.encode('utf-8')).hexdigest() access_token = JSON + '&' + hmac_at b64_at = base64.urlsafe_b64encode(access_token.encode("utf-8")) userID = refreshDict.get('userID', None) tokenDict = {'access_token': b64_at.decode('utf-8')} tokenDict['tokenID_id'] = userID sqlJSON = SQLManager(content=tokenDict) sqlDict = json.loads(sqlJSON) error_code = sqlDict.get('error_code', None) if error_code != None and error_code == 0: b64_alist = list(b64_at.decode('utf-8')) b64_alist.insert(12, self.getSalt(strLen=6)) b64_atoken = ''.join(b64_alist) token = {'access_token': b64_atoken[::-1], } dictJSON = {'result_code': 0, 'reason': 'Success', 'result': token, 'error_code': 0} return json.dumps(dictJSON, ensure_ascii=False) else: return sqlJSON else: return self.errorCodeInfo(error_code) def deToken(self,token): try: b64_alist = list(token) b64_alist.insert(12, self.getSalt(strLen=6)) b64_atoken = ''.join(b64_alist) res_token = b64_atoken[::-1] code = self.verify_AToken(res_token) if code == 0: return res_token else: return False except Exception as e: return False def errorCodeInfo(self, error_code): print(error_code) return ResponseFormal(error_code)