# @Author : Rocky
# @File : UserController.py
# @Time : 2024/11/27 16:35
import hashlib
import hmac
import os
import time

import requests
from Ansjer.config import LOGGER
from django.views.generic.base import View
from Model.models import WeChatMiniProgram
from Object.Enums.WeChatEnum import WeChatMiniProgramAPIEnum, WeChatMiniProgramConfigEnum
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject


class UserView(View):
    """WeChat mini-program user endpoints: phone-number lookup and login.

    The URL keyword argument ``operation`` selects the handler; GET and POST
    are treated identically apart from where the parameters are read from.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler.

        ``getPhoneNumber`` and ``userLogin`` run without a session check.
        Every other operation first requires a valid session key.

        @param request_dict: request parameters (``request.GET`` or ``request.POST``)
        @param request: the Django request, used to read the Authorization header
        @param operation: operation name taken from the URL
        @return: the response produced by ``ResponseObject.json``
        """
        lang = request_dict.get('lang', 'cn')
        response = ResponseObject(lang)
        if operation == 'getPhoneNumber':
            return self.get_phone_number(request_dict, response)
        if operation == 'userLogin':
            return self.user_login(request_dict, response)

        if not self.check_session_key(request):
            return response.json(309)
        # No authenticated operation is defined yet. Previously this path fell
        # through and returned None, which Django rejects as a view result.
        # 444 is the code this file already uses for bad request parameters.
        return response.json(444)

    @classmethod
    def check_session_key(cls, request):
        """
        Check whether the caller's session_key is still valid.
        https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/user-login/checkSessionKey.html

        The login status is read from the ``Authorization`` header and looked
        up in ``WeChatMiniProgram``; the stored session key is then verified
        against the WeChat checkSessionKey API.

        @param request: the Django request
        @return: bool, True only when WeChat answers with errcode 0
        """
        try:
            login_status = request.META.get('HTTP_AUTHORIZATION')
            if not login_status:
                return False

            # A single query instead of exists() followed by two indexed reads.
            record = WeChatMiniProgram.objects.filter(login_status=login_status). \
                values('openid', 'session_key').first()
            if record is None:
                return False

            params = {
                'openid': record['openid'],
                'access_token': cls.get_access_token(),
                'signature': cls.generate_signature(record['session_key']),
                'sig_method': 'hmac_sha256'
            }
            r = requests.get(url=WeChatMiniProgramAPIEnum.checkSessionKeyAPI.value, params=params, timeout=5)
            # SECURITY: this used to be eval(r.content), which executes remote
            # text as Python. The body is JSON, so parse it as JSON.
            result = r.json()
            return result.get('errcode') == 0
        except Exception as e:
            # Best effort: any failure means "not logged in", but leave a trace
            # in the log instead of swallowing the error silently.
            LOGGER.info('微信小程序校验session_key异常:error_line:{}, error_msg:{}'.format(
                e.__traceback__.tb_lineno, repr(e)))
            return False

    @staticmethod
    def get_access_token():
        """
        Get the mini program's global backend API credential (token lifetime is 7200s).
        https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/mp-access-token/getAccessToken.html

        The token is cached in Redis. The cached value is reused while its TTL
        exceeds ten minutes; otherwise a new token is requested and stored
        with the ``expires_in`` value returned by WeChat.

        @return: access_token, or None when the token could not be obtained
        """
        try:
            redis_obj = RedisObject()
            access_token_key = WeChatMiniProgramConfigEnum.AccessTokenKey.value
            expires_time = redis_obj.get_ttl(access_token_key)
            # Reuse the cached token only if it has more than ten minutes left.
            if expires_time > 10 * 60:
                return redis_obj.get_data(access_token_key)

            params = {
                'grant_type': 'client_credential',
                'appid': WeChatMiniProgramConfigEnum.AppID.value,
                'secret': WeChatMiniProgramConfigEnum.AppSecret.value
            }
            r = requests.get(url=WeChatMiniProgramAPIEnum.getAccessTokenAPI.value, params=params, timeout=5)
            # SECURITY: parse as JSON; never eval() a remote response body.
            result = r.json()
            access_token = result.get('access_token')
            if not access_token:
                # Explicit check rather than assert, which is stripped under -O.
                raise ValueError('access_token missing in response: {}'.format(result))
            expires_in = result.get('expires_in')
            # Cache the token in Redis.
            redis_obj.set_ex_data(key=access_token_key, val=access_token, expire=expires_in)
            return access_token
        except Exception as e:
            LOGGER.info('微信小程序获取token异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return None

    @classmethod
    def get_phone_number(cls, request_dict, response):
        """
        Exchange a ``code`` for the user's phone number.

        @param request_dict: request parameters; must contain ``code``
        @param response: ResponseObject used to build the reply
        @return: response 0 with ``{'phone_number': ...}``, 444 when ``code``
                 is missing, 500 on any failure
        """
        code = request_dict.get('code', None)
        if not code:
            return response.json(444)
        try:
            access_token = cls.get_access_token()
            if not access_token:
                # Fail fast instead of formatting "None" into the URL.
                raise ValueError('access_token unavailable')
            url = WeChatMiniProgramAPIEnum.getPhoneNumberAPI.value.format(access_token)
            data = {
                'code': code
            }
            r = requests.post(url=url, json=data, timeout=5)
            # SECURITY: parse as JSON; never eval() a remote response body.
            result = r.json()
            errcode = result.get('errcode')
            if errcode != 0:
                raise ValueError('getPhoneNumber failed: {}'.format(result))
            res = {
                'phone_number': result['phone_info']['purePhoneNumber']
            }
            return response.json(0, res)
        except Exception as e:
            LOGGER.info('微信小程序获取手机号码异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @classmethod
    def user_login(cls, request_dict, response):
        """
        Mini-program login.
        https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/user-login/code2Session.html
        https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/login.html

        Exchanges ``js_code`` for openid/session_key, generates a fresh login
        status and stores it on the matching ``WeChatMiniProgram`` row,
        creating the row when the openid is new.

        @param request_dict: request parameters; must contain ``js_code`` and
                             ``phone_number`` (the latter is only checked for
                             presence here)
        @param response: ResponseObject used to build the reply
        @return: response 0 with ``{'login_status': ...}``, 444 when a
                 parameter is missing, 500 on any failure
        """
        js_code = request_dict.get('js_code', None)
        phone_number = request_dict.get('phone_number', None)
        if not all([js_code, phone_number]):
            return response.json(444)
        try:
            params = {
                'js_code': js_code,
                'grant_type': 'authorization_code',
                'appid': WeChatMiniProgramConfigEnum.AppID.value,
                'secret': WeChatMiniProgramConfigEnum.AppSecret.value
            }
            r = requests.get(url=WeChatMiniProgramAPIEnum.code2SessionAPI.value, params=params, timeout=5)
            # SECURITY: parse as JSON; never eval() a remote response body.
            result = r.json()
            # NOTE(review): a successful code2Session reply may omit errcode;
            # confirm against live responses. Treating "absent" as success is
            # safe because the required keys are read right below and a
            # missing one still ends in the 500 branch.
            errcode = result.get('errcode', 0)
            if errcode != 0:
                raise ValueError('code2Session failed: {}'.format(result))
            openid = result['openid']
            # NOTE(review): raises KeyError when the reply carries no unionid.
            unionid = result['unionid']
            session_key = result['session_key']

            # Generate the login status handed back to the client.
            login_status = cls.generate_login_status(openid, session_key)

            now_time = int(time.time())
            wechat_mini_program_qs = WeChatMiniProgram.objects.filter(openid=openid)
            if wechat_mini_program_qs.exists():
                wechat_mini_program_qs.update(
                    login_status=login_status,
                    session_key=session_key,
                    updated_time=now_time
                )
            else:
                WeChatMiniProgram.objects.create(
                    login_status=login_status,
                    openid=openid,
                    unionid=unionid,
                    session_key=session_key,
                    created_time=now_time,
                    updated_time=now_time
                )

            res = {
                'login_status': login_status
            }
            return response.json(0, res)
        except Exception as e:
            LOGGER.info('微信小程序登录异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def generate_login_status(openid, session_key):
        """
        Generate a login status token.

        The token is the SHA-256 hex digest of ``openid + session_key + salt``
        where ``salt`` is 16 random bytes in hex, so every call yields a new
        value even for the same user.

        @param openid: the user's openid
        @param session_key: the session key returned by code2Session
        @return: login_status, a 64-character hex string
        """
        salt = os.urandom(16).hex()
        to_hash = openid + session_key + salt
        return hashlib.sha256(to_hash.encode()).hexdigest()

    @staticmethod
    def generate_signature(session_key):
        """
        Sign the user login state: HMAC-SHA256 of the empty string keyed with
        session_key, i.e. signature = hmac_sha256(session_key, "").

        @param session_key: the stored session key
        @return: signature as a hex string
        """
        session_key_bytes = session_key.encode('utf-8')
        # The signed message is the empty byte string.
        return hmac.new(session_key_bytes, b"", hashlib.sha256).hexdigest()