import base64
import concurrent.futures
import hashlib
import hmac
import json
import os
from datetime import datetime

import pytz
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from django.contrib.auth.hashers import check_password, make_password
from django.db.models import Q
from django.views import View

from Controller.CheckUserData import DataValid
from Model.models import Device_User, CountryModel
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Ansjer.config import SHOPIFY_CONFIG
from Service.CommonService import CommonService

# Upper bound (seconds) for outbound HTTP calls. `requests` applies no timeout
# by default, so without this a stalled remote would block the worker forever.
_REQUEST_TIMEOUT = 10


class ShopifyMultipass:
    """Helpers for building Shopify Multipass tokens and querying Shopify customers."""

    @staticmethod
    def generate_multipass_token(secret, customer_data):
        """Encrypt and sign ``customer_data`` and return it as a Multipass token.

        Args:
            secret: Multipass secret string; its SHA-256 digest is split into
                the AES key (first 16 bytes) and the HMAC key (last 16 bytes).
            customer_data: JSON-serializable mapping describing the customer.

        Returns:
            URL-safe Base64 text of ``iv + ciphertext + signature``.
        """
        # Step 1: serialize the customer data to JSON.
        json_data = json.dumps(customer_data)

        # Step 2: derive the encryption and signing keys from the secret.
        hash_digest = hashlib.sha256(secret.encode()).digest()
        encryption_key = hash_digest[:16]  # 128-bit encryption key
        signature_key = hash_digest[16:32]  # 128-bit signing key

        # Step 3: AES-CBC encrypt the JSON with a random initialization vector.
        iv = os.urandom(16)
        cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))

        # Step 4: sign the IV together with the ciphertext.
        data_to_sign = iv + ciphertext
        signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()

        # Step 5: URL-safe Base64 encode the signed payload.
        return base64.urlsafe_b64encode(data_to_sign + signature).decode()

    @staticmethod
    def search_customer_by_email(store_name, access_token, email, timeout=_REQUEST_TIMEOUT):
        """Search a Shopify store's customers by email via the Admin REST API.

        Args:
            store_name: Store handle used as the ``myshopify.com`` subdomain.
            access_token: Value sent in the ``X-Shopify-Access-Token`` header.
            email: Email address placed in the ``email:`` search query.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.RequestException: On connection failure or timeout.
        """
        url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
        params = {
            "query": f"email:{email}"
        }
        headers = {
            "X-Shopify-Access-Token": access_token,
        }
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        return response.json()


