# @Author : Rocky
# @File : UserController.py
# @Time : 2024/11/27 16:35
import requests
from Ansjer.config import LOGGER
from django.views.generic.base import View

from Object.Enums.WeChatEnum import WeChatMiniProgramAPIEnum, WeChatMiniProgramConfigEnum
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject


class EquipmentFamilyView(View):
    """Django view that dispatches WeChat mini program operations.

    The operation name is taken from the ``operation`` URL keyword argument
    and routed by ``validation``.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET requests by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests by dispatching on the ``operation`` URL kwarg."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """
        Route the request to the handler matching ``operation``.

        @param request_dict: request parameters (request.GET or request.POST)
        @param request: the Django request object (currently unused here)
        @param operation: operation name taken from the URL
        @return: the handler's response, or response code 414 for an
                 unrecognised operation
        """
        lang = request_dict.get('lang', 'cn')
        response = ResponseObject(lang)
        # The access token is fetched by the handler that needs it; fetching it
        # here as well only added a redundant Redis/HTTP round trip per request.
        if operation == 'getPhoneNumber':
            return self.get_phone_number(request_dict, response)
        return response.json(414)

    @staticmethod
    def get_access_token():
        """
        Get the mini program's globally unique credential for calling backend
        APIs. The token is valid for 7200s.
        https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/mp-access-token/getAccessToken.html

        The token is cached in Redis; a new one is requested when the cached
        entry has ten minutes or less left to live.

        @return: access_token, or None if it could not be obtained (the error
                 is logged, never raised)
        """
        try:
            redis_obj = RedisObject()
            access_token_key = WeChatMiniProgramConfigEnum.AccessTokenKey.value
            expires_time = redis_obj.get_ttl(access_token_key)
            # If more than ten minutes of validity remain, return the cached
            # token; otherwise refresh it.
            if expires_time > 10 * 60:
                return redis_obj.get_data(access_token_key)

            params = {
                'grant_type': 'client_credential',
                'appid': WeChatMiniProgramConfigEnum.AppID.value,
                'secret': WeChatMiniProgramConfigEnum.AppSecret.value,
            }
            r = requests.get(url=WeChatMiniProgramAPIEnum.getAccessTokenAPI.value, params=params, timeout=5)
            # Parse the body as JSON. The previous eval() executed the remote
            # response as Python code and could not handle true/false/null.
            result = r.json()
            access_token = result.get('access_token')
            # Explicit check instead of assert, which is stripped under -O.
            if not access_token:
                raise ValueError('access_token missing in response: {}'.format(result))
            expires_in = result.get('expires_in')
            # Save to Redis with the lifetime reported by the API.
            redis_obj.set_ex_data(key=access_token_key, val=access_token, expire=expires_in)
            return access_token
        except Exception as e:
            LOGGER.info('微信小程序获取token异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return None

    @classmethod
    def get_phone_number(cls, request_dict, response):
        """
        Get the user's phone number from the ``code`` supplied by the client.

        @param request_dict: request parameters; must contain 'code'
        @param response: ResponseObject used to build the reply
        @return: response code 0 with {'purePhoneNumber': ...} on success,
                 code 444 when 'code' is missing, code 500 with the error
                 line and message on any failure
        """
        code = request_dict.get('code', None)
        if not code:
            return response.json(444)
        try:
            access_token = cls.get_access_token()
            # get_access_token() returns None on failure; do not call the API
            # with a literal "None" token in the URL.
            if not access_token:
                raise ValueError('access_token unavailable')
            url = WeChatMiniProgramAPIEnum.getPhoneNumberAPI.value.format(access_token)
            data = {
                'code': code
            }
            # Per the WeChat API documentation this endpoint takes a JSON
            # request body, so send json= rather than form-encoded data=.
            r = requests.post(url=url, json=data, timeout=5)
            # Parse as JSON instead of eval()-ing the remote response.
            result = r.json()
            errcode = result.get('errcode')
            # Explicit check instead of assert, which is stripped under -O.
            if errcode != 0:
                raise ValueError('getPhoneNumber failed: errcode={}, errmsg={}'.format(errcode, result.get('errmsg')))
            pure_phone_number = result['phone_info']['purePhoneNumber']
            res = {
                'purePhoneNumber': pure_phone_number
            }
            return response.json(0, res)
        except Exception as e:
            LOGGER.info('微信小程序获取手机号码异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))