# -*- encoding: utf-8 -*-
"""
@File    : YotpoCoreObject.py
@Time    : 2024/5/17 9:36
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm

Small client for the Yotpo REST endpoints this project calls: access tokens,
customers, lists (core v3) and subscribers (messaging v3).
"""
# NOTE(review): the original file carried a second, apparently stale header
# (it names a different file). It is preserved here as a comment; confirm and
# remove it if it is indeed a copy/paste leftover.
#   @File    : EDM.py
#   @Time    : 2024/5/16 11:16
#   @Author  : stephen
#   @Email   : zhangdongming@asj6.wecom.work
#   @Software: PyCharm
import datetime
import json
import os

import requests

STORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"

# SECURITY(review): an API secret is committed in source. It should be rotated
# and supplied through configuration. The YOTPO_SECRET environment variable
# takes precedence when set; the literal is kept only as a fallback so that
# existing deployments keep working unchanged.
secret = os.environ.get("YOTPO_SECRET", 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh')

# Subscribers live under the messaging API rather than the core API.
_SUBSCRIBERS_URL = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"

# Seconds to wait for Yotpo before giving up; without a timeout `requests`
# blocks indefinitely on an unresponsive server.
_REQUEST_TIMEOUT = 30


class YotpoAPIError(AssertionError):
    """Raised when a Yotpo endpoint answers with a status other than 200.

    Subclasses ``AssertionError`` because the previous implementation signalled
    this condition with bare ``assert`` statements; callers that catch
    ``AssertionError`` therefore keep working.
    """


class YotpoCoreObject:
    """Wrapper around the Yotpo HTTP endpoints used by this project.

    Every public method requests a fresh access token via :meth:`get_token`
    before calling its endpoint.
    """

    @staticmethod
    def _require_ok(response):
        """Raise :class:`YotpoAPIError` unless ``response`` has status 200.

        An explicit check is used instead of ``assert`` because assertions are
        removed when Python runs with ``-O``.
        """
        if response.status_code != 200:
            raise YotpoAPIError(
                f"Yotpo request to {response.url} failed with HTTP "
                f"{response.status_code}: {response.text}"
            )

    def _auth_headers(self):
        """Return JSON request headers carrying a freshly fetched token."""
        return {
            "accept": "application/json",
            "Content-Type": "application/json",
            "X-Yotpo-Token": self.get_token(),
        }

    def get_token(self):
        """Exchange the store secret for an access token.

        Returns:
            The ``access_token`` value of the JSON response, or ``None`` when
            the response has no such key.

        Raises:
            YotpoAPIError: the endpoint did not answer with HTTP 200.
        """
        url = YOTPO_URL + "access_tokens"
        payload = {'secret': secret}
        headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
        }
        response = requests.post(
            url, json=payload, headers=headers, timeout=_REQUEST_TIMEOUT
        )
        self._require_ok(response)
        # The token is a credential: it is returned but deliberately not
        # printed or logged.
        return json.loads(response.text).get("access_token")

    def creat_and_update_customers(self, customers):
        """Create or update a customer.

        Args:
            customers: object sent as the ``"customer"`` field of the PATCH
                body (``check_api`` passes a dict with email, names, address).

        Returns:
            The raw response body as text.

        Raises:
            YotpoAPIError: the endpoint did not answer with HTTP 200.
        """
        url = YOTPO_URL + "customers"
        payload = {"customer": customers}
        response = requests.patch(
            url,
            json=payload,
            headers=self._auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        self._require_ok(response)
        return response.text

    def get_customers_list(self, email):
        """Look up customer information by email address.

        Args:
            email: address sent as the ``email`` query parameter.

        Returns:
            The raw response body as text.

        Raises:
            YotpoAPIError: the endpoint did not answer with HTTP 200.
        """
        url = YOTPO_URL + "customers"
        # Let requests build the query string so characters such as "+" are
        # percent-encoded instead of being read as a space by the server.
        response = requests.get(
            url,
            params={"email": email},
            headers=self._auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        self._require_ok(response)
        return response.text

    def create_subscribers(self, customer_params, list_id):
        """Create an email subscription for a customer.

        Args:
            customer_params: object sent as the ``"customer"`` field.
            list_id: list identifier placed in the email marketing consent.

        Returns:
            The raw response body as text.

        Raises:
            YotpoAPIError: the endpoint did not answer with HTTP 200.
        """
        # The trailing "Z" denotes UTC, so the timestamp must be taken in UTC
        # rather than in the server's local time zone.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        # NOTE(review): the payload both subscribes the address and marks it
        # as suppressed ("soft_bounce"). Kept as in the original; confirm this
        # combination is intended.
        payload = {
            "customer": customer_params,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "subscribed",
                        "list_id": list_id,
                        "source": "post_purchase_popup",
                    },
                    "suppression": {
                        "suppress_email": True,
                        "suppression_reason": "soft_bounce",
                        "timestamp": timestamp,
                    },
                },
            },
            "suppress_welcome_messages": True,
        }
        response = requests.post(
            _SUBSCRIBERS_URL,
            json=payload,
            headers=self._auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        self._require_ok(response)
        return response.text

    def close_subscribers(self, email, list_id):
        """Cancel an email subscription.

        Unlike the other calls, the response status is not checked (this
        matches the original behaviour); the body is printed.

        Args:
            email: address of the customer to unsubscribe.
            list_id: list identifier sent both inside the marketing consent
                and at the top level of the payload.

        Returns:
            The raw response body as text.
        """
        payload = {
            "customer": {
                "email": email,
            },
            # Alternative payload that was left disabled in the original:
            # "channels": {
            #     "email": {"marketing": {"consent": None}},
            #     "suppress_welcome_messages": True
            # },
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "unsubscribed",
                        "list_id": list_id,
                        "source": "post_purchase_popup",
                    }
                }
            },
            "list_id": list_id,
        }
        response = requests.post(
            _SUBSCRIBERS_URL,
            json=payload,
            headers=self._auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        print(response.text)
        return response.text

    def get_all_list_ids(self):
        """Fetch the store's lists and print the response.

        Returns:
            Always an empty list.
        """
        response = requests.get(
            YOTPO_URL + "lists",
            headers=self._auth_headers(),
            timeout=_REQUEST_TIMEOUT,
        )
        try:
            response_data = response.json()
        except ValueError:
            # Error pages are not guaranteed to be JSON; report instead of
            # crashing while decoding.
            print(f"Error: HTTP {response.status_code}: {response.text}")
            return []

        if response.status_code == 200:
            print(response_data)
        else:
            message = (
                response_data.get("message")
                if isinstance(response_data, dict)
                else None
            )
            print(f"Error: {message if message is not None else response_data}")
        # NOTE(review): despite the method name no ids are extracted; the
        # original returned [] on every path. The response schema is not
        # visible here, so this is left unchanged -- confirm and implement.
        return []

    def check_api(self):
        """Manual smoke test: create a customer, then subscribe it to a list.

        Mirrors the post-registration flow (create the user through the Yotpo
        API, then subscribe it to email notifications) with a hard-coded test
        address and list id. Prints each response and finally ``success``.
        """
        email = '1920098158@qq.com'
        customer = {
            "email": email,
            "first_name": "test",
            "last_name": "22",
            'address': {
                "country_code": 'GB',
            },
        }
        result = self.creat_and_update_customers(customer)
        print(result)

        list_id = 8589406
        subscribers_result = self.create_subscribers(customer, list_id)
        print(subscribers_result)

        # Disabled follow-up from the original: look the customer up again and
        # update its country code through its external_id.
        # customer_result = self.get_customers_list(email)
        # customer = json.loads(customer_result)
        # if customer['customers']:
        #     external_id = customer['customers'][0]['external_id']
        #
        #     country_code = 'LU'
        #     customer = {
        #         "external_id": external_id,
        #         "address": {
        #             "country_code": country_code,
        #         }
        #     }
        #     self.creat_and_update_customers(customer)
        print('success')


if __name__ == "__main__":
    pass
    # Manual experiments kept from the original, all disabled. Several refer
    # to functions that are not defined in this module.
    # YotpoCoreObject().check_api()

    # Get token (ok)
    # token = get_token()
    # print(token)
    #
    # # Create customer (ok)
    # response = creat_and_update_customers()
    # print(response)

    # # Look up customer (ok)
    # response = get_customers_list()
    # print(response.text)

    # Look up subscriptions (ok)
    # response = get_webhooks_subscriptions()
    # print(response.text)

    # Look up endpoints
    # response = get_webhooks("webhooks/targets")
    # print(response)

    # Create filters
    # create_webhook_filters()

    # # Create subscription
    # create_subscribers()

    # Cancel subscription
    # YotpoCoreObject().close_subscribers('1920098158@qq.com', 8589406)

    # Get lists
    # get_all_list_ids()