123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
# -*- encoding: utf-8 -*-
"""
@File : YotpoCoreObject.py
@Time : 2024/5/17 9:36
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
# -*- encoding: utf-8 -*-
"""
@File : EDM.py
@Time : 2024/5/16 11:16
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import datetime
import json
import os

import requests

from Ansjer.config import LOGGER
# Yotpo store identifier and API secret.
# SECURITY NOTE(review): credentials committed to source control should be
# rotated and supplied through the environment. The literals below are kept
# only as a fallback so existing deployments keep working unchanged; set
# YOTPO_STORE_ID / YOTPO_SECRET to override them. An empty environment value
# is treated as "not set".
STORE_ID = os.environ.get("YOTPO_STORE_ID") or "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"

# Base URL of the Core v3 API for this store (always ends with a slash).
YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"

# Secret exchanged for an access token at the "access_tokens" endpoint.
secret = os.environ.get("YOTPO_SECRET") or 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'
class YotpoCoreObject:
    """Small client for the Yotpo Core v3 and Messaging v3 REST endpoints.

    Every public method fetches a fresh access token through ``get_token``
    and then performs a single HTTP request with ``requests``. Failures are
    reported through ``LOGGER`` and the method's return value unless noted
    otherwise in the method's own docstring.
    """

    # Seconds to wait for Yotpo before giving up. ``requests`` applies no
    # timeout by default, so without this a stalled connection blocks forever.
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _auth_headers(token):
        """Return the JSON request headers carrying *token* as X-Yotpo-Token."""
        return {
            "accept": "application/json",
            "Content-Type": "application/json",
            "X-Yotpo-Token": token,
        }

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
        # The trailing "Z" means UTC, so the clock must be read in UTC too
        # (a naive local ``now()`` would be off by the server's UTC offset).
        now = datetime.datetime.now(datetime.timezone.utc)
        return now.strftime("%Y-%m-%dT%H:%M:%SZ")

    @staticmethod
    def _subscribe_payload(customer_params, list_id):
        """Build the request body used by ``create_subscribers``.

        The customer data is copied without its ``custom_properties`` key so
        the caller's dict is left untouched.
        """
        customer = {
            key: value
            for key, value in customer_params.items()
            if key != "custom_properties"
        }
        return {
            "customer": customer,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "subscribed",
                        "list_id": list_id,
                    },
                    # NOTE(review): marking a new subscriber as suppressed
                    # with reason "soft_bounce" looks contradictory; kept as
                    # in the original -- confirm against Yotpo's API docs.
                    "suppression": {
                        "suppress_email": True,
                        "suppression_reason": "soft_bounce",
                        "timestamp": YotpoCoreObject._utc_timestamp(),
                    },
                },
            },
        }

    def get_token(self):
        """Request an access token from Yotpo.

        Returns:
            The access token string, or ``""`` when the request fails, the
            status is not 200, or the response carries no ``access_token``.
        """
        try:
            url = YOTPO_URL + "access_tokens"
            headers = {
                "accept": "application/json",
                "Content-Type": "application/json",
            }
            response = requests.post(
                url,
                json={'secret': secret},
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                LOGGER.info('yotpo获取token失败, response.status_code请求不为200')
                return ""
            # Normalise a missing/None token to "" so callers' emptiness
            # checks catch it instead of sending "X-Yotpo-Token: None".
            return json.loads(response.text).get("access_token") or ""
        except Exception as e:
            # Deliberate best-effort: any failure is logged and reported as "".
            LOGGER.info(f'yotpo获取token失败, {e}')
            return ""

    def creat_and_update_customers(self, customers):
        """Create or update a customer (PATCH on the ``customers`` endpoint).

        Args:
            customers: Customer data sent as the ``customer`` field of the
                request body.

        Returns:
            ``True`` when Yotpo answers with status 200, ``False`` on any
            failure (no token, non-200 status, or an exception).
        """
        try:
            yotpo_token = self.get_token()
            if not yotpo_token:
                LOGGER.info(f'yotpo创建顾客失败, yotpo_token为空 ,customers:{customers}')
                return False
            response = requests.patch(
                YOTPO_URL + "customers",
                json={"customer": customers},
                headers=self._auth_headers(yotpo_token),
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                LOGGER.info(f'yotpo创建顾客失败,response.status_code != 200,customers:{customers}')
                return False
            LOGGER.info(f'yotpo创建顾客成功,customers:{customers}')
            return True
        except Exception as e:
            # Deliberate best-effort: failures are logged, never raised.
            LOGGER.info(f'yotpo创建顾客失败, customers:{customers},{e}')
            return False

    def get_customers_list(self, email):
        """Look up customers by e-mail address.

        Args:
            email: E-mail address to search for.

        Returns:
            The raw response body text.

        Raises:
            AssertionError: If the response status is not 200.
        """
        response = requests.get(
            YOTPO_URL + "customers",
            # Passing params lets requests URL-encode the address; a raw "+"
            # or "&" in the e-mail would otherwise corrupt the query string.
            params={"include_custom_properties": "true", "email": email},
            headers=self._auth_headers(self.get_token()),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            # Raised explicitly (instead of a bare ``assert``) so the check
            # still runs under ``python -O``; the exception type is unchanged.
            raise AssertionError(
                f"unexpected status {response.status_code} from Yotpo customers lookup"
            )
        return response.text

    def create_subscribers(self, customer_params, list_id):
        """Subscribe a customer to e-mail marketing on a list.

        Args:
            customer_params: Customer data; its ``custom_properties`` key, if
                present, is left out of the request. The dict itself is not
                modified.
            list_id: Identifier of the list to subscribe to.

        Returns:
            ``(True, response_text)`` on status 200, otherwise ``(False, "")``.
        """
        url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
        payload = self._subscribe_payload(customer_params, list_id)
        response = requests.post(
            url,
            json=payload,
            headers=self._auth_headers(self.get_token()),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            customer = payload["customer"]
            LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer}')
            return False, ""
        return True, response.text

    def close_subscribers(self, customer, list_id):
        """Unsubscribe a customer from e-mail marketing on a list.

        Args:
            customer: Customer data sent as the ``customer`` field.
            list_id: Identifier of the list to unsubscribe from.

        Returns:
            ``True`` on status 200, otherwise ``False``.
        """
        url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
        payload = {
            "customer": customer,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "unsubscribed",
                        "list_id": list_id,
                        "source": "post_purchase_popup",
                    },
                },
            },
        }
        response = requests.post(
            url,
            json=payload,
            headers=self._auth_headers(self.get_token()),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.info(f'yotpo关闭订阅失败, customer:{customer}')
            return False
        LOGGER.info(f'yotpo关闭订阅成功, customer:{customer}')
        return True

    def get_all_list_ids(self):
        """Fetch the store's lists and print the response.

        NOTE(review): the fetched payload is only printed and an empty list
        is returned regardless of outcome; returning the parsed lists is left
        for a follow-up once the response schema is confirmed.

        Returns:
            An empty list.
        """
        response = requests.get(
            YOTPO_URL + "lists",
            headers=self._auth_headers(self.get_token()),
            timeout=self.REQUEST_TIMEOUT,
        )
        try:
            response_data = response.json()
        except ValueError:
            # Error responses are not guaranteed to carry a JSON body.
            response_data = None
        if response.status_code == 200:
            print(response_data)
        else:
            message = None
            if isinstance(response_data, dict):
                message = response_data.get('message')
            if message is None:
                # Fall back to the raw body when no "message" field exists.
                message = response.text
            print(f"Error: {message}")
        return []

    def check_api(self):
        """Manual smoke test: create a customer, then subscribe it to e-mail.

        Uses a hard-coded test address and list id and prints each result.
        """
        email = '1920098158@qq.com'
        customer = {
            "email": email, "first_name": "test", "last_name": "22",
            'address': {
                "country_code": 'GB',
            }
        }
        result = self.creat_and_update_customers(customer)
        print(result)
        list_id = 8589406
        subscribers_result = self.create_subscribers(customer, list_id)
        print(subscribers_result)
        # Disabled follow-up check (look the customer up and patch its country):
        # customer_result = self.get_customers_list(email)
        # customer = json.loads(customer_result)
        # if customer['customers']:
        #     external_id = customer['customers'][0]['external_id']
        #
        #     country_code = 'LU'
        #     customer = {
        #         "external_id": external_id,
        #         "address": {
        #             "country_code": country_code,
        #         }
        #     }
        #     self.creat_and_update_customers(customer)
        print('success')
|