YotpoCoreObject.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : YotpoCoreObject.py
  4. @Time : 2024/5/17 9:36
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. # -*- encoding: utf-8 -*-
  10. """
  11. @File : EDM.py
  12. @Time : 2024/5/16 11:16
  13. @Author : stephen
  14. @Email : zhangdongming@asj6.wecom.work
  15. @Software: PyCharm
  16. """
  17. import datetime
  18. import json
  19. import requests
  20. STORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
  21. YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"
  22. secret = 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'
  23. class YotpoCoreObject:
  24. def get_token(self):
  25. url = YOTPO_URL + "access_tokens"
  26. payload = {'secret': secret}
  27. headers = {
  28. "accept": "application/json",
  29. "Content-Type": "application/json"
  30. }
  31. response = requests.post(url, json=payload, headers=headers)
  32. assert response.status_code == 200
  33. token = json.loads(response.text).get("access_token")
  34. print("token:", token)
  35. return token
  36. def creat_and_update_customers(self, customers):
  37. """
  38. 创建和更新用户
  39. """
  40. url = YOTPO_URL + "customers"
  41. yotpo_token = self.get_token()
  42. headers = {
  43. "accept": "application/json",
  44. "X-Yotpo-Token": yotpo_token,
  45. "Content-Type": "application/json"
  46. }
  47. payload = {
  48. "customer": customers
  49. }
  50. response = requests.patch(url, json=payload, headers=headers)
  51. assert response.status_code == 200
  52. return response.text
  53. def get_customers_list(self, email):
  54. """
  55. 查用户信息
  56. """
  57. url = YOTPO_URL + f"customers?email={email}"
  58. yotpo_token = self.get_token()
  59. headers = {
  60. "accept": "application/json",
  61. "X-Yotpo-Token": yotpo_token,
  62. "Content-Type": "application/json"
  63. }
  64. response = requests.get(url, headers=headers)
  65. assert response.status_code == 200
  66. return response.text
  67. def create_subscribers(self, customer_params, list_id):
  68. """
  69. 创建订阅
  70. """
  71. url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
  72. payload = {
  73. "customer": customer_params,
  74. "channels": {
  75. "email": {
  76. "marketing": {
  77. "consent": "subscribed",
  78. "list_id": list_id,
  79. "source": "post_purchase_popup"
  80. },
  81. "suppression": {
  82. "suppress_email": True,
  83. "suppression_reason": "soft_bounce",
  84. "timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
  85. }
  86. },
  87. },
  88. "suppress_welcome_messages": True
  89. }
  90. headers = {
  91. "accept": "application/json",
  92. "content-type": "application/json",
  93. "X-Yotpo-Token": self.get_token()
  94. }
  95. response = requests.post(url, json=payload, headers=headers)
  96. assert response.status_code == 200
  97. return response.text
  98. def close_subscribers(self, email, list_id):
  99. """
  100. 关闭订阅
  101. """
  102. url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
  103. payload = {
  104. "customer": {
  105. "email": email,
  106. },
  107. # "channels": {
  108. # "email": {"marketing": {"consent": None}},
  109. # "suppress_welcome_messages": True
  110. # },
  111. "channels": {
  112. "email": {"marketing": {
  113. "consent": "unsubscribed",
  114. "list_id": list_id,
  115. "source": "post_purchase_popup"
  116. }}
  117. },
  118. "list_id": list_id
  119. }
  120. headers = {
  121. "accept": "application/json",
  122. "content-type": "application/json",
  123. "X-Yotpo-Token": self.get_token()
  124. }
  125. response = requests.post(url, json=payload, headers=headers)
  126. print(response.text)
  127. def get_all_list_ids(self):
  128. """
  129. 获取所有list_id
  130. """
  131. authentication_token = self.get_token()
  132. url = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/lists"
  133. headers = {
  134. "accept": "application/json",
  135. "content-type": "application/json",
  136. "X-Yotpo-Token": authentication_token
  137. }
  138. response = requests.get(url, headers=headers)
  139. response_data = response.json()
  140. if response.status_code == 200:
  141. print(response_data)
  142. else:
  143. print(f"Error: {response_data['message']}")
  144. return []
  145. def check_api(self):
  146. """
  147. 注册成功后 进行yotpo api创建用户,并订阅邮件通知
  148. @return:
  149. """
  150. email = '1920098158@qq.com'
  151. customer = {
  152. "email": email, "first_name": "test", "last_name": "22",
  153. 'address': {
  154. "country_code": 'GB',
  155. }
  156. }
  157. result = self.creat_and_update_customers(customer)
  158. print(result)
  159. list_id = 8589406
  160. subscribers_result = self.create_subscribers(customer, list_id)
  161. print(subscribers_result)
  162. # customer_result = self.get_customers_list(email)
  163. # customer = json.loads(customer_result)
  164. # if customer['customers']:
  165. # external_id = customer['customers'][0]['external_id']
  166. #
  167. # country_code = 'LU'
  168. # customer = {
  169. # "external_id": external_id,
  170. # "address": {
  171. # "country_code": country_code,
  172. # }
  173. # }
  174. # self.creat_and_update_customers(customer)
  175. print('success')
  176. if __name__ == "__main__":
  177. pass
  178. # YotpoCoreObject().check_api()
  179. # 获取 token ok
  180. # token = get_token()
  181. # print(token)
  182. # # # 创建用户 ok
  183. # response = creat_and_update_customers()
  184. # print(response)
  185. #
  186. # 检索用户 ok
  187. # response = get_customers_list()
  188. # print(response.text)
  189. # 检索订阅 ok
  190. # response = get_webhooks_subscriptions()
  191. # print(response.text)
  192. # 检索接口
  193. # response = get_webhooks("webhooks/targets")
  194. # print(response)
  195. # 创建筛选器
  196. # create_webhook_filters()
  197. #
  198. # 创建订阅
  199. # create_subscribers()
  200. # 关闭订阅
  201. # YotpoCoreObject().close_subscribers('1920098158@qq.com', 8589406)
  202. # 获取list
  203. # get_all_list_ids()