class ShopifyView(View):
    """Django view that dispatches Shopify-related operations by URL keyword."""

    def get(self, request, *args, **kwargs):
        """Handle GET: read parameters from the query string and dispatch."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.GET
        return self.validation(request, request_dict, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: read parameters from the form body and dispatch."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.POST
        return self.validation(request, request_dict, operation)

    def validation(self, request, request_dict, operation):
        """Route ``operation`` to its handler; unknown operations answer with code 414.

        Args:
            request: The incoming Django request (not used by the handlers).
            request_dict: Request parameters (``request.GET`` or ``request.POST``).
            operation: Operation name taken from the URL keyword arguments.
        """
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language)
        handlers = {
            'shopifyLogin': self.shopify_login,  # app: query customized customer info
            'shopifyRegister': self.shopify_register,  # app: register customized customer info
            'searchCustomer': self.search_customer,
            'searchAccount': self.search_account,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    @staticmethod
    def shopify_login(request_dict, response):
        """Check local credentials and return a Multipass login URL for the EU shop.

        Reads ``email`` and ``password`` from ``request_dict``. Responds with
        444 when either is missing, 104 when no user matches the email, 111
        when the password check fails, otherwise 0 with the redirect URL.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        if not all([email, password]):
            return response.json(444)

        # Only the password hash is needed; fetch it with a single LIMIT 1 query.
        matches = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
        stored_hashes = list(matches.values_list('password', flat=True)[:1])
        if not stored_hashes:
            return response.json(104)
        if not check_password(password, stored_hashes[0]):
            return response.json(111)

        # Current time in New York, rendered as ISO 8601 with a colon-separated
        # UTC offset (e.g. 2024-01-01T12:00:00-05:00). Change the zone if needed.
        now = datetime.now(pytz.timezone('America/New_York'))
        timestamp = now.isoformat(timespec='seconds')

        customer_data = {
            "email": email,
            "created_at": timestamp,
        }
        multipass_secret = SHOPIFY_CONFIG["eu_multipass_secret"]  # Multipass secret
        token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)

        # Build the redirect URL.
        redirect_url = f"https://eu.zositech.com/account/login/multipass/{token}"
        return response.json(0, redirect_url)

    @staticmethod
    def shopify_register(request_dict, response):
        """Create a local user for ``email`` after a Shopify or auth-code check.

        Reads ``email``, ``password`` and optional ``authCode``. Without an
        auth code the email must already exist as a Shopify customer (else
        10077). With one, it must equal the value stored in Redis under
        ``<email>_identifyingCode`` (120 when the stored value is ``False``,
        121 on mismatch). Responds 444 for missing email/password, 103 when a
        local user already exists, and 0 once the user has been created.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)
        if not all([email, password]):
            return response.json(444)

        if authcode is None:
            # Check whether the email already has a Shopify account.
            # NOTE(review): the store handle is hard-coded here, whereas
            # search_account reads SHOPIFY_CONFIG["eu_store_name"] — confirm they match.
            access_token = SHOPIFY_CONFIG["eu_token"]
            customer_data = ShopifyMultipass.search_customer_by_email("0ef557-2", access_token, email)
            if not customer_data['customers']:
                return response.json(10077)
        else:
            # Email verification via the stored identifying code.
            reds = RedisObject()
            identifyingCode = reds.get_data(key=email + '_identifyingCode')
            # The verification code has expired.
            if identifyingCode is False:
                return response.json(120)
            # The verification code does not match.
            if authcode != identifyingCode:
                return response.json(121)

        # Registration: refuse when the email is already taken.
        if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
            return response.json(103)

        # Create the user.
        password = make_password(password)
        new_userID = CommonService.getUserID(μs=False, setOTAID=True)
        user_data = {
            "username": email,
            "NickName": email,
            "userEmail": email,
            "password": password,
            "userID": new_userID,
            "is_active": True,
            "user_isValid": True,
        }
        Device_User.objects.create(**user_data)
        return response.json(0)

    def search_account(self, request_dict, response):
        """Report where ``email`` is known: Shopify stores and/or account servers.

        Responds 444 when ``email`` is missing. Otherwise responds 0 with
        ``{"status", "shopifyCountry", "ServersCountry"}`` where status is
        4 (both found), 3 (servers only), 2 (Shopify only) or 1 (neither).
        """
        email = request_dict.get("email")
        if not email:
            return response.json(444)

        store_configs = [
            ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]),
            ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]),
            ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]),
            ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]),
            ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]),
        ]

        def search_store(config):
            _, store_name, token = config
            return ShopifyMultipass.search_customer_by_email(store_name, token, email)

        # Query all stores concurrently. `map` yields results in store_configs
        # order, so when several stores know the email the reported store is
        # always the first one listed rather than whichever answered first.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            store_results = list(executor.map(search_store, store_configs))

        shopify_country = next(
            (
                country
                for (country, _, _), result in zip(store_configs, store_results)
                if result["customers"]
            ),
            "",
        )

        account_country = self.call_search_customer(email)
        servers_country = "us" if account_country["us"] else "eu" if account_country["eu"] else ""

        if servers_country and shopify_country:
            status = 4
        elif servers_country:
            status = 3
        elif shopify_country:
            status = 2
        else:
            status = 1

        account_status = {"status": status, "shopifyCountry": shopify_country, "ServersCountry": servers_country}
        return response.json(0, account_status)

    @staticmethod
    def call_search_customer(email):
        """Ask the US and EU ``searchCustomer`` endpoints about ``email`` in parallel.

        Returns:
            Dict with keys ``"us"`` and ``"eu"``; each value is whatever
            ``fetch_customer`` produced for that region, or ``None`` when the
            request failed.
        """
        urls = {
            "us": "https://www.dvema.com/shopify/searchCustomer",
            "eu": "https://api.zositeche.com/shopify/searchCustomer"
        }
        params = {"email": email}
        customer_region = {}

        def fetch_customer(region, url):
            try:
                response = requests.get(url=url, params=params, timeout=_REQUEST_TIMEOUT)
                response.raise_for_status()  # treat HTTP error statuses as failures
                customer_country = response.json().get("result", None)
                # NOTE(review): `all()` iterates the value, so any non-empty
                # string result yields None here, and a non-iterable result
                # (e.g. JSON null) raises TypeError, which is not caught below.
                # Confirm the intended condition.
                return region, customer_country if not all(customer_country) else None
            except requests.RequestException:
                # Network failure, timeout or bad status: region counts as "not found".
                return region, None

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_region = {executor.submit(fetch_customer, region, url): region for region, url in urls.items()}
            for future in concurrent.futures.as_completed(future_to_region):
                region, customer_country = future.result()
                customer_region[region] = customer_country
        return customer_region

    @staticmethod
    def search_customer(request_dict, response):
        """Return the country code linked to the local user matching ``email``.

        Responds 444 when ``email`` is missing, 104 when no user matches,
        otherwise 0 with the ``country_code`` of the user's ``region_country``
        (``None`` when no matching ``CountryModel`` row exists).
        """
        email = request_dict.get("email")
        # Without this guard a missing email becomes Q(username=None), which
        # Django turns into IS NULL and would match unrelated users.
        if not email:
            return response.json(444)

        user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
        if not user_qs.exists():
            return response.json(104)

        user_region_id = user_qs.values_list('region_country', flat=True).first()
        country_code = CountryModel.objects.filter(id=user_region_id).values_list("country_code", flat=True).first()
        return response.json(0, country_code)