UVerifyObject.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import base64
  4. import hashlib
  5. import hmac
  6. import json
  7. import logging
  8. import random
  9. import string
  10. import time
  11. import requests
  12. from collections import OrderedDict
  13. from datetime import datetime, timedelta
  14. class UVerifyObject:
  15. '''
  16. 【友盟+】智能认证 U-Verify(一键登录)
  17. 开发文档:
  18. https://developer.umeng.com/docs/143070/detail/144785
  19. https://developer.umeng.com/docs/143070/detail/144783
  20. '''
  21. def __init__(self, token):
  22. # aliyun config
  23. self.ali_appkey = '204024884'
  24. self.ali_app_secret = 'lD7AXxc0ji0HzrNXZmfJaz3AecAboItD'
  25. # umeng config
  26. self.host_url = 'https://verify5.market.alicloudapi.com'
  27. self.path_url = '/api/v1/mobile/info'
  28. self.umeng_app_key = '5d8da0670cafb2a6b5000bc3'
  29. self.token = token
  30. self.phone = ''
  31. self.body_data_str = json.dumps({'token': self.token})
  32. content_md5_str = base64.b64encode(hashlib.md5(self.body_data_str.encode("UTF-8")).digest()).decode("UTF-8")
  33. now_date_time = datetime.utcnow() + timedelta(hours=8)
  34. timestamp = str(int(time.mktime(now_date_time.timetuple()) * 1000))
  35. sign_headers_dict = {
  36. 'X-Ca-Key': self.ali_appkey,
  37. 'X-Ca-Stage': 'RELEASE',
  38. 'X-Ca-Timestamp': timestamp,
  39. 'X-Ca-Version': '1'
  40. }
  41. sign_headers_str = ''
  42. for key in sorted(sign_headers_dict.keys()):
  43. value = sign_headers_dict[key]
  44. sign_headers_str = sign_headers_str + key + ':' + value + '\n'
  45. # 需要使用有序字典,否则StringToSign错误
  46. sign_dict = OrderedDict(
  47. [('HTTPMethod', 'POST'),
  48. ('Accept', 'application/json'),
  49. ('Content-MD5', content_md5_str),
  50. ('Content-Type', 'application/json; charset=UTF-8'),
  51. ('Date', None,),
  52. ('Headers', sign_headers_str),
  53. ('Url', self.path_url + '?' + 'appkey=' + self.umeng_app_key)]
  54. )
  55. string_to_sign = ""
  56. for key in sign_dict.keys():
  57. value = sign_dict[key]
  58. if key not in ['Headers', 'Url']:
  59. if value:
  60. string_to_sign = string_to_sign + value + "\n"
  61. else:
  62. string_to_sign = string_to_sign + "\n"
  63. else:
  64. string_to_sign = string_to_sign + value
  65. logger = logging.getLogger('info')
  66. logger.info('string_to_sign: {}'.format(string_to_sign))
  67. key_bytes = self.ali_app_secret.encode('utf-8')
  68. text_bytes = string_to_sign.encode('utf-8')
  69. hash_bytes = hmac.new(key_bytes, text_bytes, hashlib.sha256).digest()
  70. signature_str = base64.b64encode(hash_bytes).decode('utf-8')
  71. nonce_str = self.generate_code(8) + '-' + self.generate_code(4) + '-' + self.generate_code(4) + '-' + \
  72. self.generate_code(4) + '-' + self.generate_code(12)
  73. self.headers_dict = {
  74. 'Content-Type': 'application/json; charset=UTF-8',
  75. 'Accept': 'application/json',
  76. 'X-Ca-Version': '1',
  77. 'X-Ca-Signature-Headers': 'X-Ca-Version,X-Ca-Stage,X-Ca-Key,X-Ca-Timestamp',
  78. 'X-Ca-Stage': 'RELEASE',
  79. 'X-Ca-Key': self.ali_appkey,
  80. 'X-Ca-Timestamp': timestamp,
  81. 'X-Ca-Nonce': nonce_str,
  82. 'Content-MD5': content_md5_str,
  83. 'X-Ca-Signature': signature_str
  84. }
  85. self.request_url = self.host_url + self.path_url + '?' + 'appkey=' + self.umeng_app_key
  86. def generate_code(self, length):
  87. key = string.digits + 'abcdef'
  88. str_list = random.sample(key + key, length) # length >= len(key+key)
  89. result_str = ''.join(str_list)
  90. return result_str
  91. def verify_request(self):
  92. logger = logging.getLogger('info')
  93. logger.info('request_url: {}, data: {}, headers: {}'.format(self.request_url, self.body_data_str, self.headers_dict))
  94. response = requests.post(url=self.request_url, data=self.body_data_str.encode("UTF-8"), headers=self.headers_dict)
  95. logger.info('response: {}, headers: {}, text: {}'.format(response, response.headers, response.text))
  96. if response.status_code != 200:
  97. return False
  98. # 替换字符串,防止将字符串转为字典时报错
  99. res_text = response.text.replace('true', '"true"').replace('false', '"false"').replace('null', '"null"')
  100. logger.info('res_text: {}'.format(res_text))
  101. res = eval(res_text)
  102. print(res['success'])
  103. if res['success'] == 'false':
  104. return False
  105. self.phone = res['data']['mobile'] # 获取手机号
  106. return True