123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507 |
- import re
- from datetime import datetime
- import concurrent.futures
- import pytz
- import requests
- from django.db.models import Q, F
- from django.views import View
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import pad
- from django.contrib.auth.hashers import check_password, make_password
- import concurrent.futures
- from Controller.CheckUserData import DataValid
- from Model.models import Device_User, CountryModel, LanguageModel, CountryLanguageModel
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- import base64
- import hmac
- import hashlib
- import os
- import json
- from Ansjer.config import SHOPIFY_CONFIG, CONFIG_INFO, CONFIG_EUR, CONFIG_US
- from Service.CommonService import CommonService
class ShopifyMultipass:
    """Stateless helpers for Shopify Multipass tokens and customer lookups."""

    # Time zone used to stamp tokens for each supported storefront code.
    _TIMEZONE_BY_COUNTRY = {
        "us": "America/New_York",  # United States (New York)
        "jp": "Asia/Tokyo",  # Japan
        "de": "Europe/Berlin",  # Germany
        "uk": "Europe/London",  # United Kingdom
        "eu": "Europe/Paris",  # EU storefront (Paris is used as the default)
    }

    @staticmethod
    def generate_multipass_token(secret, customer_data):
        """Encrypt and sign ``customer_data`` and return a Multipass token.

        Args:
            secret: Multipass secret of the target store (``str``).
            customer_data: JSON-serialisable mapping describing the customer.

        Returns:
            URL-safe Base64 text of ``iv + ciphertext + signature``.
        """
        # Step 1: serialise the customer data as JSON.
        payload = json.dumps(customer_data).encode()

        # Step 2: derive both keys from the SHA-256 digest of the secret.
        digest = hashlib.sha256(secret.encode()).digest()
        encryption_key = digest[:16]  # first 128 bits: AES key
        signature_key = digest[16:32]  # last 128 bits: HMAC key

        # Step 3: AES-CBC encrypt the padded payload under a random IV.
        iv = os.urandom(16)
        cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(pad(payload, AES.block_size))

        # Step 4: sign the IV together with the ciphertext (HMAC-SHA256).
        signed_part = iv + ciphertext
        signature = hmac.new(signature_key, signed_part, hashlib.sha256).digest()

        # Step 5: URL-safe Base64 encode the whole message.
        return base64.urlsafe_b64encode(signed_part + signature).decode()

    @staticmethod
    def search_customer_by_email(store_name, access_token, email, timeout=10):
        """Query a store's Admin API for customers matching ``email``.

        Args:
            store_name: ``myshopify.com`` sub-domain of the store.
            access_token: value sent in ``X-Shopify-Access-Token``.
            email: e-mail address to search for.
            timeout: seconds to wait for the store before giving up. Without
                it a stalled connection would block the calling thread
                indefinitely.

        Returns:
            The decoded JSON body of the response.

        Raises:
            requests.RequestException: on connection errors or timeout.
            ValueError: when the response body is not valid JSON.
        """
        url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
        params = {
            "query": f"email:{email}"
        }
        headers = {
            "X-Shopify-Access-Token": access_token,
        }
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        return response.json()

    @staticmethod
    def get_timezone_by_country(iso2):
        """Return the current time in the storefront's zone as a timestamp.

        Args:
            iso2: one of ``"us"``, ``"jp"``, ``"de"``, ``"uk"``, ``"eu"``.

        Returns:
            Text such as ``2024-01-31T12:00:00+09:00``.

        Raises:
            KeyError: when ``iso2`` is not a supported storefront code.
        """
        # Resolve the zone name first so unsupported codes fail before any
        # time-zone work is attempted.
        zone_name = ShopifyMultipass._TIMEZONE_BY_COUNTRY[iso2]
        now = datetime.now(pytz.timezone(zone_name))
        timestamp = now.strftime('%Y-%m-%dT%H:%M:%S%z')
        # ``%z`` yields ``+HHMM``; insert the colon to obtain ``+HH:MM``.
        return timestamp[:-2] + ':' + timestamp[-2:]
