YotpoCoreObject.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : YotpoCoreObject.py
  4. @Time : 2024/5/17 9:36
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. # -*- encoding: utf-8 -*-
  10. """
  11. @File : EDM.py
  12. @Time : 2024/5/16 11:16
  13. @Author : stephen
  14. @Email : zhangdongming@asj6.wecom.work
  15. @Software: PyCharm
  16. """
  import datetime
  import json
  import os

  import requests

  from Ansjer.config import LOGGER
  # Yotpo store identifier and API secret used by every request in this module.
  # Both can be supplied through the environment (YOTPO_STORE_ID / YOTPO_SECRET);
  # the literals below are kept only as a backward-compatible fallback so that
  # existing deployments keep working without configuration changes.
  # NOTE(review): credentials committed to source control should be rotated and
  # the fallbacks removed once the environment variables are provisioned.
  STORE_ID = os.environ.get("YOTPO_STORE_ID") or "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
  # Base URL of the Core v3 endpoints for this store (always ends with "/").
  YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"
  secret = os.environ.get("YOTPO_SECRET") or 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'
  class YotpoCoreObject:
      """Thin client for the Yotpo Core and Messaging v3 REST endpoints.

      Every public method obtains a fresh access token through
      :meth:`get_token` and sends it in the ``X-Yotpo-Token`` header.
      Failures are reported through ``LOGGER.info`` and the return value;
      each method documents whether network errors are swallowed or
      propagate to the caller.
      """

      # Seconds to wait for Yotpo before giving up.  Without an explicit
      # timeout ``requests`` blocks indefinitely on a stalled connection.
      REQUEST_TIMEOUT = 10

      @staticmethod
      def _build_headers(token=None):
          """Return the JSON request headers, adding the token when given."""
          headers = {
              "accept": "application/json",
              "Content-Type": "application/json",
          }
          if token is not None:
              headers["X-Yotpo-Token"] = token
          return headers

      @staticmethod
      def _utc_timestamp(moment=None):
          """Format *moment* (default: the current time) as ``YYYY-MM-DDTHH:MM:SSZ`` in UTC."""
          if moment is None:
              moment = datetime.datetime.now(datetime.timezone.utc)
          # The trailing "Z" designates UTC, so the value must be converted
          # before formatting rather than formatted from local time.
          return moment.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

      @staticmethod
      def _without_custom_properties(customer_params):
          """Return a shallow copy of *customer_params* without ``custom_properties``."""
          return {
              key: value
              for key, value in customer_params.items()
              if key != "custom_properties"
          }

      def get_token(self) -> str:
          """Request an access token for the configured store.

          Returns:
              The ``access_token`` string, or ``""`` when the request raises,
              the status code is not 200, or the response carries no token.
          """
          try:
              response = requests.post(
                  YOTPO_URL + "access_tokens",
                  json={'secret': secret},
                  headers=self._build_headers(),
                  timeout=self.REQUEST_TIMEOUT,
              )
              if response.status_code != 200:
                  LOGGER.info('yotpo获取token失败, response.status_code请求不为200')
                  return ""
              # A missing or null token is normalised to "" so callers have a
              # single "no token" value to test for.
              return json.loads(response.text).get("access_token") or ""
          except Exception as e:
              LOGGER.info(f'yotpo获取token失败, {e}')
              return ""

      def creat_and_update_customers(self, customers) -> bool:
          """Create or update a customer.

          Args:
              customers: Customer payload sent as the ``customer`` field of
                  the PATCH body.

          Returns:
              ``True`` when Yotpo answers with status 200; ``False`` when no
              token could be obtained, the status differs, or any exception
              is raised (the exception is logged, not propagated).
          """
          try:
              yotpo_token = self.get_token()
              if not yotpo_token:
                  LOGGER.info(f'yotpo创建顾客失败, yotpo_token为空 ,customers:{customers}')
                  return False
              response = requests.patch(
                  YOTPO_URL + "customers",
                  json={"customer": customers},
                  headers=self._build_headers(yotpo_token),
                  timeout=self.REQUEST_TIMEOUT,
              )
              if response.status_code != 200:
                  LOGGER.info(f'yotpo创建顾客失败,response.status_code != 200,customers:{customers}')
                  return False
              LOGGER.info(f'yotpo创建顾客成功,customers:{customers}')
              return True
          except Exception as e:
              LOGGER.info(f'yotpo创建顾客失败, customers:{customers},{e}')
              return False

      def get_customers_list(self, email) -> str:
          """Look up customer information by email address.

          Args:
              email: Address to search for.  It is passed through ``params``
                  so that characters such as ``+`` are URL-encoded.

          Returns:
              The raw response body text.

          Raises:
              AssertionError: If the status code is not 200.  Raised
                  explicitly so the check survives ``python -O``.
              requests.RequestException: On network failure or timeout.
          """
          response = requests.get(
              YOTPO_URL + "customers",
              params={"include_custom_properties": "true", "email": email},
              headers=self._build_headers(self.get_token()),
              timeout=self.REQUEST_TIMEOUT,
          )
          if response.status_code != 200:
              raise AssertionError(
                  f"unexpected status code {response.status_code} from Yotpo customers lookup"
              )
          return response.text

      def create_subscribers(self, customer_params, list_id) -> tuple:
          """Subscribe a customer to email marketing on the given list.

          Args:
              customer_params: Customer payload.  ``custom_properties`` is
                  left out of the request; the caller's dict is not modified.
              list_id: Identifier of the marketing list.

          Returns:
              ``(True, response_text)`` on status 200, otherwise ``(False, "")``.

          Raises:
              requests.RequestException: On network failure or timeout.
          """
          url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
          customer = self._without_custom_properties(customer_params)
          payload = {
              "customer": customer,
              "channels": {
                  "email": {
                      "marketing": {
                          "consent": "subscribed",
                          "list_id": list_id,
                      },
                      # NOTE(review): a suppression entry is sent together with
                      # the subscription; confirm this is intended.
                      "suppression": {
                          "suppress_email": True,
                          "suppression_reason": "soft_bounce",
                          "timestamp": self._utc_timestamp(),
                      },
                  },
              },
          }
          response = requests.post(
              url,
              json=payload,
              headers=self._build_headers(self.get_token()),
              timeout=self.REQUEST_TIMEOUT,
          )
          if response.status_code != 200:
              LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer}')
              return False, ""
          return True, response.text

      def close_subscribers(self, customer, list_id) -> bool:
          """Unsubscribe a customer from email marketing on the given list.

          Args:
              customer: Customer payload identifying the subscriber.
              list_id: Identifier of the marketing list.

          Returns:
              ``True`` on status 200, otherwise ``False``.

          Raises:
              requests.RequestException: On network failure or timeout.
          """
          url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
          payload = {
              "customer": customer,
              "channels": {
                  "email": {
                      "marketing": {
                          "consent": "unsubscribed",
                          "list_id": list_id,
                          "source": "post_purchase_popup",
                      },
                  },
              },
          }
          response = requests.post(
              url,
              json=payload,
              headers=self._build_headers(self.get_token()),
              timeout=self.REQUEST_TIMEOUT,
          )
          if response.status_code != 200:
              LOGGER.info(f'yotpo关闭订阅失败, customer:{customer}')
              return False
          LOGGER.info(f'yotpo关闭订阅成功, customer:{customer}')
          return True

      def get_all_list_ids(self):
          """Fetch the store's lists and print the response.

          Returns:
              ``None`` after printing the parsed body on status 200, or ``[]``
              after printing the error message on any other status.
              NOTE(review): the success path only prints; no ids are returned.

          Raises:
              requests.RequestException: On network failure or timeout.
          """
          url = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/lists"
          response = requests.get(
              url,
              headers=self._build_headers(self.get_token()),
              timeout=self.REQUEST_TIMEOUT,
          )
          if response.status_code == 200:
              print(response.json())
              return None
          # Error bodies are not guaranteed to be JSON objects with a
          # "message" key, so fall back to the raw text.
          try:
              message = response.json()['message']
          except (ValueError, KeyError, TypeError):
              message = response.text
          print(f"Error: {message}")
          return []

      def check_api(self):
          """Manual smoke test: create a customer, then subscribe them.

          Uses a hard-coded test customer and list id, printing the result
          of each step followed by ``success``.
          """
          email = '1920098158@qq.com'
          customer = {
              "email": email, "first_name": "test", "last_name": "22",
              'address': {
                  "country_code": 'GB',
              }
          }
          result = self.creat_and_update_customers(customer)
          print(result)
          list_id = 8589406
          subscribers_result = self.create_subscribers(customer, list_id)
          print(subscribers_result)
          # Disabled follow-up flow: look the customer up and update the
          # country code by external_id.
          # customer_result = self.get_customers_list(email)
          # customer = json.loads(customer_result)
          # if customer['customers']:
          #     external_id = customer['customers'][0]['external_id']
          #
          #     country_code = 'LU'
          #     customer = {
          #         "external_id": external_id,
          #         "address": {
          #             "country_code": country_code,
          #         }
          #     }
          #     self.creat_and_update_customers(customer)
          print('success')