|
@@ -0,0 +1,265 @@
|
|
|
+from datetime import datetime
|
|
|
+import concurrent.futures
|
|
|
+import pytz
|
|
|
+import requests
|
|
|
+from django.db.models import Q
|
|
|
+from django.views import View
|
|
|
+from Crypto.Cipher import AES
|
|
|
+from Crypto.Util.Padding import pad
|
|
|
+from django.contrib.auth.hashers import check_password, make_password
|
|
|
+import concurrent.futures
|
|
|
+from Controller.CheckUserData import DataValid
|
|
|
+from Model.models import Device_User, CountryModel
|
|
|
+from Object.RedisObject import RedisObject
|
|
|
+from Object.ResponseObject import ResponseObject
|
|
|
+import base64
|
|
|
+import hmac
|
|
|
+import hashlib
|
|
|
+import os
|
|
|
+import json
|
|
|
+from Ansjer.config import SHOPIFY_CONFIG
|
|
|
+
|
|
|
+from Service.CommonService import CommonService
|
|
|
+
|
|
|
+
|
|
|
+class ShopifyMultipass:
|
|
|
+ @staticmethod
|
|
|
+ def generate_multipass_token(secret, customer_data):
|
|
|
+ """
|
|
|
+ 使用指定的密钥对加密并签名JSON数据,返回Base64编码的Multipass令牌
|
|
|
+ """
|
|
|
+ # 第一步:将客户数据转换为JSON格式
|
|
|
+ json_data = json.dumps(customer_data)
|
|
|
+
|
|
|
+ # 第二步:生成加密密钥和签名密钥
|
|
|
+ hash_digest = hashlib.sha256(secret.encode()).digest()
|
|
|
+ encryption_key = hash_digest[:16] # 128位加密密钥
|
|
|
+ signature_key = hash_digest[16:32] # 128位签名密钥
|
|
|
+
|
|
|
+ # 第三步:加密JSON数据
|
|
|
+ iv = os.urandom(16) # 随机初始化向量
|
|
|
+ cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
|
|
|
+ ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))
|
|
|
+
|
|
|
+ # 第四步:签名加密数据
|
|
|
+ data_to_sign = iv + ciphertext
|
|
|
+ signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()
|
|
|
+
|
|
|
+ # 第五步:Base64编码
|
|
|
+ multipass_token = base64.urlsafe_b64encode(iv + ciphertext + signature).decode()
|
|
|
+
|
|
|
+ return multipass_token
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def search_customer_by_email(store_name, access_token, email):
|
|
|
+ # 设置请求URL
|
|
|
+ url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
|
|
|
+ params = {
|
|
|
+ "query": f"email:{email}"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 设置请求头
|
|
|
+ headers = {
|
|
|
+ "X-Shopify-Access-Token": access_token,
|
|
|
+ }
|
|
|
+
|
|
|
+ # 发送GET请求
|
|
|
+ response = requests.get(url, headers=headers, params=params)
|
|
|
+
|
|
|
+ # 返回响应的JSON数据
|
|
|
+ return response.json()
|
|
|
+
|
|
|
+
|
|
|
+class ShopifyView(View):
|
|
|
+ def get(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ request_dict = request.GET
|
|
|
+ return self.validation(request, request_dict, operation)
|
|
|
+
|
|
|
+ def post(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ request_dict = request.POST
|
|
|
+ return self.validation(request, request_dict, operation)
|
|
|
+
|
|
|
+ def validation(self, request, request_dict, operation):
|
|
|
+ language = request_dict.get('language', 'cn')
|
|
|
+ response = ResponseObject(language)
|
|
|
+ if operation == 'shopifyLogin': # APP查詢定制客户信息
|
|
|
+ return self.shopify_login(request_dict, response)
|
|
|
+ elif operation == 'shopifyRegister': # APP注册定制客户信息
|
|
|
+ return self.shopify_register(request_dict, response)
|
|
|
+ elif operation == 'searchCustomer':
|
|
|
+ return self.search_customer(request_dict, response)
|
|
|
+ elif operation == 'searchAccount':
|
|
|
+ return self.search_account(request_dict, response)
|
|
|
+ else:
|
|
|
+ return response.json(414)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def shopify_login(request_dict, response):
|
|
|
+ email = request_dict.get("email", None)
|
|
|
+ password = request_dict.get("password", None)
|
|
|
+
|
|
|
+ if not all([email, password]):
|
|
|
+ return response.json(444)
|
|
|
+
|
|
|
+ user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
|
|
|
+ if not user_qs.exists():
|
|
|
+ return response.json(104)
|
|
|
+ users = user_qs.values('role__rid', 'role__roleName', 'userID', 'NickName', 'username', 'userEmail',
|
|
|
+ 'phone', 'password', 'userIconPath')[0]
|
|
|
+
|
|
|
+ check_flag = check_password(password, users['password'])
|
|
|
+ if not check_flag:
|
|
|
+ return response.json(111)
|
|
|
+
|
|
|
+ # 获取当前时间
|
|
|
+ now = datetime.now(pytz.timezone('America/New_York')) # 你可以根据需要更改时区
|
|
|
+
|
|
|
+ # 格式化时间戳
|
|
|
+ timestamp = now.strftime('%Y-%m-%dT%H:%M:%S%z')
|
|
|
+ # 添加冒号到时区部分
|
|
|
+ timestamp = timestamp[:-2] + ':' + timestamp[-2:]
|
|
|
+
|
|
|
+ customer_data = {
|
|
|
+ "email": email,
|
|
|
+ "created_at": timestamp,
|
|
|
+ }
|
|
|
+ multipass_secret = SHOPIFY_CONFIG["eu_multipass_secret"] # multipass密钥
|
|
|
+ token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)
|
|
|
+
|
|
|
+ # 构造重定向URL
|
|
|
+ redirect_url = f"https://eu.zositech.com/account/login/multipass/{token}"
|
|
|
+
|
|
|
+ return response.json(0, redirect_url)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def shopify_register(request_dict, response):
|
|
|
+ email = request_dict.get("email", None)
|
|
|
+ password = request_dict.get("password", None)
|
|
|
+ authcode = request_dict.get("authCode", None)
|
|
|
+
|
|
|
+ if not all([email, password]):
|
|
|
+ return response.json(444)
|
|
|
+
|
|
|
+ if authcode is None:
|
|
|
+ # 查询是否在shopify有账号
|
|
|
+ access_token = SHOPIFY_CONFIG["eu_token"]
|
|
|
+ customer_data = ShopifyMultipass.search_customer_by_email("0ef557-2", access_token, email)
|
|
|
+ if not customer_data['customers']:
|
|
|
+ return response.json(10077)
|
|
|
+
|
|
|
+ # 邮箱验证
|
|
|
+ else:
|
|
|
+ reds = RedisObject()
|
|
|
+ identifyingCode = reds.get_data(key=email + '_identifyingCode')
|
|
|
+ # 判断验证码是否过期
|
|
|
+ if identifyingCode is False:
|
|
|
+ return response.json(120)
|
|
|
+ # 验证码是否正确
|
|
|
+ if authcode != identifyingCode:
|
|
|
+ return response.json(121)
|
|
|
+
|
|
|
+ # 注册
|
|
|
+ if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
|
|
|
+ return response.json(103)
|
|
|
+
|
|
|
+ # 创建用户
|
|
|
+ password = make_password(password)
|
|
|
+ new_userID = CommonService.getUserID(μs=False, setOTAID=True)
|
|
|
+ user_data = {
|
|
|
+ "username": email,
|
|
|
+ "NickName": email,
|
|
|
+ "userEmail": email,
|
|
|
+ "password": password,
|
|
|
+ "userID": new_userID,
|
|
|
+ "is_active": True,
|
|
|
+ "user_isValid": True,
|
|
|
+ }
|
|
|
+ Device_User.objects.create(**user_data)
|
|
|
+
|
|
|
+ return response.json(0)
|
|
|
+
|
|
|
+ def search_account(self, request_dict, response):
|
|
|
+ email = request_dict.get("email")
|
|
|
+ if not email:
|
|
|
+ return None
|
|
|
+
|
|
|
+ store_configs = [
|
|
|
+ ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]),
|
|
|
+ ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]),
|
|
|
+ ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]),
|
|
|
+ ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]),
|
|
|
+ ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]),
|
|
|
+ ]
|
|
|
+
|
|
|
+ def search_customer(store_name, token):
|
|
|
+ return ShopifyMultipass.search_customer_by_email(store_name, token, email)
|
|
|
+
|
|
|
+ shopify_results = {}
|
|
|
+
|
|
|
+ with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
+ future_to_country = {executor.submit(search_customer, store_name, token): country for
|
|
|
+ country, store_name, token in store_configs}
|
|
|
+ for future in concurrent.futures.as_completed(future_to_country):
|
|
|
+ country = future_to_country[future]
|
|
|
+ shopify_results[country] = future.result()
|
|
|
+
|
|
|
+ shopify_country = next((country for country, result in shopify_results.items() if result["customers"]), "")
|
|
|
+
|
|
|
+ account_country = self.call_search_customer(email)
|
|
|
+ servers_country = "us" if account_country["us"] else "eu" if account_country["eu"] else ""
|
|
|
+
|
|
|
+ if servers_country and shopify_country:
|
|
|
+ status = 4
|
|
|
+ elif servers_country:
|
|
|
+ status = 3
|
|
|
+ elif shopify_country:
|
|
|
+ status = 2
|
|
|
+ else:
|
|
|
+ status = 1
|
|
|
+
|
|
|
+ account_status = {"status": status, "shopifyCountry": shopify_country, "ServersCountry": servers_country}
|
|
|
+ return response.json(0, account_status)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def call_search_customer(email):
|
|
|
+ urls = {
|
|
|
+ "us": "https://www.dvema.com/shopify/searchCustomer",
|
|
|
+ "eu": "https://api.zositeche.com/shopify/searchCustomer"
|
|
|
+ }
|
|
|
+ params = {"email": email} # Use the provided email parameter
|
|
|
+
|
|
|
+ customer_region = {}
|
|
|
+
|
|
|
+ def fetch_customer(region, url):
|
|
|
+ try:
|
|
|
+ response = requests.get(url=url, params=params)
|
|
|
+ response.raise_for_status() # Raise an error for bad responses
|
|
|
+ customer_country = response.json().get("result", None)
|
|
|
+ return region, customer_country if not all(customer_country) else None
|
|
|
+ except requests.RequestException:
|
|
|
+ return region, None
|
|
|
+
|
|
|
+ with concurrent.futures.ThreadPoolExecutor() as executor:
|
|
|
+ future_to_region = {executor.submit(fetch_customer, region, url): region for region, url in urls.items()}
|
|
|
+ for future in concurrent.futures.as_completed(future_to_region):
|
|
|
+ region, customer_country = future.result()
|
|
|
+ customer_region[region] = customer_country
|
|
|
+
|
|
|
+ return customer_region
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def search_customer(request_dict, response):
|
|
|
+ email = request_dict.get("email")
|
|
|
+ user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
|
|
|
+
|
|
|
+ if not user_qs.exists():
|
|
|
+ return response.json(104)
|
|
|
+
|
|
|
+ user_region_id = user_qs.values_list('region_country', flat=True).first()
|
|
|
+ country_code = CountryModel.objects.filter(id=user_region_id).values_list("country_code", flat=True).first()
|
|
|
+
|
|
|
+ return response.json(0, country_code)
|