# ShopifyController.py (9.9 KB)
  1. from datetime import datetime
  2. import concurrent.futures
  3. import pytz
  4. import requests
  5. from django.db.models import Q
  6. from django.views import View
  7. from Crypto.Cipher import AES
  8. from Crypto.Util.Padding import pad
  9. from django.contrib.auth.hashers import check_password, make_password
  10. import concurrent.futures
  11. from Controller.CheckUserData import DataValid
  12. from Model.models import Device_User, CountryModel
  13. from Object.RedisObject import RedisObject
  14. from Object.ResponseObject import ResponseObject
  15. import base64
  16. import hmac
  17. import hashlib
  18. import os
  19. import json
  20. from Ansjer.config import SHOPIFY_CONFIG
  21. from Service.CommonService import CommonService
  class ShopifyMultipass:
      """Helpers for building Shopify Multipass tokens and querying Shopify customers."""

      @staticmethod
      def generate_multipass_token(secret, customer_data):
          """
          Encrypt and sign ``customer_data`` with keys derived from ``secret``.

          The SHA-256 digest of ``secret`` is split in two: the first 16 bytes
          are the AES-128-CBC encryption key, the last 16 bytes are the
          HMAC-SHA256 signing key.

          :param secret: Multipass secret as text.
          :param customer_data: JSON-serialisable customer payload.
          :return: URL-safe Base64 text of ``iv + ciphertext + signature``.
          """
          # Step 1: serialise the customer data to JSON.
          json_data = json.dumps(customer_data)

          # Step 2: derive the encryption key and the signing key.
          hash_digest = hashlib.sha256(secret.encode()).digest()
          encryption_key = hash_digest[:16]  # 128-bit encryption key
          signature_key = hash_digest[16:32]  # 128-bit signing key

          # Step 3: encrypt the JSON with a fresh random IV.
          iv = os.urandom(16)
          cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
          ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))

          # Step 4: sign the IV together with the ciphertext.
          data_to_sign = iv + ciphertext
          signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()

          # Step 5: Base64-encode (URL-safe alphabet) the signed message.
          return base64.urlsafe_b64encode(data_to_sign + signature).decode()

      @staticmethod
      def search_customer_by_email(store_name, access_token, email, *, api_version="2024-10", timeout=10):
          """
          Query a store's Admin API customer search endpoint by e-mail.

          :param store_name: Sub-domain of the ``myshopify.com`` store.
          :param access_token: Value sent in the ``X-Shopify-Access-Token`` header.
          :param email: E-mail address placed in the ``email:`` search query.
          :param api_version: Admin API version segment of the URL.
          :param timeout: Seconds to wait for the HTTP request; without it a
              stalled connection would block the calling thread indefinitely.
          :return: The decoded JSON body of the response. The HTTP status is
              not checked, so an error payload is returned as-is.
          :raises requests.RequestException: on connection failure or timeout.
          """
          url = f"https://{store_name}.myshopify.com/admin/api/{api_version}/customers/search.json"
          params = {"query": f"email:{email}"}
          headers = {"X-Shopify-Access-Token": access_token}
          response = requests.get(url, headers=headers, params=params, timeout=timeout)
          return response.json()
  class ShopifyView(View):
      """View that routes Shopify account operations by the ``operation`` URL kwarg."""

      def get(self, request, *args, **kwargs):
          """Dispatch a GET request using the query-string parameters."""
          request.encoding = 'utf-8'
          operation = kwargs.get('operation')
          request_dict = request.GET
          return self.validation(request, request_dict, operation)

      def post(self, request, *args, **kwargs):
          """Dispatch a POST request using the form parameters."""
          request.encoding = 'utf-8'
          operation = kwargs.get('operation')
          request_dict = request.POST
          return self.validation(request, request_dict, operation)

      def validation(self, request, request_dict, operation):
          """
          Build the response object and call the handler for ``operation``.

          :return: The handler's response, or ``response.json(414)`` when the
              operation is not one of the supported names.
          """
          language = request_dict.get('language', 'cn')
          response = ResponseObject(language)
          handlers = {
              'shopifyLogin': self.shopify_login,  # app looks up customer info
              'shopifyRegister': self.shopify_register,  # app registers customer info
              'searchCustomer': self.search_customer,
              'searchAccount': self.search_account,
          }
          handler = handlers.get(operation)
          if handler is None:
              return response.json(414)
          return handler(request_dict, response)

      @staticmethod
      def shopify_login(request_dict, response):
          """
          Check the user's password and return a Multipass login URL.

          Responds 444 when ``email`` or ``password`` is missing, 104 when no
          user matches the e-mail, 111 when the password check fails, and
          0 with the redirect URL otherwise.
          """
          email = request_dict.get("email", None)
          password = request_dict.get("password", None)
          if not all([email, password]):
              return response.json(444)

          # One query: only the password hash is needed for the check.
          user = Device_User.objects.filter(
              Q(username=email) | Q(userEmail=email)
          ).values('password').first()
          if user is None:
              return response.json(104)
          if not check_password(password, user['password']):
              return response.json(111)

          # Timezone-aware "now" rendered as ISO 8601 with a colon in the
          # UTC offset, e.g. 2024-01-01T12:00:00-05:00.
          now = datetime.now(pytz.timezone('America/New_York'))
          timestamp = now.isoformat(timespec='seconds')

          customer_data = {
              "email": email,
              "created_at": timestamp,
          }
          multipass_secret = SHOPIFY_CONFIG["eu_multipass_secret"]
          token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)

          # Build the redirect URL that carries the token.
          redirect_url = f"https://eu.zositech.com/account/login/multipass/{token}"
          return response.json(0, redirect_url)

      @staticmethod
      def shopify_register(request_dict, response):
          """
          Create a local user for an e-mail address.

          Without ``authCode`` the e-mail must already exist as a Shopify
          customer (otherwise 10077). With ``authCode`` the value must match
          the code stored in Redis under ``<email>_identifyingCode``
          (120 when the stored code is missing, 121 when it differs).
          Responds 444 on missing ``email``/``password``, 103 when a local
          user already exists, and 0 after the user has been created.
          """
          email = request_dict.get("email", None)
          password = request_dict.get("password", None)
          authcode = request_dict.get("authCode", None)
          if not all([email, password]):
              return response.json(444)

          if authcode is None:
              # Check whether the e-mail already has a Shopify account.
              access_token = SHOPIFY_CONFIG["eu_token"]
              customer_data = ShopifyMultipass.search_customer_by_email("0ef557-2", access_token, email)
              # .get(): an error payload without "customers" must not raise KeyError.
              if not customer_data.get('customers'):
                  return response.json(10077)
          else:
              # E-mail verification through the code stored in Redis.
              reds = RedisObject()
              identifyingCode = reds.get_data(key=email + '_identifyingCode')
              # The stored code has expired / is absent.
              if identifyingCode is False:
                  return response.json(120)
              # The supplied code does not match.
              if authcode != identifyingCode:
                  return response.json(121)

          # Refuse to register an e-mail that is already in use.
          if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
              return response.json(103)

          # Create the user with a hashed password.
          password = make_password(password)
          new_userID = CommonService.getUserID(μs=False, setOTAID=True)
          user_data = {
              "username": email,
              "NickName": email,
              "userEmail": email,
              "password": password,
              "userID": new_userID,
              "is_active": True,
              "user_isValid": True,
          }
          Device_User.objects.create(**user_data)
          return response.json(0)

      def search_account(self, request_dict, response):
          """
          Report where an e-mail address is known: Shopify stores and servers.

          The result ``status`` is 4 when both a server account and a Shopify
          customer were found, 3 for server only, 2 for Shopify only and
          1 for neither. Responds 444 when ``email`` is missing.
          """
          email = request_dict.get("email")
          if not email:
              # A view must return a response object, not None.
              return response.json(444)

          store_configs = [
              ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]),
              ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]),
              ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]),
              ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]),
              ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]),
          ]

          def lookup_store(store_name, token):
              return ShopifyMultipass.search_customer_by_email(store_name, token, email)

          # Query all stores concurrently.
          shopify_results = {}
          with concurrent.futures.ThreadPoolExecutor() as executor:
              future_to_country = {
                  executor.submit(lookup_store, store_name, token): country
                  for country, store_name, token in store_configs
              }
              for future in concurrent.futures.as_completed(future_to_country):
                  shopify_results[future_to_country[future]] = future.result()

          # Walk the stores in configuration order so the answer does not
          # depend on which request happened to finish first.
          shopify_country = next(
              (
                  country
                  for country, _store_name, _token in store_configs
                  if shopify_results[country].get("customers")
              ),
              "",
          )

          account_country = self.call_search_customer(email)
          if account_country["us"]:
              servers_country = "us"
          elif account_country["eu"]:
              servers_country = "eu"
          else:
              servers_country = ""

          if servers_country and shopify_country:
              status = 4
          elif servers_country:
              status = 3
          elif shopify_country:
              status = 2
          else:
              status = 1

          account_status = {"status": status, "shopifyCountry": shopify_country, "ServersCountry": servers_country}
          return response.json(0, account_status)

      @staticmethod
      def call_search_customer(email):
          """
          Ask the "us" and "eu" servers' ``searchCustomer`` endpoint about ``email``.

          :return: Dict mapping each region to the ``result`` value of that
              server's JSON reply, or ``None`` when the request failed, the
              body was not a JSON object, or the result was empty.
          """
          urls = {
              "us": "https://www.dvema.com/shopify/searchCustomer",
              "eu": "https://api.zositeche.com/shopify/searchCustomer"
          }
          params = {"email": email}
          customer_region = {}

          def fetch_customer(region, url):
              try:
                  # Timeout so a stalled server cannot block the pool forever.
                  reply = requests.get(url=url, params=params, timeout=10)
                  reply.raise_for_status()
                  customer_country = reply.json().get("result", None)
              except (requests.RequestException, ValueError, AttributeError):
                  # Network/HTTP failure, non-JSON body, or JSON that is not an object.
                  return region, None
              # NOTE(review): the previous `not all(customer_country)` test
              # discarded every string result and raised TypeError on None;
              # this assumes "result" is empty when no account exists -
              # confirm against the remote endpoint.
              return region, customer_country or None

          with concurrent.futures.ThreadPoolExecutor() as executor:
              future_to_region = {executor.submit(fetch_customer, region, url): region for region, url in urls.items()}
              for future in concurrent.futures.as_completed(future_to_region):
                  region, customer_country = future.result()
                  customer_region[region] = customer_country
          return customer_region

      @staticmethod
      def search_customer(request_dict, response):
          """
          Return the country code of the user registered under ``email``.

          Responds 444 when ``email`` is missing, 104 when no user matches,
          and 0 with the ``country_code`` looked up through the user's
          ``region_country`` otherwise.
          """
          email = request_dict.get("email")
          if not email:
              # Filtering on None would become an IS NULL match on unrelated rows.
              return response.json(444)
          user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
          if not user_qs.exists():
              return response.json(104)
          user_region_id = user_qs.values_list('region_country', flat=True).first()
          country_code = CountryModel.objects.filter(id=user_region_id).values_list("country_code", flat=True).first()
          return response.json(0, country_code)