| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 | # -*- encoding: utf-8 -*-"""@File    : YotpoCoreObject.py@Time    : 2024/5/17 9:36@Author  : stephen@Email   : zhangdongming@asj6.wecom.work@Software: PyCharm"""# -*- encoding: utf-8 -*-"""@File    : EDM.py@Time    : 2024/5/16 11:16@Author  : stephen@Email   : zhangdongming@asj6.wecom.work@Software: PyCharm"""import datetimeimport jsonimport requestsSTORE_ID = "LIKwYlx6uonKqN91fXiGFb4FHqVLpvMUdGNU5Zf9"YOTPO_URL = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/"secret = 'isb8EOQs8bkjrksLHN73o3HIkrhoW539IXhULfdh'class YotpoCoreObject:    def get_token(self):        url = YOTPO_URL + "access_tokens"        payload = {'secret': secret}        headers = {            "accept": "application/json",            "Content-Type": "application/json"        }        response = requests.post(url, json=payload, headers=headers)        assert response.status_code == 200        token = json.loads(response.text).get("access_token")        print("token:", token)        return token    def creat_and_update_customers(self, customers):        """        创建和更新用户        """        url = YOTPO_URL + "customers"        yotpo_token = self.get_token()        headers = {            "accept": "application/json",            "X-Yotpo-Token": yotpo_token,            "Content-Type": "application/json"        }        payload = {            "customer": customers        }        response = requests.patch(url, json=payload, headers=headers)        assert response.status_code == 200        return response.text    def get_customers_list(self, email):        """        查用户信息        """        url = YOTPO_URL + f"customers?email={email}"        yotpo_token = self.get_token()        headers = {            "accept": "application/json",            "X-Yotpo-Token": yotpo_token,            "Content-Type": "application/json"        }        response = requests.get(url, headers=headers)        assert response.status_code == 200        return response.text    def create_subscribers(self, customer_params, list_id):        """        创建订阅        """        url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"        payload = {            "customer": customer_params,            "channels": {                "email": {                    "marketing": {                        "consent": "subscribed",                        "list_id": list_id,                        "source": "post_purchase_popup"                    },                    "suppression": {                        "suppress_email": True,                        "suppression_reason": "soft_bounce",                        "timestamp": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")                    }                },            },            "suppress_welcome_messages": True        }        headers = {            "accept": "application/json",            "content-type": "application/json",            "X-Yotpo-Token": self.get_token()        }        response = requests.post(url, json=payload, headers=headers)        assert response.status_code == 200        return response.text    def close_subscribers(self, email, list_id):        """        关闭订阅        """        url = f"https://api.yotpo.com/messaging/v3/stores/{STORE_ID}/subscribers"        payload = {            "customer": {                "email": email,            },            # "channels": {            #     "email": {"marketing": {"consent": None}},            #     "suppress_welcome_messages": True            # },            "channels": {                "email": {"marketing": {                    "consent": "unsubscribed",                    "list_id": list_id,                    "source": "post_purchase_popup"                }}            },            "list_id": list_id        }        headers = {            "accept": "application/json",            "content-type": "application/json",            "X-Yotpo-Token": self.get_token()        }        response = requests.post(url, json=payload, headers=headers)        print(response.text)    def get_all_list_ids(self):        """        获取所有list_id        """        authentication_token = self.get_token()        url = f"https://api.yotpo.com/core/v3/stores/{STORE_ID}/lists"        headers = {            "accept": "application/json",            "content-type": "application/json",            "X-Yotpo-Token": authentication_token        }        response = requests.get(url, headers=headers)        response_data = response.json()        if response.status_code == 200:            print(response_data)        else:            print(f"Error: {response_data['message']}")            return []    def check_api(self):        """        注册成功后 进行yotpo api创建用户,并订阅邮件通知        @return:        """        email = '1920098158@qq.com'        customer = {            "email": email, "first_name": "test", "last_name": "22",            'address': {                "country_code": 'GB',            }        }        result = self.creat_and_update_customers(customer)        print(result)        list_id = 8589406        subscribers_result = self.create_subscribers(customer, list_id)        print(subscribers_result)        # customer_result = self.get_customers_list(email)        # customer = json.loads(customer_result)        # if customer['customers']:        #     external_id = customer['customers'][0]['external_id']        #        #     country_code = 'LU'        #     customer = {        #         "external_id": external_id,        #         "address": {        #             "country_code": country_code,        #         }        #     }        #     self.creat_and_update_customers(customer)        print('success')if __name__ == "__main__":    pass    # YotpoCoreObject().check_api()    # 获取 token ok    # token = get_token()    # print(token)    # # # 创建用户 ok    # response = creat_and_update_customers()    # print(response)    #    # 检索用户 ok    # response = get_customers_list()    # print(response.text)    # 检索订阅 ok    # response = get_webhooks_subscriptions()    # print(response.text)    # 检索接口    # response = get_webhooks("webhooks/targets")    # print(response)    # 创建筛选器    # create_webhook_filters()    #    # 创建订阅    # create_subscribers()    # 关闭订阅    # YotpoCoreObject().close_subscribers('1920098158@qq.com', 8589406)    # 获取list    # get_all_list_ids()
 |