from datetime import datetime
import base64
import hashlib
import hmac
import json
import os

import pytz
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from django.contrib.auth.hashers import check_password, make_password
from django.db.models import Q
from django.views import View

from Controller.CheckUserData import DataValid
from Model.models import Device_User
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Ansjer.config import SHOPIFY_CONFIG
from Service.CommonService import CommonService


class ShopifyMultipass:
    """Helpers for Shopify Multipass login tokens and customer lookups."""

    @staticmethod
    def generate_multipass_token(secret, customer_data):
        """Encrypt and sign *customer_data*, returning a Multipass token.

        :param secret: Multipass secret string.
        :param customer_data: JSON-serialisable dict describing the customer.
        :return: URL-safe Base64 string of ``iv + ciphertext + signature``.
        """
        # Step 1: serialise the customer data as JSON.
        json_data = json.dumps(customer_data)

        # Step 2: derive both keys from the SHA-256 digest of the secret.
        hash_digest = hashlib.sha256(secret.encode()).digest()
        encryption_key = hash_digest[:16]  # first 128 bits: AES key
        signature_key = hash_digest[16:32]  # last 128 bits: HMAC key

        # Step 3: AES-128-CBC encrypt with a fresh random IV.
        iv = os.urandom(16)
        cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))

        # Step 4: sign the IV together with the ciphertext.
        data_to_sign = iv + ciphertext
        signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()

        # Step 5: URL-safe Base64 encode the signed payload.
        return base64.urlsafe_b64encode(data_to_sign + signature).decode()

    @staticmethod
    def search_customer_by_email(store_name, access_token, email, timeout=10):
        """Search a Shopify store's customers by email via the Admin API.

        :param store_name: ``<store_name>.myshopify.com`` sub-domain.
        :param access_token: Admin API access token.
        :param email: Email address to search for.
        :param timeout: Seconds to wait for Shopify before giving up, so a
            stalled connection cannot block the worker indefinitely.
        :return: Decoded JSON body of the response.
        :raises requests.RequestException: on network failure or timeout.
        """
        url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
        params = {
            "query": f"email:{email}"
        }
        headers = {
            "X-Shopify-Access-Token": access_token,
        }
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        return response.json()


class ShopifyView(View):
    """Endpoints that bridge app accounts and the Shopify storefront."""

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using its query parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.GET
        return self.validation(request, request_dict, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using its form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.POST
        return self.validation(request, request_dict, operation)

    def validation(self, request, request_dict, operation):
        """Route *operation* to its handler; unknown operations return 414."""
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language)
        if operation == 'shopifyLogin':
            return self.shopify_login(request_dict, response)
        elif operation == 'shopifyRegister':
            return self.shopify_register(request_dict, response)
        else:
            return response.json(414)

    @staticmethod
    def shopify_login(request_dict, response):
        """Check app credentials and return a Multipass login URL.

        Responds 444 when ``email`` or ``password`` is missing, 104 when no
        user matches, 111 when the password is wrong, otherwise 0 with the
        redirect URL.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        if not all([email, password]):
            return response.json(444)

        # Single query: only the password hash is needed here.
        user = (
            Device_User.objects
            .filter(Q(username=email) | Q(userEmail=email))
            .values('password')
            .first()
        )
        if user is None:
            return response.json(104)
        if not check_password(password, user['password']):
            return response.json(111)

        # Timezone-aware timestamp such as 2024-01-01T12:00:00-05:00;
        # change the zone here if another one is required.
        now = datetime.now(pytz.timezone('America/New_York'))
        timestamp = now.isoformat(timespec='seconds')

        # NOTE(review): the submitted identifier is sent as the email even if
        # it matched ``username`` rather than ``userEmail`` - confirm that
        # usernames are always email addresses.
        customer_data = {
            "email": email,
            "created_at": timestamp,
        }
        multipass_secret = SHOPIFY_CONFIG["eu_multipass_secret"]
        token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)

        redirect_url = f"https://eu.zositech.com/account/login/multipass/{token}"
        return response.json(0, redirect_url)

    @staticmethod
    def shopify_register(request_dict, response):
        """Create an app account for a Shopify customer.

        Without ``authCode`` the email must already belong to a Shopify
        customer (10077 otherwise). With ``authCode`` it is compared with the
        code stored in Redis (120 when no code is stored, 121 on mismatch).
        Responds 444 on missing parameters, 103 when the user already exists
        and 0 on success.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)
        if not all([email, password]):
            return response.json(444)

        if authcode is None:
            # NOTE(review): this path creates an account without proving the
            # caller owns the email - confirm that this is intended.
            access_token = SHOPIFY_CONFIG["eu_token"]
            customer_data = ShopifyMultipass.search_customer_by_email(
                "0ef557-2", access_token, email)
            # An error payload has no 'customers' key; treat it as "no
            # customer" instead of raising KeyError.
            customers = customer_data.get('customers') if isinstance(customer_data, dict) else None
            # The email is caller-supplied text inside a search query, so
            # require an exact (case-insensitive) match on a returned record.
            wanted = email.lower()
            if not any(
                isinstance(customer, dict)
                and (customer.get('email') or '').lower() == wanted
                for customer in (customers or [])
            ):
                return response.json(10077)
        else:
            # Email verification via the code stored in Redis.
            reds = RedisObject()
            identifyingCode = reds.get_data(key=email + '_identifyingCode')
            # No stored code: treated as expired.
            if identifyingCode is False:
                return response.json(120)
            if authcode != identifyingCode:
                return response.json(121)

        if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
            return response.json(103)

        # Create the user with a hashed password.
        password = make_password(password)
        new_userID = CommonService.getUserID(μs=False, setOTAID=True)
        user_data = {
            "username": email,
            "NickName": email,
            "userEmail": email,
            "password": password,
            "userID": new_userID,
            "is_active": True,
            "user_isValid": True,
        }
        Device_User.objects.create(**user_data)
        return response.json(0)