# @Author : Rocky # @File : UserController.py # @Time : 2024/11/27 16:35 import hashlib import hmac import os import time import requests from Ansjer.config import LOGGER from django.views.generic.base import View from Model.models import WeChatMiniProgram, Device_User, Device_Info, DeviceNameLanguage from Object.Enums.WeChatEnum import WeChatMiniProgramAPIEnum, WeChatMiniProgramConfigEnum from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject class UserView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): lang = request_dict.get('lang', 'cn') response = ResponseObject(lang) if operation == 'getPhoneNumber': return self.get_phone_number(request_dict, response) elif operation == 'userLogin': return self.user_login(request_dict, response) else: user_id = self.check_session_key_and_get_user_id(request) if not user_id: return response.json(309) if operation == 'deviceList': return self.device_list(user_id, response) @classmethod def check_session_key_and_get_user_id(cls, request): """ 校验 session_key 是否有效 https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/user-login/checkSessionKey.html @return: bool """ try: login_status = request.META.get('HTTP_AUTHORIZATION') if not login_status: return False wechat_mini_program_qs = WeChatMiniProgram.objects.filter(login_status=login_status). \ values('openid', 'session_key', 'phone_number') if not wechat_mini_program_qs.exists(): return False openid = wechat_mini_program_qs[0]['openid'] session_key = wechat_mini_program_qs[0]['session_key'] signature = cls.generate_signature(session_key) access_token = cls.get_access_token() params = { 'openid': openid, 'access_token': access_token, 'signature': signature, 'sig_method': 'hmac_sha256' } r = requests.get(url=WeChatMiniProgramAPIEnum.checkSessionKeyAPI.value, params=params, timeout=5) result = eval(r.content) errcode = result.get('errcode') assert errcode == 0 # 根据手机号查询用户id phone_number = wechat_mini_program_qs[0]['phone_number'] device_user_qs = Device_User.objects.filter(phone=phone_number).values('userID') if not device_user_qs.exists(): return False return device_user_qs[0]['userID'] except Exception as e: return False @staticmethod def get_access_token(): """ 获取小程序全局唯一后台接口调用凭据,token有效期为7200s https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/mp-access-token/getAccessToken.html @return: access_token """ try: redis_obj = RedisObject() access_token_key = WeChatMiniProgramConfigEnum.AccessTokenKey.value expires_time = redis_obj.get_ttl(access_token_key) # 如果有效时间大于十分钟,返回token,否则刷新token if expires_time > 10 * 60: access_token = redis_obj.get_data(access_token_key) return access_token else: params = { 'grant_type': 'client_credential', 'appid': WeChatMiniProgramConfigEnum.AppID.value, 'secret': WeChatMiniProgramConfigEnum.AppSecret.value } r = requests.get(url=WeChatMiniProgramAPIEnum.getAccessTokenAPI.value, params=params, timeout=5) result = eval(r.content) access_token = result.get('access_token') assert access_token expires_in = result.get('expires_in') # 保存到Redis redis_obj.set_ex_data(key=access_token_key, val=access_token, expire=expires_in) return access_token except Exception as e: LOGGER.info('微信小程序获取token异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return None @classmethod def get_phone_number(cls, request_dict, response): """ 获取手机号码 @param request_dict: @param response: @return: res """ code = request_dict.get('code', None) if not code: return response.json(444) try: access_token = cls.get_access_token() url = WeChatMiniProgramAPIEnum.getPhoneNumberAPI.value.format(access_token) data = { 'code': code } r = requests.post(url=url, json=data, timeout=5) result = eval(r.content) errcode = result.get('errcode') assert errcode == 0 phone_number = result['phone_info']['purePhoneNumber'] res = { 'phone_number': phone_number } return response.json(0, res) except Exception as e: LOGGER.info('微信小程序获取手机号码异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def user_login(cls, request_dict, response): """ 小程序登录 https://developers.weixin.qq.com/miniprogram/dev/OpenApiDoc/user-login/code2Session.html https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/login.html @param request_dict: @param response: @return: """ js_code = request_dict.get('js_code', None) phone_number = request_dict.get('phone_number', None) if not all([js_code, phone_number]): return response.json(444) try: params = { 'js_code': js_code, 'grant_type': 'authorization_code', 'appid': WeChatMiniProgramConfigEnum.AppID.value, 'secret': WeChatMiniProgramConfigEnum.AppSecret.value } r = requests.get(url=WeChatMiniProgramAPIEnum.code2SessionAPI.value, params=params, timeout=5) result = eval(r.content) # {'session_key': 'xxx' 会话密钥, 'openid': 'xxx' 用户唯一标识} openid = result.get('openid') assert openid openid = result['openid'] session_key = result['session_key'] # 生成登录态 login_status = cls.generate_login_status(openid, session_key) now_time = int(time.time()) wechat_mini_program_qs = WeChatMiniProgram.objects.filter(openid=openid) if wechat_mini_program_qs.exists(): wechat_mini_program_qs.update( login_status=login_status, session_key=session_key, phone_number=phone_number, updated_time=now_time ) else: WeChatMiniProgram.objects.create( login_status=login_status, openid=openid, session_key=session_key, phone_number=phone_number, created_time=now_time, updated_time=now_time ) res = { 'login_status': login_status } return response.json(0, res) except Exception as e: LOGGER.info('微信小程序登录异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def generate_login_status(openid, session_key): """ 生成登录态 @param openid: @param session_key: @return: login_status """ salt = os.urandom(16).hex() # 将openid, session_key和salt拼接成一个字符串 to_hash = openid + session_key + salt # 使用hashlib生成SHA256哈希值 hash_object = hashlib.sha256(to_hash.encode()) # 获取十六进制格式的哈希值 login_status = hash_object.hexdigest() return login_status @staticmethod def generate_signature(session_key): """ 用户登录态签名,用session_key对空字符串签名得到的结果。即 signature = hmac_sha256(session_key, "") @param session_key: 会话密钥 @return: signature """ # 将session_key和要签名的数据(这里是空字符串)转换为字节类型 session_key_bytes = session_key.encode('utf-8') data = b"" # 空字符串转换为字节类型 # 创建一个新的hmac对象,使用sha256作为hash函数 hmac_object = hmac.new(session_key_bytes, data, hashlib.sha256) # 获取十六进制格式的签名结果 signature = hmac_object.hexdigest() return signature @staticmethod def device_list(user_id, response): """ 查询设备列表 @param user_id: 用户id @param response: @return: """ try: img_url_1 = DeviceNameLanguage.objects.filter(name='智能摄像机 C520M').values('app_device_type__iconV2')[0]['app_device_type__iconV2'] img_url_2 = DeviceNameLanguage.objects.filter(name='智能摄像机 C518').values('app_device_type__iconV2')[0]['app_device_type__iconV2'] res = [ { 'device_name': '智能摄像机520', 'service_status': '服务使用中 2025-11-25', 'img_url': img_url_1}, { 'device_name': '智能摄像机518', 'service_status': '服务到期', 'img_url': img_url_2} ] return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))