# -*- encoding: utf-8 -*-
"""
@File    : YotpoCoreObject.py
@Time    : 2024/5/17 9:36
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
# -*- encoding: utf-8 -*-
"""
@File    : EDM.py
@Time    : 2024/5/16 11:16
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import datetime
import json

import requests

from Ansjer.config import LOGGER

# NOTE(review): the store id and API secret are hard-coded in source control;
# consider loading them from configuration or the environment instead.
STORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"
YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"
secret = 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'

# Endpoint shared by create_subscribers() and close_subscribers().
SUBSCRIBERS_URL = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"

# Seconds to wait for Yotpo before giving up; without a timeout a stalled
# connection would block the calling thread indefinitely.
REQUEST_TIMEOUT = 30


class YotpoCoreObject:
    """Thin client for the Yotpo core/messaging REST endpoints used by this project."""

    @staticmethod
    def _build_headers(token=None):
        """Return the JSON request headers, adding ``X-Yotpo-Token`` when *token* is given."""
        headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
        }
        if token is not None:
            headers["X-Yotpo-Token"] = token
        return headers

    def get_token(self):
        """Request an access token from Yotpo.

        Returns:
            The access token string, or ``""`` when the request fails, the
            status code is not 200, or the response carries no token.
        """
        try:
            url = YOTPO_URL + "access_tokens"
            payload = {'secret': secret}
            response = requests.post(
                url,
                json=payload,
                headers=self._build_headers(),
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                LOGGER.info('yotpo获取token失败, response.status_code请求不为200')
                return ""
            token = json.loads(response.text).get("access_token")
            # Normalise a missing token to "" so callers can rely on a
            # single "no token" value instead of also handling None.
            return token or ""
        except Exception as e:
            # Deliberate best-effort boundary: any failure means "no token".
            LOGGER.info(f'yotpo获取token失败, {e}')
            return ""

    def creat_and_update_customers(self, customers):
        """Create or update a customer via ``PATCH customers``.

        Args:
            customers: Customer payload, sent as the ``customer`` field of
                the request body.

        Returns:
            ``True`` when Yotpo answers with status 200, ``False`` on any
            failure (no token, non-200 status, or an exception).
        """
        try:
            url = YOTPO_URL + "customers"
            yotpo_token = self.get_token()
            if not yotpo_token:
                LOGGER.info(f'yotpo创建顾客失败, yotpo_token为空 ,customers:{customers}')
                return False
            payload = {
                "customer": customers
            }
            response = requests.patch(
                url,
                json=payload,
                headers=self._build_headers(yotpo_token),
                timeout=REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                LOGGER.info(f'yotpo创建顾客失败,response.status_code != 200,customers:{customers}')
                return False
            LOGGER.info(f'yotpo创建顾客成功,customers:{customers}')
            return True
        except Exception as e:
            LOGGER.info(f'yotpo创建顾客失败, customers:{customers},{e}')
            return False

    def get_customers_list(self, email):
        """Look up customer information by email.

        Args:
            email: Email address to query.

        Returns:
            The raw response body text.

        Raises:
            AssertionError: If the response status code is not 200.
        """
        url = YOTPO_URL + "customers"
        # Let requests build the query string so characters such as "+" or
        # "&" in the address are percent-encoded correctly.
        params = {
            "include_custom_properties": "true",
            "email": email,
        }
        yotpo_token = self.get_token()
        response = requests.get(
            url,
            params=params,
            headers=self._build_headers(yotpo_token),
            timeout=REQUEST_TIMEOUT,
        )
        # Raised explicitly (rather than via ``assert``) so the check still
        # runs under ``python -O``; the exception type is unchanged.
        if response.status_code != 200:
            raise AssertionError(
                f"unexpected status code {response.status_code}"
            )
        return response.text

    def create_subscribers(self, customer_params, list_id):
        """Subscribe a customer to email marketing on the given list.

        Args:
            customer_params: Customer payload; a ``custom_properties`` key,
                if present, is left out of the request. The caller's dict
                is not modified.
            list_id: Identifier of the list to subscribe to.

        Returns:
            ``(True, response_text)`` on status 200, otherwise ``(False, "")``.
        """
        # Work on a shallow copy so the caller's dict is not mutated.
        customer_params = {
            key: value
            for key, value in customer_params.items()
            if key != "custom_properties"
        }
        # The "Z" suffix denotes UTC, so the timestamp must be taken in UTC.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
        payload = {
            "customer": customer_params,
            "channels": {
                "email": {
                    "marketing": {
                        "consent": "subscribed",
                        "list_id": list_id,
                    },
                    # NOTE(review): the subscription is created with
                    # suppression enabled (soft_bounce) — confirm this is
                    # intended.
                    "suppression": {
                        "suppress_email": True,
                        "suppression_reason": "soft_bounce",
                        "timestamp": timestamp
                    }
                },
            },
        }
        response = requests.post(
            SUBSCRIBERS_URL,
            json=payload,
            headers=self._build_headers(self.get_token()),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.info(f'yotpo创建订阅失败, customer_params:{customer_params}')
            return False, ""
        return True, response.text

    def close_subscribers(self, customer, list_id):
        """Unsubscribe a customer from email marketing on the given list.

        Args:
            customer: Customer payload sent as the ``customer`` field.
            list_id: Identifier of the list to unsubscribe from.

        Returns:
            ``True`` on status 200, otherwise ``False``.
        """
        payload = {
            "customer": customer,
            "channels": {
                "email": {"marketing": {
                    "consent": "unsubscribed",
                    "list_id": list_id,
                    "source": "post_purchase_popup"
                }}
            }
        }
        response = requests.post(
            SUBSCRIBERS_URL,
            json=payload,
            headers=self._build_headers(self.get_token()),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            LOGGER.info(f'yotpo关闭订阅失败, customer:{customer}')
            return False
        LOGGER.info(f'yotpo关闭订阅成功, customer:{customer}')
        return True

    def get_all_list_ids(self):
        """Fetch the store's lists and print the response.

        Returns:
            Always an empty list.
        """
        # NOTE(review): despite the name, nothing is extracted from the
        # response; the method only prints it and returns [].
        authentication_token = self.get_token()
        url = YOTPO_URL + "lists"
        response = requests.get(
            url,
            headers=self._build_headers(authentication_token),
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code == 200:
            print(response.json())
        else:
            # Error bodies are not guaranteed to be JSON or to carry a
            # "message" key, so fall back to the raw text.
            try:
                response_data = response.json()
            except ValueError:
                response_data = None
            if isinstance(response_data, dict) and 'message' in response_data:
                message = response_data['message']
            else:
                message = response.text
            print(f"Error: {message}")
        return []

    def check_api(self):
        """Manual smoke test: create a hard-coded test customer and subscribe it.

        Prints the result of each step; returns nothing.
        """
        email = '1920098158@qq.com'
        customer = {
            "email": email, "first_name": "test", "last_name": "22",
            'address': {
                "country_code": 'GB',
            }
        }
        result = self.creat_and_update_customers(customer)
        print(result)
        list_id = 8589406
        subscribers_result = self.create_subscribers(customer, list_id)
        print(subscribers_result)
        # customer_result = self.get_customers_list(email)
        # customer = json.loads(customer_result)
        # if customer['customers']:
        #     external_id = customer['customers'][0]['external_id']
        #
        #     country_code = 'LU'
        #     customer = {
        #         "external_id": external_id,
        #         "address": {
        #             "country_code": country_code,
        #         }
        #     }
        #     self.creat_and_update_customers(customer)
        print('success')