| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 | 
							- # -*- encoding: utf-8 -*-
 
- """
 
- @File    : YotpoCoreObject.py
 
- @Time    : 2024/5/17 9:36
 
- @Author  : stephen
 
- @Email   : zhangdongming@asj6.wecom.work
 
- @Software: PyCharm
 
- """
 
- # -*- encoding: utf-8 -*-
 
- """
 
- @File    : EDM.py
 
- @Time    : 2024/5/16 11:16
 
- @Author  : stephen
 
- @Email   : zhangdongming@asj6.wecom.work
 
- @Software: PyCharm
 
- """
 
- import datetime
 
- import json
 
- import requests
 
- from Ansjer.config import LOGGER
 
- STORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
 
- YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"
 
- secret = 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'
 
- class YotpoCoreObject:
 
-     def get_token(self):
 
-         try:
 
-             url = YOTPO_URL + "access_tokens"
 
-             payload = {'secret': secret}
 
-             headers = {
 
-                 "accept": "application/json",
 
-                 "Content-Type": "application/json"
 
-             }
 
-             response = requests.post(url, json=payload, headers=headers)
 
-             if response.status_code != 200:
 
-                 LOGGER.info(f'yotpo获取token失败, response.status_code请求不为200')
 
-                 return ""
 
-             token = json.loads(response.text).get("access_token")
 
-             return token
 
-         except Exception as e:
 
-             LOGGER.info(f'yotpo获取token失败, {e}')
 
-             return ""
 
-     def creat_and_update_customers(self, customers):
 
-         """
 
-         创建和更新用户
 
-         """
 
-         try:
 
-             url = YOTPO_URL + "customers"
 
-             yotpo_token = self.get_token()
 
-             if yotpo_token == "":
 
-                 LOGGER.info(f'yotpo创建顾客失败, yotpo_token为空 ,customers:{customers}')
 
-                 return False
 
-             headers = {
 
-                 "accept": "application/json",
 
-                 "X-Yotpo-Token": yotpo_token,
 
-                 "Content-Type": "application/json"
 
-             }
 
-             payload = {
 
-                 "customer": customers
 
-             }
 
-             response = requests.patch(url, json=payload, headers=headers)
 
-             if response.status_code != 200:
 
-                 LOGGER.info(f'yotpo创建顾客失败,response.status_code != 200,customers:{customers}')
 
-                 return False
 
-             LOGGER.info(f'yotpo创建顾客成功,customers:{customers}')
 
-             return True
 
-         except Exception as e:
 
-             LOGGER.info(f'yotpo创建顾客失败, customers:{customers},{e}')
 
-             return False
 
-     def get_customers_list(self, email):
 
-         """
 
-         查用户信息
 
-         """
 
-         url = YOTPO_URL + f"customers?include_custom_properties=true&email={email}"
 
-         yotpo_token = self.get_token()
 
-         headers = {
 
-             "accept": "application/json",
 
-             "X-Yotpo-Token": yotpo_token,
 
-             "Content-Type": "application/json"
 
-         }
 
-         response = requests.get(url, headers=headers)
 
-         assert response.status_code == 200
 
-         return response.text
 
-     def create_subscribers(self, customer_params, list_id):
 
-         """
 
-         创建订阅
 
-         """
 
-         url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
 
-         if "custom_properties" in customer_params:
 
-             del customer_params["custom_properties"]
 
-         payload = {
 
-             "customer": customer_params,
 
-             "channels": {
 
-                 "email": {
 
-                     "marketing": {
 
-                         "consent": "subscribed",
 
-                         "list_id": list_id,
 
-                     },
 
-                     "suppression": {
 
-                         "suppress_email": True,
 
-                         "suppression_reason": "soft_bounce",
 
-                         "timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
 
-                     }
 
-                 },
 
-             },
 
-         }
 
-         headers = {
 
-             "accept": "application/json",
 
-             "content-type": "application/json",
 
-             "X-Yotpo-Token": self.get_token()
 
-         }
 
-         response = requests.post(url, json=payload, headers=headers)
 
-         if response.status_code != 200:
 
-             LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer_params}')
 
-             return False, ""
 
-         return True, response.text
 
-     def close_subscribers(self, customer, list_id):
 
-         """
 
-         关闭订阅
 
-         """
 
-         url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"
 
-         payload = {
 
-             "customer": customer,
 
-             "channels": {
 
-                 "email": {"marketing": {
 
-                     "consent": "unsubscribed",
 
-                     "list_id": list_id,
 
-                     "source": "post_purchase_popup"
 
-                 }}
 
-             }
 
-         }
 
-         headers = {
 
-             "accept": "application/json",
 
-             "content-type": "application/json",
 
-             "X-Yotpo-Token": self.get_token()
 
-         }
 
-         response = requests.post(url, json=payload, headers=headers)
 
-         if response.status_code != 200:
 
-             LOGGER.info(f'yotpo关闭订阅失败, customer:{customer}')
 
-             return False
 
-         LOGGER.info(f'yotpo关闭订阅成功, customer:{customer}')
 
-         return True
 
-     def get_all_list_ids(self):
 
-         """
 
-         获取所有list_id
 
-         """
 
-         authentication_token = self.get_token()
 
-         url = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/lists"
 
-         headers = {
 
-             "accept": "application/json",
 
-             "content-type": "application/json",
 
-             "X-Yotpo-Token": authentication_token
 
-         }
 
-         response = requests.get(url, headers=headers)
 
-         response_data = response.json()
 
-         if response.status_code == 200:
 
-             print(response_data)
 
-         else:
 
-             print(f"Error: {response_data['message']}")
 
-             return []
 
-     def check_api(self):
 
-         """
 
-         注册成功后 进行yotpo api创建用户,并订阅邮件通知
 
-         @return:
 
-         """
 
-         email = '1920098158@qq.com'
 
-         customer = {
 
-             "email": email, "first_name": "test", "last_name": "22",
 
-             'address': {
 
-                 "country_code": 'GB',
 
-             }
 
-         }
 
-         result = self.creat_and_update_customers(customer)
 
-         print(result)
 
-         list_id = 8589406
 
-         subscribers_result = self.create_subscribers(customer, list_id)
 
-         print(subscribers_result)
 
-         # customer_result = self.get_customers_list(email)
 
-         # customer = json.loads(customer_result)
 
-         # if customer['customers']:
 
-         #     external_id = customer['customers'][0]['external_id']
 
-         #
 
-         #     country_code = 'LU'
 
-         #     customer = {
 
-         #         "external_id": external_id,
 
-         #         "address": {
 
-         #             "country_code": country_code,
 
-         #         }
 
-         #     }
 
-         #     self.creat_and_update_customers(customer)
 
-         print('success')
 
 
  |