# -*- encoding: utf-8 -*-
"""
Thin client for the Yotpo Core v3 / Messaging v3 REST endpoints.

@File    : YotpoCoreObject.py
@Time    : 2024/5/17 9:36
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import datetime
import json

import requests

from Ansjer.config import LOGGER

# NOTE(review): the store id and API secret are committed in source; consider
# loading them from configuration/environment and rotating the secret.
STORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"
secret = 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'

# Endpoint shared by create_subscribers() and close_subscribers().
SUBSCRIBERS_URL = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"

# Seconds to wait for Yotpo before giving up; without a timeout a stalled
# connection would block the calling thread indefinitely.
REQUEST_TIMEOUT = 10


class YotpoCoreObject:
    """Wrapper around the Yotpo HTTP API calls used by this project.

    Every public method obtains a fresh access token via :meth:`get_token`
    before issuing its request.
    """

    @staticmethod
    def _build_headers(token):
        """Return the JSON request headers carrying the given access token."""
        return {
            "accept": "application/json",
            "Content-Type": "application/json",
            "X-Yotpo-Token": token,
        }

    def get_token(self):
        """Request an access token from Yotpo.

        Returns:
            The ``access_token`` string from the response, or ``""`` when the
            request fails, the status is not 200, the body cannot be parsed,
            or the token is missing. This method never raises.
        """
        try:
            url = YOTPO_URL + "access_tokens"
            payload = {'secret': secret}
            headers = {
                "accept": "application/json",
                "Content-Type": "application/json",
            }
            response = requests.post(
                url, json=payload, headers=headers, timeout=REQUEST_TIMEOUT
            )
            if response.status_code != 200:
                LOGGER.info('yotpo获取token失败, response.status_code请求不为200')
                return ""
            token = json.loads(response.text).get("access_token")
            if not token:
                # Keep the "empty string means failure" contract even when
                # the response is 200 but carries no token.
                LOGGER.info('yotpo获取token失败, 响应中缺少access_token')
                return ""
            return token
        except Exception as e:
            LOGGER.info(f'yotpo获取token失败, {e}')
            return ""

    def creat_and_update_customers(self, customers):
        """Create or update a customer.

        Args:
            customers: Customer payload sent under the ``"customer"`` key.

        Returns:
            ``True`` when Yotpo answers with HTTP 200, ``False`` otherwise
            (including a missing token or any exception, which is logged).
        """
        try:
            url = YOTPO_URL + "customers"
            yotpo_token = self.get_token()
            if not yotpo_token:
                LOGGER.info(f'yotpo创建顾客失败, yotpo_token为空 ,customers:{customers}')
                return False

            payload = {"customer": customers}
            response = requests.patch(
                url,
                json=payload,
                headers=self._build_headers(yotpo_token),
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                LOGGER.info(f'yotpo创建顾客失败,response.status_code != 200,customers:{customers}')
                return False
            LOGGER.info(f'yotpo创建顾客成功,customers:{customers}')
            return True
        except Exception as e:
            LOGGER.info(f'yotpo创建顾客失败, customers:{customers},{e}')
            return False

    def get_customers_list(self, email):
        """Look up customer information by email.

        Args:
            email: Email address to filter on.

        Returns:
            The raw response body text.

        Raises:
            AssertionError: If the response status is not 200.
            requests.RequestException: On network failure or timeout.
        """
        url = YOTPO_URL + "customers"
        # Let requests build the query string so characters such as "+" or
        # "&" in the address are percent-encoded instead of corrupting it.
        params = {"include_custom_properties": "true", "email": email}
        yotpo_token = self.get_token()
        response = requests.get(
            url,
            params=params,
            headers=self._build_headers(yotpo_token),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped when Python runs with -O; the exception type is
            # unchanged for existing callers.
            raise AssertionError(
                f"yotpo customer lookup failed, status_code={response.status_code}"
            )
        return response.text

    def create_subscribers(self, customer_params, list_id):
        """Subscribe a customer to an email list.

        Args:
            customer_params: Customer payload. A ``custom_properties`` entry,
                if present, is left out of the request; the caller's dict is
                not modified.
            list_id: Identifier of the list to subscribe to.

        Returns:
            ``(True, response_text)`` on HTTP 200, ``(False, "")`` otherwise
            (including network errors, which are logged).
        """
        # Work on a shallow copy so the caller's dict keeps its keys.
        customer = dict(customer_params)
        customer.pop("custom_properties", None)

        # The trailing "Z" designates UTC, so the timestamp must be taken in
        # UTC rather than in the server's local time zone.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        payload = {
            "customer": customer,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "subscribed",
                        "list_id": list_id,
                    },
                    # NOTE(review): a new subscription is sent together with a
                    # soft-bounce suppression — confirm this is intended.
                    "suppression": {
                        "suppress_email": True,
                        "suppression_reason": "soft_bounce",
                        "timestamp": timestamp,
                    },
                },
            },
        }
        try:
            response = requests.post(
                SUBSCRIBERS_URL,
                json=payload,
                headers=self._build_headers(self.get_token()),
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer},{e}')
            return False, ""
        if response.status_code != 200:
            LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer}')
            return False, ""
        return True, response.text

    def close_subscribers(self, customer, list_id):
        """Unsubscribe a customer from an email list.

        Args:
            customer: Customer payload sent under the ``"customer"`` key.
            list_id: Identifier of the list to unsubscribe from.

        Returns:
            ``True`` on HTTP 200, ``False`` otherwise (including network
            errors, which are logged).
        """
        payload = {
            "customer": customer,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "unsubscribed",
                        "list_id": list_id,
                        "source": "post_purchase_popup",
                    },
                },
            },
        }
        try:
            response = requests.post(
                SUBSCRIBERS_URL,
                json=payload,
                headers=self._build_headers(self.get_token()),
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            LOGGER.info(f'yotpo关闭订阅失败, customer:{customer},{e}')
            return False
        if response.status_code != 200:
            LOGGER.info(f'yotpo关闭订阅失败, customer:{customer}')
            return False
        LOGGER.info(f'yotpo关闭订阅成功, customer:{customer}')
        return True

    def get_all_list_ids(self):
        """Fetch the store's lists and print the response.

        NOTE(review): despite its name this method only prints the response
        and always returns an empty list; the response schema is not visible
        here, so extracting ids is left for a follow-up.

        Returns:
            Always ``[]``.
        """
        authentication_token = self.get_token()
        url = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/lists"
        response = requests.get(
            url,
            headers=self._build_headers(authentication_token),
            timeout=REQUEST_TIMEOUT,
        )
        try:
            response_data = response.json()
        except ValueError:
            # Error responses are not guaranteed to carry a JSON body.
            response_data = None

        if response.status_code == 200:
            print(response_data)
        else:
            message = None
            if isinstance(response_data, dict):
                message = response_data.get('message')
            if message is None:
                # Fall back to the raw body when no "message" field exists.
                message = response.text
            print(f"Error: {message}")
        return []

    def check_api(self):
        """Manual smoke test: create a customer, then subscribe it to a list.

        Mirrors the post-registration flow (create the user through the Yotpo
        API, then subscribe it to email notifications) using hard-coded test
        data. Performs real network calls and prints the results.
        """
        email = '1920098158@qq.com'
        customer = {
            "email": email,
            "first_name": "test",
            "last_name": "22",
            'address': {
                "country_code": 'GB',
            },
        }
        result = self.creat_and_update_customers(customer)
        print(result)

        list_id = 8589406
        subscribers_result = self.create_subscribers(customer, list_id)
        print(subscribers_result)

        # customer_result = self.get_customers_list(email)
        # customer = json.loads(customer_result)
        # if customer['customers']:
        #     external_id = customer['customers'][0]['external_id']
        #
        #     country_code = 'LU'
        #     customer = {
        #         "external_id": external_id,
        #         "address": {
        #             "country_code": country_code,
        #         }
        #     }
        #     self.creat_and_update_customers(customer)
        print('success')