ShopifyController.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from datetime import datetime
  2. import pytz
  3. import requests
  4. from django.db.models import Q
  5. from django.views import View
  6. from Crypto.Cipher import AES
  7. from Crypto.Util.Padding import pad
  8. from django.contrib.auth.hashers import check_password, make_password
  9. from Controller.CheckUserData import DataValid
  10. from Model.models import Device_User
  11. from Object.RedisObject import RedisObject
  12. from Object.ResponseObject import ResponseObject
  13. import base64
  14. import hmac
  15. import hashlib
  16. import os
  17. import json
  18. from Ansjer.config import SHOPIFY_CONFIG
  19. from Service.CommonService import CommonService
  20. class ShopifyMultipass:
  21. @staticmethod
  22. def generate_multipass_token(secret, customer_data):
  23. """
  24. 使用指定的密钥对加密并签名JSON数据,返回Base64编码的Multipass令牌
  25. """
  26. # 第一步:将客户数据转换为JSON格式
  27. json_data = json.dumps(customer_data)
  28. # 第二步:生成加密密钥和签名密钥
  29. hash_digest = hashlib.sha256(secret.encode()).digest()
  30. encryption_key = hash_digest[:16] # 128位加密密钥
  31. signature_key = hash_digest[16:32] # 128位签名密钥
  32. # 第三步:加密JSON数据
  33. iv = os.urandom(16) # 随机初始化向量
  34. cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
  35. ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))
  36. # 第四步:签名加密数据
  37. data_to_sign = iv + ciphertext
  38. signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()
  39. # 第五步:Base64编码
  40. multipass_token = base64.urlsafe_b64encode(iv + ciphertext + signature).decode()
  41. return multipass_token
  42. @staticmethod
  43. def search_customer_by_email(store_name, access_token, email):
  44. # 设置请求URL
  45. url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
  46. params = {
  47. "query": f"email:{email}"
  48. }
  49. # 设置请求头
  50. headers = {
  51. "X-Shopify-Access-Token": access_token,
  52. }
  53. # 发送GET请求
  54. response = requests.get(url, headers=headers, params=params)
  55. # 返回响应的JSON数据
  56. return response.json()
  57. class ShopifyView(View):
  58. def get(self, request, *args, **kwargs):
  59. request.encoding = 'utf-8'
  60. operation = kwargs.get('operation')
  61. request_dict = request.GET
  62. return self.validation(request, request_dict, operation)
  63. def post(self, request, *args, **kwargs):
  64. request.encoding = 'utf-8'
  65. operation = kwargs.get('operation')
  66. request_dict = request.POST
  67. return self.validation(request, request_dict, operation)
  68. def validation(self, request, request_dict, operation):
  69. language = request_dict.get('language', 'cn')
  70. response = ResponseObject(language)
  71. if operation == 'shopifyLogin': # APP查詢定制客户信息
  72. return self.shopify_login(request_dict, response)
  73. elif operation == 'shopifyRegister': # APP注册定制客户信息
  74. return self.shopify_register(request_dict, response)
  75. else:
  76. return response.json(414)
  77. @staticmethod
  78. def shopify_login(request_dict, response):
  79. email = request_dict.get("email", None)
  80. password = request_dict.get("password", None)
  81. if not all([email, password]):
  82. return response.json(444)
  83. user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
  84. if not user_qs.exists():
  85. return response.json(104)
  86. users = user_qs.values('role__rid', 'role__roleName', 'userID', 'NickName', 'username', 'userEmail',
  87. 'phone', 'password', 'userIconPath')[0]
  88. check_flag = check_password(password, users['password'])
  89. if not check_flag:
  90. return response.json(111)
  91. # 获取当前时间
  92. now = datetime.now(pytz.timezone('America/New_York')) # 你可以根据需要更改时区
  93. # 格式化时间戳
  94. timestamp = now.strftime('%Y-%m-%dT%H:%M:%S%z')
  95. # 添加冒号到时区部分
  96. timestamp = timestamp[:-2] + ':' + timestamp[-2:]
  97. customer_data = {
  98. "email": email,
  99. "created_at": timestamp,
  100. }
  101. multipass_secret = SHOPIFY_CONFIG["eu_multipass_secret"] # multipass密钥
  102. token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)
  103. # 构造重定向URL
  104. redirect_url = f"https://eu.zositech.com/account/login/multipass/{token}"
  105. return response.json(0, redirect_url)
  106. @staticmethod
  107. def shopify_register(request_dict, response):
  108. email = request_dict.get("email", None)
  109. password = request_dict.get("password", None)
  110. authcode = request_dict.get("authCode", None)
  111. if not all([email, password]):
  112. return response.json(444)
  113. if authcode is None:
  114. # 查询是否在shopify有账号
  115. access_token = SHOPIFY_CONFIG["eu_token"]
  116. customer_data = ShopifyMultipass.search_customer_by_email("0ef557-2", access_token, email)
  117. if not customer_data['customers']:
  118. return response.json(10077)
  119. # 邮箱验证
  120. else:
  121. reds = RedisObject()
  122. identifyingCode = reds.get_data(key=email + '_identifyingCode')
  123. # 判断验证码是否过期
  124. if identifyingCode is False:
  125. return response.json(120)
  126. # 验证码是否正确
  127. if authcode != identifyingCode:
  128. return response.json(121)
  129. # 注册
  130. if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
  131. return response.json(103)
  132. # 创建用户
  133. password = make_password(password)
  134. new_userID = CommonService.getUserID(μs=False, setOTAID=True)
  135. user_data = {
  136. "username": email,
  137. "NickName": email,
  138. "userEmail": email,
  139. "password": password,
  140. "userID": new_userID,
  141. "is_active": True,
  142. "user_isValid": True,
  143. }
  144. Device_User.objects.create(**user_data)
  145. return response.json(0)