class ShopifyView(View):
    """Class-based view for the Shopify account endpoints.

    ``get`` and ``post`` both forward the request parameters to
    ``validation``, which routes on the ``operation`` URL keyword argument.
    Every handler answers through ``response.json(code[, data])``.
    """

    # Seconds to wait for the regional account servers before giving up.
    _REQUEST_TIMEOUT = 10

    # Storefront host that receives the Multipass login for each supported
    # country code. Only these codes may end up in a redirect URL.
    # NOTE(review): search_account sends "jp" customers to www.zosi.jp while
    # the Multipass redirect has always used www.zositech.jp - confirm which
    # host is correct.
    _MULTIPASS_HOSTS = {
        "eu": "eu.zositech.com",
        "uk": "www.zositech.co.uk",
        "us": "www.zositech.com",
        "de": "www.zositech.de",
        "jp": "www.zositech.jp",
    }

    # Plain storefront login pages used when only a Shopify customer exists.
    _STORE_LOGIN_URLS = {
        "eu": "https://eu.zositech.com/account/login",
        "jp": "https://www.zosi.jp/account/login",
        "de": "https://www.zositech.de/account/login",
        "uk": "https://www.zositech.co.uk/account/login",
    }
    _DEFAULT_STORE_LOGIN_URL = "https://www.zositech.com/account/login"

    # Character classes counted by the password policy:
    # upper case, lower case, digit, special character.
    _PASSWORD_CATEGORY_PATTERNS = (
        r"[A-Z]",
        r"[a-z]",
        r"[0-9]",
        r"[!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/]",
    )

    def get(self, request, *args, **kwargs):
        """Handle GET: route on ``operation`` using the query parameters."""
        request.encoding = 'utf-8'
        return self.validation(request, request.GET, kwargs.get('operation'))

    def post(self, request, *args, **kwargs):
        """Handle POST: route on ``operation`` using the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request, request.POST, kwargs.get('operation'))

    def validation(self, request, request_dict, operation):
        """Dispatch ``operation`` to its handler; unknown operations get 414."""
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language, "pc")
        handlers = {
            'shopifyLogin': self.shopify_login,
            'shopifyRegister': self.shopify_register,
            # Report the registration status of an account on this server.
            'searchCustomer': self.search_customer,
            # Account check used by the official website.
            'searchAccount': self.search_account,
            'getCountryDomainList': self.get_country_domain_list,
            # Forgotten-password flow.
            'shopifyChangePassword': self.shopify_change_password,
            'verifyAuthcode': self.verify_authcode,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    @staticmethod
    def _server_error(response, error):
        """Build the 500 response carrying the line number and error repr."""
        return response.json(
            500,
            'error_line:{}, error_msg:{}'.format(error.__traceback__.tb_lineno, repr(error)),
        )

    @staticmethod
    def _user_queryset(email):
        """Return the users whose username or e-mail equals ``email``."""
        return Device_User.objects.filter(Q(username=email) | Q(userEmail=email))

    @staticmethod
    def _password_rejected(data_valid, password):
        """Return True when ``password`` must be refused with code 109.

        Combines ``DataValid.password_validate`` with a count of the
        character classes (upper, lower, digit, special) that are present.
        """
        re_flag = data_valid.password_validate(password)
        categories = sum(
            bool(re.search(pattern, password))
            for pattern in ShopifyView._PASSWORD_CATEGORY_PATTERNS
        )
        # NOTE(review): the original comment asks for "at least any two
        # character classes", yet this condition only rejects a password that
        # fails password_validate AND has more than two classes. The intended
        # policy is unclear, so the original logic is preserved - confirm.
        return re_flag is not True and categories > 2

    @staticmethod
    def _multipass_redirect_url(country, email):
        """Build the Multipass login URL of ``country``'s store for ``email``.

        ``country`` must be a key of ``_MULTIPASS_HOSTS``.
        """
        host = ShopifyView._MULTIPASS_HOSTS[country]
        timestamp = ShopifyMultipass.get_timezone_by_country(country)
        multipass_secret = SHOPIFY_CONFIG[f"{country}_multipass_secret"]
        customer_data = {
            "email": email,
            "created_at": timestamp,
        }
        token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)
        return f"https://{host}/account/login/multipass/{token}"

    @staticmethod
    def shopify_login(request_dict, response):
        """Check the credentials and return a Multipass redirect URL.

        Reads ``email``, ``password``, ``accountCountry`` and the optional
        ``shopifyCountry``. Returns 444 for missing or unsupported
        parameters, 104 when no user matches, 111 when the password check
        fails, otherwise 0 with the redirect URL.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        account_iso2 = request_dict.get("accountCountry", None)
        shopify_country = request_dict.get("shopifyCountry", "")
        if not all([email, password, account_iso2]):
            return response.json(444)

        user = ShopifyView._user_queryset(email).values('password').first()
        if user is None:
            return response.json(104)
        if not check_password(password, user['password']):
            return response.json(111)

        # Pick the store: an existing Shopify customer wins, then the
        # account's own country, then the region this server is deployed in.
        if shopify_country:
            country = shopify_country
        elif account_iso2 in ("de", "jp", "uk"):
            country = account_iso2
        elif CONFIG_INFO == CONFIG_EUR:
            country = "eu"
        elif CONFIG_INFO == CONFIG_US:
            country = "us"
        else:
            return response.json(444)

        # The code comes from the request; refuse anything that is not a
        # known store instead of failing on the lookup or placing arbitrary
        # text into the redirect host.
        if country not in ShopifyView._MULTIPASS_HOSTS:
            return response.json(444)
        return response.json(0, ShopifyView._multipass_redirect_url(country, email))

    @staticmethod
    def shopify_register(request_dict, response):
        """Create a ``Device_User`` after validating input and e-mail code.

        Reads ``email``, ``countryCode``, ``password`` and ``authCode``.
        Returns 444 (missing parameter), 105 (e-mail rejected), 109
        (password rejected), 120 (no stored code), 121 (code mismatch),
        103 (user already exists), 173 (unknown country code) or 0.
        """
        email = request_dict.get("email", None)
        country_code = request_dict.get("countryCode", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)
        if not all([email, password, authcode, country_code]):
            return response.json(444)

        data_valid = DataValid()
        if data_valid.email_validate(email) is not True:
            return response.json(105)
        if ShopifyView._password_rejected(data_valid, password):
            return response.json(109)

        reds = RedisObject()
        code_key = email + '_identifyingCode'
        identifying_code = reds.get_data(key=code_key)
        # ``False`` means no code is stored (treated as expired).
        if identifying_code is False:
            return response.json(120)
        if authcode != identifying_code:
            return response.json(121)

        if ShopifyView._user_queryset(email).exists():
            return response.json(103)

        # Resolve the country before hashing the password or allocating an ID.
        region_country = CountryModel.objects.filter(
            country_code=country_code.upper()
        ).values_list("id", flat=True).first()
        if region_country is None:
            return response.json(173)

        Device_User.objects.create(
            username=email,
            NickName=email,
            userEmail=email,
            password=make_password(password),
            userID=CommonService.getUserID(μs=False, setOTAID=True),
            is_active=True,
            user_isValid=True,
            region_country=region_country,
            region_status=1,
        )
        reds.del_data(key=code_key)
        return response.json(0)

    def search_account(self, request_dict, response):
        """Report where ``email`` is known: app servers, a store, or nowhere.

        Returns 0 with ``accountStatus`` 3 (app account found, with the
        regional login endpoints), 2 (only a Shopify customer, with the
        store login URL) or 1 (unknown everywhere). Returns 444 when
        ``email`` is missing and 500 when a lookup fails.
        """
        email = request_dict.get("email")
        if not email:
            return response.json(444)

        store_configs = [
            ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]),
            ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]),
            ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]),
            ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]),
            ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]),
        ]
        try:
            # Query every store in parallel.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = {
                    country: executor.submit(
                        ShopifyMultipass.search_customer_by_email, store_name, token, email
                    )
                    for country, store_name, token in store_configs
                }
                shopify_results = {
                    country: future.result() for country, future in futures.items()
                }
            # Resolve in the declared store order, so the outcome does not
            # depend on which request happened to finish first.
            shopify_country = next(
                (
                    country
                    for country, _store_name, _token in store_configs
                    if shopify_results[country]["customers"]
                ),
                "",
            )

            account_country = self.call_search_customer(email)
            us_account = account_country["us"]
            eu_account = account_country["eu"]
            if us_account or eu_account:
                account_region_list = []
                if us_account:
                    account_region_list.append({
                        "url": "https://www.dvema.com/shopify/shopifyLogin",
                        "accountCountry": us_account.lower(),
                        "shopifyCountry": shopify_country
                    })
                if eu_account:
                    account_region_list.append({
                        "url": "https://api.zositeche.com/shopify/shopifyLogin",
                        "accountCountry": eu_account.lower(),
                        "shopifyCountry": shopify_country
                    })
                return response.json(0, {"accountStatus": 3, "accountRegionList": account_region_list})

            if shopify_country:
                url = ShopifyView._STORE_LOGIN_URLS.get(
                    shopify_country, ShopifyView._DEFAULT_STORE_LOGIN_URL
                )
                return response.json(0, {"accountStatus": 2, "url": url})

            # Placeholder text for the registration link, returned verbatim.
            url = "注册链接"
            return response.json(0, {"accountStatus": 1, "url": url})
        except Exception as e:
            print(e)
            return self._server_error(response, e)

    @staticmethod
    def call_search_customer(email):
        """Ask both regional servers which country ``email`` is registered in.

        Returns:
            ``{"us": value, "eu": value}`` where each value is the ``data``
            field of that server's ``searchCustomer`` answer, or ``None``
            when it is empty or the server could not be queried.
        """
        urls = {
            "us": "https://www.dvema.com/shopify/searchCustomer",
            "eu": "https://api.zositeche.com/shopify/searchCustomer"
        }
        params = {"email": email}

        def fetch_customer(region, url):
            try:
                # The timeout keeps a stalled server from blocking forever.
                reply = requests.get(
                    url=url, params=params, timeout=ShopifyView._REQUEST_TIMEOUT
                )
                reply.raise_for_status()
                customer_country = reply.json()["data"]
            except (requests.RequestException, ValueError, KeyError, TypeError):
                # Best effort: an unreachable server or a malformed body
                # (not JSON, no "data" field) counts as "not registered".
                return region, None
            if customer_country == "":
                return region, None
            return region, customer_country

        customer_region = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            pending = [
                executor.submit(fetch_customer, region, url)
                for region, url in urls.items()
            ]
            for future in concurrent.futures.as_completed(pending):
                region, customer_country = future.result()
                customer_region[region] = customer_country
        return customer_region

    @staticmethod
    def search_customer(request_dict, response):
        """Return the country code stored for the user matching ``email``.

        Returns 444 when ``email`` is missing, 104 when no user matches,
        otherwise 0 with the user's country code. Without a stored country
        it falls back to ``"uk"`` or ``"us"`` by server configuration, and
        to 173 when neither configuration applies.
        """
        email = request_dict.get("email")
        # A missing e-mail would otherwise be matched against NULL columns.
        if not email:
            return response.json(444)

        user_qs = ShopifyView._user_queryset(email)
        if not user_qs.exists():
            return response.json(104)

        user_region_id = user_qs.values_list('region_country', flat=True).first()
        country_code = CountryModel.objects.filter(
            id=user_region_id
        ).values_list("country_code", flat=True).first()
        if country_code:
            return response.json(0, country_code)
        if CONFIG_INFO == CONFIG_EUR:
            return response.json(0, "uk")
        if CONFIG_INFO == CONFIG_US:
            return response.json(0, "us")
        return response.json(173)

    @staticmethod
    def get_country_domain_list(request_dict, response):
        """List countries, with their API domain, in the requested language.

        Reads ``lang`` (default ``"en"``), ``time_stamp`` and
        ``time_stamp_token``. Returns 444 when either token parameter is
        missing, 13 when the token check fails, 500 on any error (including
        an unknown ``lang``), otherwise 0 with the list ordered by name.
        """
        lang = request_dict.get('lang', 'en')
        time_stamp = request_dict.get('time_stamp', None)
        time_stamp_token = request_dict.get('time_stamp_token', None)
        if not all([time_stamp, time_stamp_token]):
            return response.json(444)
        try:
            # Verify the time-stamp token.
            if not CommonService.check_time_stamp_token(time_stamp_token, time_stamp):
                return response.json(13)
            language = LanguageModel.objects.filter(lang=lang)[0]
            country_qs = CountryLanguageModel.objects.filter(
                language_id=language.id
            ).annotate(
                api=F('country__region__zosi_api'),
                country_code=F('country__country_code'),
            ).values(
                'country_id', 'country_name', 'country_code', 'api'
            ).order_by('country_name')
            return response.json(0, list(country_qs))
        except Exception as e:
            print(e)
            return ShopifyView._server_error(response, e)

    @staticmethod
    def shopify_change_password(request_dict, response):
        """Reset a user's password after checking the password-reset code.

        Reads ``email``, ``password`` and ``authCode``. Returns 444 (missing
        parameter), 105 (e-mail rejected), 109 (password rejected), 120 (no
        stored code), 121 (code mismatch), 173 (no such user), 500 on an
        unexpected error, otherwise 0.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)
        if not all([email, password, authcode]):
            return response.json(444)
        try:
            data_valid = DataValid()
            if data_valid.email_validate(email) is not True:
                return response.json(105)
            if ShopifyView._password_rejected(data_valid, password):
                return response.json(109)

            reds = RedisObject()
            code_key = email + '_forgetPwdResetCode'
            identifying_code = reds.get_data(key=code_key)
            # ``False`` means no code is stored (treated as expired).
            if identifying_code is False:
                return response.json(120)
            if authcode != identifying_code:
                return response.json(121)

            user_qs = ShopifyView._user_queryset(email)
            if not user_qs.exists():
                return response.json(173)
            user_qs.update(password=make_password(password))
            reds.del_data(key=code_key)
            return response.json(0)
        except Exception as e:
            return ShopifyView._server_error(response, e)

    @staticmethod
    def verify_authcode(request_dict, response):
        """Check an e-mail verification code without consuming it.

        ``codeType`` 1 checks the registration code and returns 174 when the
        user already exists; ``codeType`` 2 checks the password-reset code
        and returns 173 when the user does not exist. Returns 444 for
        missing or invalid parameters, 120 when no code is stored, 121 on a
        mismatch, 500 on an unexpected error, otherwise 0.
        """
        email = request_dict.get("email", None)
        authcode = request_dict.get("authCode", None)
        code_type = request_dict.get("codeType", None)
        if not all([email, authcode, code_type]):
            return response.json(444)
        try:
            code_type = int(code_type)
        except ValueError:
            # A non-numeric codeType is a bad parameter, not a server error.
            return response.json(444)
        try:
            if code_type == 1:
                reds_key = "_identifyingCode"
            elif code_type == 2:
                reds_key = "_forgetPwdResetCode"
            else:
                return response.json(444)

            user_exists = ShopifyView._user_queryset(email).exists()
            if code_type == 1 and user_exists:
                return response.json(174)
            if code_type == 2 and not user_exists:
                return response.json(173)

            identifying_code = RedisObject().get_data(key=email + reds_key)
            if identifying_code is False:
                return response.json(120)
            if authcode != identifying_code:
                return response.json(121)
            return response.json(0)
        except Exception as e:
            return ShopifyView._server_error(response, e)
|