123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- # -*- coding: utf-8 -*-
- from datetime import datetime
- from calendar import timegm
- from Model.models import AuthToken_Token, Device_User
- import time, base64, hmac, random, string,simplejson as json
- import traceback
- from django.utils.timezone import utc
- import datetime as utdatetime
- from Service.ResponseService import ResponseFormal
- from Ansjer.config import EXPIRATION_DELTA,REFRESH_EXPIRATION_DELTA
- def SQLManager(content):
- tokenID_id = content.get('tokenID_id', None)
- if tokenID_id != None:
- tokenIDValid = AuthToken_Token.objects.filter(tokenID_id=tokenID_id)
- if tokenIDValid:
- try:
- content.pop('tokenID_id')
- tokenIDValid.update(**content)
- tokenIDValid.update(last_update=utdatetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc))
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('更新Token错误: %s ' % errorInfo)
- return ResponseFormal(302,{'details':repr(e)})
- else:
- return ResponseFormal(0)
- else:
- try:
- authToken = AuthToken_Token(**content)
- authToken.save()
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('添加Token错误: %s ' % errorInfo)
- return ResponseFormal(301,{'details':repr(e)})
- else:
- return ResponseFormal(0)
- else:
- return ResponseFormal(312)
- class JSONTokenManager:
- def __init__(self):
- self.iCode = ''
- self.errormsg = ''
- self.accessDict = {}
- self. refreshDict = {}
- def getSalt(self, strLen = 6):
- """
- 获取指定长度strLen的字符串
- :param strLen:
- :return: 返回获取到指定长度的字符串
- """
- salt = ''.join(random.sample(string.ascii_letters + string.digits, strLen))
- return salt
- def generate_AToken(self, JSON , iCode):
- """
- :param JSON:
- :param iCode:
- :return:
- """
- orig_iat = datetime.utcnow().utctimetuple()
- exp_at = (datetime.utcnow() + EXPIRATION_DELTA).utctimetuple()
- exp_rt = (datetime.utcnow() + REFRESH_EXPIRATION_DELTA).utctimetuple()
- userJSON = json.loads(JSON)
- expDict = {'orig_iat': timegm(orig_iat), 'exp_at': timegm(exp_at), 'exp_rt': timegm(exp_rt)}
- userJSON.update(expDict)
- exp_rtJSON = json.dumps(userJSON, ensure_ascii=False)
- userJSON.pop('exp_rt')
- exp_atJSON = json.dumps(userJSON, ensure_ascii=False)
- print(exp_rtJSON, exp_atJSON)
- hmac_at = hmac.new(exp_atJSON.encode('utf-8'), iCode.encode('utf-8')).hexdigest()
- hmac_rt = hmac.new(exp_rtJSON.encode('utf-8'), iCode.encode('utf-8')).hexdigest()
- access_token = exp_atJSON + '&' + hmac_at
- refresh_token = exp_rtJSON + '&' + hmac_rt
- b64_access_token = base64.urlsafe_b64encode(access_token.encode("utf-8"))
- b64_refresh_token = base64.urlsafe_b64encode(refresh_token.encode("utf-8"))
- tokenDict = {'access_token': b64_access_token.decode('utf-8'), 'refresh_token': b64_refresh_token.decode('utf-8'), \
- 'iCode': iCode, 'mCode': userJSON.get('mCode', '')}
- tokenDict['tokenID_id'] = userJSON.get('userID', None)
- print(tokenDict)
- sqlJSON = SQLManager(content=tokenDict)
- sqlDict = json.loads(sqlJSON)
- error_code = sqlDict.get('error_code', None)
- if error_code != None and error_code == 0:
- b64_alist = list(b64_access_token.decode('utf-8'))
- b64_alist.insert(12, self.getSalt(strLen=6))
- b64_atoken = ''.join(b64_alist)
- b64_rflist = list(b64_refresh_token.decode('utf-8'))
- b64_rflist.insert(12, self.getSalt(strLen=6))
- b64_rftoken = ''.join(b64_rflist)
- token = {'access_token': b64_atoken[::-1], 'refresh_token': b64_rftoken[::-1]}
- dictJSON = {'result_code': 0, 'reason': 'Success', 'result': token, 'error_code': 0}
- return json.dumps(dictJSON, ensure_ascii=False)
- else:
- return sqlJSON
- def verify_AToken(self, token, isAToken = True):
- if token == 'stest':
- self.accessDict['userID'] = '151547867345163613800138001'
- return 0
- if token == 'sformal':
- self.accessDict['userID'] = '151564262337939513800138001'
- return 0
- """
- :param token:
- :param isAToken:
- :return:
- """
- access_token = token[::-1]
- if len(access_token) < 18:
- return 303
- atoken = access_token[:12] + access_token[18:]
- print(atoken)
- try:
- token_str = base64.urlsafe_b64decode(atoken).decode('utf-8')
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('base64 decode error: %s' % errorInfo)
- self.errormsg = 'base64 decode error: %s' % repr(e)
- return 304
- token_list = token_str.split('&')
- if len(token_list) != 2:
- return 303
- ts_str = token_list[0]
- jsonDict = json.loads(ts_str)
- print(jsonDict)
- userID = jsonDict.get('userID', None)
- mCode = jsonDict.get('mCode', None)
- if userID == None or mCode == None:
- return 303
- try:
- if isAToken:
- authToken = AuthToken_Token.objects.filter(tokenID_id=userID, access_token=atoken)
- else:
- authToken = AuthToken_Token.objects.filter(tokenID_id=userID, refresh_token=atoken)
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('Database Query error: %s' % errorInfo)
- self.errormsg = 'Database Query error: %s' % repr(e)
- return 500
- if authToken:
- self.iCode = authToken[0].iCode
- hmac_token = token_list[1]
- hmac_at = hmac.new(ts_str.encode('utf-8'), self.iCode.encode('utf-8')).hexdigest()
- if hmac_at != hmac_token:
- return 306
- else:
- try:
- mCodeToken = AuthToken_Token.objects.filter(tokenID_id=userID)
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('Database Query error: %s' % errorInfo)
- self.errormsg = 'Database Query error: %s' % repr(e)
- return 500
- if mCodeToken:
- '''
- if mCodeToken[0].mCode != mCode:
- return 313
- else:
- return 309
- '''
- pass
- else:
- return 309
- if isAToken:
- self.accessDict = jsonDict
- exp_at = jsonDict.get('exp_at', None)
- if exp_at == None:
- return 305
- if time.time() - float(exp_at) > 0:
- return 309
- else:
- self.refreshDict = jsonDict
- exp_rt = jsonDict.get('exp_rt', None)
- if exp_rt == None:
- return 305
- if time.time() - float(exp_rt) > 0:
- try:
- Device_User.objects.filter(userID = userID).update(online = False)
- except Exception as e:
- errorInfo = traceback.format_exc()
- print('Database Query error: %s' % errorInfo)
- self.errormsg = 'Database Query error: %s' % repr(e)
- return 500
- return 308
- try:
- device_user = Device_User.objects.get(userID=userID)
- device_user.online=True
- device_user.save()
- except Exception as e:
- pass
- return 0
- def refresh_AToken(self, Token):
- """
- :param Token:
- :return:
- """
- print('refresh_AToken')
- error_code = self.verify_AToken(token = Token, isAToken = False)
- if error_code == 0:
- refreshDict = self.refreshDict
- if refreshDict.get('exp_at', None) != None:
- exp_at = (datetime.utcnow() + EXPIRATION_DELTA).utctimetuple()
- refreshDict['exp_at'] = timegm(exp_at)
- refreshDict.pop('exp_rt')
- JSON = json.dumps(refreshDict, ensure_ascii=False)
- hmac_at = hmac.new(JSON.encode('utf-8'), self.iCode.encode('utf-8')).hexdigest()
- access_token = JSON + '&' + hmac_at
- b64_at = base64.urlsafe_b64encode(access_token.encode("utf-8"))
- userID = refreshDict.get('userID', None)
- tokenDict = {'access_token': b64_at.decode('utf-8')}
- tokenDict['tokenID_id'] = userID
- sqlJSON = SQLManager(content=tokenDict)
- sqlDict = json.loads(sqlJSON)
- error_code = sqlDict.get('error_code', None)
- if error_code != None and error_code == 0:
- b64_alist = list(b64_at.decode('utf-8'))
- b64_alist.insert(12, self.getSalt(strLen=6))
- b64_atoken = ''.join(b64_alist)
- token = {'access_token': b64_atoken[::-1], }
- dictJSON = {'result_code': 0, 'reason': 'Success', 'result': token, 'error_code': 0}
- return json.dumps(dictJSON, ensure_ascii=False)
- else:
- return sqlJSON
- else:
- return self.errorCodeInfo(error_code)
- def deToken(self,token):
- try:
- b64_alist = list(token)
- b64_alist.insert(12, self.getSalt(strLen=6))
- b64_atoken = ''.join(b64_alist)
- res_token = b64_atoken[::-1]
- code = self.verify_AToken(res_token)
- if code == 0:
- return res_token
- else:
- return False
- except Exception as e:
- return False
- def errorCodeInfo(self, error_code):
- print(error_code)
- return ResponseFormal(error_code)
|