import re from datetime import datetime import concurrent.futures import pytz import requests from django.db.models import Q, F from django.views import View from Crypto.Cipher import AES from Crypto.Util.Padding import pad from django.contrib.auth.hashers import check_password, make_password import concurrent.futures from Controller.CheckUserData import DataValid from Model.models import Device_User, CountryModel, LanguageModel, CountryLanguageModel from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject import base64 import hmac import hashlib import os import json from Ansjer.config import SHOPIFY_CONFIG, CONFIG_INFO, CONFIG_EUR, CONFIG_US from Service.CommonService import CommonService class ShopifyMultipass: @staticmethod def generate_multipass_token(secret, customer_data): """ 使用指定的密钥对加密并签名JSON数据,返回Base64编码的Multipass令牌 """ # 第一步:将客户数据转换为JSON格式 json_data = json.dumps(customer_data) # 第二步:生成加密密钥和签名密钥 hash_digest = hashlib.sha256(secret.encode()).digest() encryption_key = hash_digest[:16] # 128位加密密钥 signature_key = hash_digest[16:32] # 128位签名密钥 # 第三步:加密JSON数据 iv = os.urandom(16) # 随机初始化向量 cipher = AES.new(encryption_key, AES.MODE_CBC, iv) ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size)) # 第四步:签名加密数据 data_to_sign = iv + ciphertext signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest() # 第五步:Base64编码 multipass_token = base64.urlsafe_b64encode(iv + ciphertext + signature).decode() return multipass_token @staticmethod def search_customer_by_email(store_name, access_token, email): # 设置请求URL url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json" params = { "query": f"email:{email}" } # 设置请求头 headers = { "X-Shopify-Access-Token": access_token, } # 发送GET请求 response = requests.get(url, headers=headers, params=params) # 返回响应的JSON数据 return response.json() class ShopifyView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.GET return self.validation(request, request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.POST return self.validation(request, request_dict, operation) def validation(self, request, request_dict, operation): language = request_dict.get('language', 'cn') response = ResponseObject(language, "pc") if operation == 'shopifyLogin': return self.shopify_login(request_dict, response) elif operation == 'shopifyRegister': return self.shopify_register(request_dict, response) # 查询FAPP注册账号情况 elif operation == 'searchCustomer': return self.search_customer(request_dict, response) # 官网检测账号接口 elif operation == 'searchAccount': return self.search_account(request_dict, response) elif operation == 'getCountryDomainList': return self.get_country_domain_list(request_dict, response) # 忘记密码 elif operation == 'shopifyChangePassword': return self.shopify_change_password(request_dict, response) elif operation == 'verifyAuthcode': return self.verify_authcode(request_dict, response) else: return response.json(414) @staticmethod def shopify_login(request_dict, response): email = request_dict.get("email", None) password = request_dict.get("password", None) account_iso2 = request_dict.get("accountCountry", None) shopify_country = request_dict.get("shopifyCountry", "") if not all([email, password, account_iso2]): return response.json(444) user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)) if not user_qs.exists(): return response.json(104) users = user_qs.values( 'role__rid', 'role__roleName', 'userID', 'NickName', 'username', 'userEmail', 'phone', 'password', 'userIconPath' )[0] if not check_password(password, users['password']): return response.json(111) # 获取当前时间并格式化时间戳 now = datetime.now(pytz.timezone('America/New_York')) timestamp = now.strftime('%Y-%m-%dT%H:%M:%S%z') timestamp = timestamp[:-2] + ':' + timestamp[-2:] customer_data = { "email": email, "created_at": timestamp, } # 根据条件选择配置键 if shopify_country: secret_key = f"{shopify_country}_multipass_secret" store_name_key = f"{shopify_country}_store_name" elif account_iso2 == "jp": secret_key = "jp_multipass_secret" store_name_key = "jp_store_name" elif account_iso2 == "de": secret_key = "de_multipass_secret" store_name_key = "de_store_name" elif account_iso2 == "uk": secret_key = "uk_multipass_secret" store_name_key = "uk_store_name" elif CONFIG_INFO == CONFIG_EUR: secret_key = "eu_multipass_secret" store_name_key = "eu_store_name" elif CONFIG_INFO == CONFIG_US: secret_key = "us_multipass_secret" multipass_secret = SHOPIFY_CONFIG[secret_key] token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data) redirect_url = f"https://www.zositech.com/account/login/multipass/{token}" return response.json(0, redirect_url) else: return response.json(444) # 获取配置并生成重定向URL multipass_secret = SHOPIFY_CONFIG[secret_key] store_name = SHOPIFY_CONFIG[store_name_key] token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data) redirect_url = f"https://{store_name}.zositech.com/account/login/multipass/{token}" return response.json(0, redirect_url) @staticmethod def shopify_register(request_dict, response): email = request_dict.get("email", None) password = request_dict.get("password", None) authcode = request_dict.get("authCode", None) if not all([email, password, authcode]): return response.json(444) data_valid = DataValid() if data_valid.email_validate(email) is not True: return response.json(105) re_flag = data_valid.password_validate(password) has_upper = bool(re.search(r"[A-Z]", password)) # 大写字母 has_lower = bool(re.search(r"[a-z]", password)) # 小写字母 has_digit = bool(re.search(r"[0-9]", password)) # 数字 has_special = bool(re.search(r"[!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/]", password)) # 特殊字符 # 至少包含任意两类字符 categories = sum([has_upper, has_lower, has_digit, has_special]) if re_flag is not True and categories > 2: return response.json(109) reds = RedisObject() identifyingCode = reds.get_data(key=email + '_identifyingCode') # 判断验证码是否过期 if identifyingCode is False: return response.json(120) # 验证码是否正确 if authcode != identifyingCode: return response.json(121) # 注册 if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists(): return response.json(103) # 创建用户 password = make_password(password) new_userID = CommonService.getUserID(μs=False, setOTAID=True) user_data = { "username": email, "NickName": email, "userEmail": email, "password": password, "userID": new_userID, "is_active": True, "user_isValid": True, } Device_User.objects.create(**user_data) reds.del_data(key=email + '_identifyingCode') return response.json(0) def search_account(self, request_dict, response): email = request_dict.get("email") if not email: return response.json(444) store_configs = [ ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]), ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]), ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]), ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]), ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]), ] def search_customer(store_name, token): return ShopifyMultipass.search_customer_by_email(store_name, token, email) try: shopify_results = {} with concurrent.futures.ThreadPoolExecutor() as executor: future_to_country = {executor.submit(search_customer, store_name, token): country for country, store_name, token in store_configs} for future in concurrent.futures.as_completed(future_to_country): country = future_to_country[future] shopify_results[country] = future.result() shopify_country = next((country for country, result in shopify_results.items() if result["customers"]), "") account_country = self.call_search_customer(email) servers_continent = "us" if account_country["us"] else "eu" if account_country["eu"] else "" if servers_continent: account_region_list = [] if account_country.get("us"): account_region_list.append({ "url": "https://www.dvema.com/shopify/shopifyLogin", "accountCountry": account_country["us"].lower(), "shopifyCountry": shopify_country }) if account_country.get("eu"): account_region_list.append({ "url": "https://api.zositeche.com/shopify/shopifyLogin", "accountCountry": account_country["eu"].lower(), "shopifyCountry": shopify_country }) return response.json(0, {"accountStatus": 3, "accountRegionList": account_region_list}) elif shopify_country: if shopify_country == "eu": url = "https://eu.zositech.com/account/login" elif shopify_country == "jp": url = "https://www.zosi.jp/account/login" elif shopify_country == "de": url = "https://www.zositech.de/account/login" elif shopify_country == "uk": url = "https://www.zositech.co.uk/account/login" else: url = "https://www.zositech.com/account/login" return response.json(0, {"accountStatus": 2, "url": url}) else: url = "注册链接" return response.json(0, {"accountStatus": 1, "url": url}) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def call_search_customer(email): urls = { "us": "https://www.dvema.com/shopify/searchCustomer", "eu": "https://api.zositeche.com/shopify/searchCustomer" } params = {"email": email} # Use the provided email parameter customer_region = {} def fetch_customer(region, url): try: response = requests.get(url=url, params=params) response.raise_for_status() # Raise an error for bad responses customer_country = response.json()["data"] if customer_country == "": return region, None return region, customer_country except requests.RequestException: return region, None with concurrent.futures.ThreadPoolExecutor() as executor: future_to_region = {executor.submit(fetch_customer, region, url): region for region, url in urls.items()} for future in concurrent.futures.as_completed(future_to_region): region, customer_country = future.result() customer_region[region] = customer_country return customer_region @staticmethod def search_customer(request_dict, response): email = request_dict.get("email") user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)) if not user_qs.exists(): return response.json(104) user_region_id = user_qs.values_list('region_country', flat=True).first() country_code = CountryModel.objects.filter(id=user_region_id).values_list("country_code", flat=True).first() return response.json(0, country_code) @staticmethod def get_country_domain_list(request_dict, response): lang = request_dict.get('lang', 'en') time_stamp = request_dict.get('time_stamp', None) time_stamp_token = request_dict.get('time_stamp_token', None) if not all([time_stamp, time_stamp_token]): return response.json(444) try: # 时间戳token校验 if not CommonService.check_time_stamp_token(time_stamp_token, time_stamp): return response.json(13) lang_qs = LanguageModel.objects.filter(lang=lang) language = lang_qs[0] country_qs = CountryLanguageModel.objects.filter(language_id=language.id) country_qs = country_qs.annotate(api=F('country__region__zosi_api')) country_qs = country_qs.values('country_id', 'country_name', 'api').order_by('country_id') country_list = [] for country in country_qs: country['api'] = country['api'] + 'shopify/shopifyRegister' country_list.append(country) return response.json(0, country_list) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def shopify_change_password(request_dict, response): email = request_dict.get("email", None) password = request_dict.get("password", None) authcode = request_dict.get("authCode", None) if not all([email, password, authcode]): return response.json(444) try: data_valid = DataValid() if data_valid.email_validate(email) is not True: return response.json(105) re_flag = data_valid.password_validate(password) has_upper = bool(re.search(r"[A-Z]", password)) # 大写字母 has_lower = bool(re.search(r"[a-z]", password)) # 小写字母 has_digit = bool(re.search(r"[0-9]", password)) # 数字 has_special = bool(re.search(r"[!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/]", password)) # 特殊字符 # 至少包含任意两类字符 categories = sum([has_upper, has_lower, has_digit, has_special]) if re_flag is not True and categories > 2: return response.json(109) reds = RedisObject() identifyingCode = reds.get_data(key=email + '_forgetPwdResetCode') # 判断验证码是否过期 if identifyingCode is False: return response.json(120) # 验证码是否正确 if authcode != identifyingCode: return response.json(121) user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)) if not user_qs.exists(): return response.json(173) password = make_password(password) user_qs.update(password=password) reds.del_data(key=email + '_forgetPwdResetCode') return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def verify_authcode(request_dict, response): """ 验证验证码 """ email = request_dict.get("email", None) authcode = request_dict.get("authCode", None) code_type = request_dict.get("codeType", None) if not all([email, authcode, code_type]): return response.json(444) try: code_type = int(code_type) if code_type == 1: reds_key = "_identifyingCode" user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)) if user_qs.exists(): return response.json(174) elif code_type == 2: reds_key = "_forgetPwdResetCode" user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)) if not user_qs.exists(): return response.json(173) else: return response.json(444) reds = RedisObject() identifyingCode = reds.get_data(key=email + reds_key) if identifyingCode is False: return response.json(120) # 验证码是否正确 if authcode != identifyingCode: return response.json(121) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))