# -*- encoding: utf-8 -*- """ @File : UserSubscriptionController.py @Time : 2024/5/21 16:31 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import threading import time from datetime import datetime import requests from django.http import QueryDict from django.views import View from Model.models import Device_User, CountryModel, UserEmailSubscriptions from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Object.YotpoCoreObject import YotpoCoreObject from Ansjer.config import LOGGER from django.conf import settings OMNI_API_KEY = settings.OMNI_API_KEY API_URL = "https://api.omnisend.com/v5/" class UserSubscriptionControllerView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def delete(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') delete = QueryDict(request.body) if not delete: delete = request.GET return self.validation(delete, request, operation) def put(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') put = QueryDict(request.body) return self.validation(put, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject('cn') tko = TokenObject(request.META.get('HTTP_AUTHORIZATION')) if tko.code != 0: return response.json(tko.code) response.lang = tko.lang userID = tko.userID if operation == 'checkSubscriptionStatus': return self.check_subscription_status(userID, response) elif operation == 'switchSubscription': return self.switch_subscription(userID, request_dict, response) else: return response.json(414) @staticmethod def check_subscription_status(user_id, response): """ 检查订阅状态 @param user_id: str @param response: 响应 @return: response """ user_qs = Device_User.objects.filter(userID=user_id) if not user_qs.exists(): return response.json(104) user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status', 'push_sub_status').first() if not user_sub: UserEmailSubscriptions.objects.create(user_id=user_id, status=0, push_sub_status=1, updated_time=int(time.time()), created_time=int(time.time())) return response.json(0, {"emailSubStatus": 0, 'pushSubStatus': 1}) email_sub_status = 1 if user_sub["status"] == 1 else 0 push_sub_status = 0 if user_sub["push_sub_status"] == 0 else 1 user_sub = {"emailSubStatus": email_sub_status, 'pushSubStatus': push_sub_status} return response.json(0, user_sub) def switch_subscription(self, user_id, request_dict, response): """ 邮件订阅开关 @param user_id: str @param request_dict: dict @param response """ user_qs = Device_User.objects.filter(userID=user_id) email_sub_status = request_dict.get('emailSubStatus', None) push_sub_status = request_dict.get('pushSubStatus', None) if not email_sub_status and not push_sub_status: return response.json(444, "emailSubStatus or pushSubStatus") if not user_qs.exists(): return response.json(104) # 用户推送订阅 if push_sub_status: push_sub_status = int(push_sub_status) user_sub_qs = UserEmailSubscriptions.objects.filter(user_id=user_id) if user_sub_qs.exists(): user_sub_qs.update(push_sub_status=push_sub_status, updated_time=int(time.time())) else: user = user_qs.values('userEmail', 'phone').first() push_sub = { "user_id": user_id, "push_sub_status": push_sub_status, "created_time": int(time.time()), "updated_time": int(time.time()) } if user["userEmail"]: push_sub["email"] = user["userEmail"] if user["phone"]: push_sub["phone"] = user["phone"] UserEmailSubscriptions.objects.create(**push_sub) else: user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status', 'push_sub_status').first() if not user_sub: push_sub_status = 1 else: push_sub_status = user_sub["push_sub_status"] if email_sub_status: # 修改数据库中订阅状态 email_sub_status = int(email_sub_status) email_scription = email_sub_status user_sub_qs = UserEmailSubscriptions.objects.filter(user_id=user_id) # 邮件订阅 if email_sub_status == 1: subscribers = user_qs.values('NickName', 'userEmail', 'region_country').first() if not subscribers["userEmail"]: LOGGER.info(f'subscribers{user_id}邮箱为空,无法订阅') return response.json(183) if user_sub_qs.exists(): user_sub_qs.update(email=subscribers["userEmail"], status=1, updated_time=int(time.time())) else: UserEmailSubscriptions.objects.create(user_id=user_id, status=1, email=subscribers["userEmail"], created_time=int(time.time()), updated_time=int(time.time())) subscription_thread = threading.Thread(target=self.subscription, args=(subscribers,email_scription)) subscription_thread.start() # 取消订阅 elif email_sub_status == 0: device_user = Device_User.objects.filter(userID=user_id).values('userEmail').first() email = device_user["userEmail"] omni_unsubscription_thread = threading.Thread(target=UserSubscriptionControllerView.close_omni_subscribers, args=(email,)) omni_unsubscription_thread.start() if device_user: customer = { "email": device_user["userEmail"], "first_name": device_user["userEmail"], "last_name": "APP", } try: yotpo = YotpoCoreObject() list_id = 8589406 subscription_thread = threading.Thread(target=yotpo.close_subscribers, args=(customer, list_id)) subscription_thread.start() if user_sub_qs.exists(): customer["status"] = "unsubscription" user_sub_qs.update(email=device_user["userEmail"], status=0, sub_result=customer, updated_time=int(time.time())) except Exception as e: return response.json(500,'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) else: user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status').first() if not user_sub: email_sub_status = 0 else: email_sub_status = user_sub["status"] return response.json(0, {"emailSubStatus": email_sub_status, "pushSubStatus": push_sub_status}) @staticmethod def subscription(subscribers, email_scription): """ 订阅 @param subscribers: dict @return: boolean """ yotpo = YotpoCoreObject() email = subscribers.get("userEmail") try: # 查询顾客所在地区 if subscribers["region_country"]: country = CountryModel.objects.filter(id=subscribers["region_country"]).values('country_code').first() if country: country_code = country["country_code"] else: country_code = '' # 构建顾客订阅格式 customer = { "email": subscribers["userEmail"], "first_name": subscribers["userEmail"], "last_name": "APP", 'address': { "country_code": country_code, }, "custom_properties": { "subscription_office": "ZosiApp", } } else: customer = { "email": subscribers["userEmail"], "first_name": subscribers["userEmail"], "last_name": "APP", "custom_properties": { "subscription_office": "ZosiApp", } } result = UserSubscriptionControllerView.sync_on_register(email, email_scription, country_code) if result: LOGGER.info(f'{email}{result}') try: result = yotpo.creat_and_update_customers(customer) list_id = 8589406 sub_status, sub_result = yotpo.create_subscribers(customer, list_id) if result and sub_status: # 创建结果写入数据库 user_sub_qs = UserEmailSubscriptions.objects.filter(email=subscribers["userEmail"]) if user_sub_qs.exists(): user_sub_qs.update(email=subscribers["userEmail"], status=1, sub_result=sub_result,list_id=list_id, updated_time=int(time.time())) LOGGER.info(f'在yotpo创建客户并订阅成功,customer:{customer}') return True else: LOGGER.info(f'在yotpo创建客户并订阅失败,customer:{customer}') except Exception as yotpo_err: LOGGER.warning(f'Yotpo接口异常,已跳过:{yotpo_err}') return False except Exception as e: LOGGER.error(f'{subscribers["userEmail"]}订阅失败:{e}') return False def sync_on_register(email, email_scription, country_code): """ 注册时调用,写入 Omnisend。当email_scription==1为订阅打app subscribed标签 """ email_status = "subscribed" if str(email_scription) == '1' else "unsubscribed" payload = { "updateEnabled": True, "country": country_code, "identifiers": [ { "type": "email", "id": email, "channels": { "email": { "status": email_status, "statusDate": datetime.utcnow().isoformat() + "Z", } } } ], "customProperties": { "source": "User Registration", "createdAt": datetime.utcnow().isoformat() + "Z", } } if str(email_scription) == "1": payload["tags"] = ["app subscribed"] headers = { "X-API-KEY": OMNI_API_KEY, "accept": "application/json", "content-type": "application/json" } response = requests.post(f"{API_URL}/contacts", headers=headers, json=payload) LOGGER.info(f"{email} 注册同步到omni平台: ({response.status_code})") return response.status_code, response.json() @staticmethod def close_omni_subscribers(email): """ 关闭 Omnisend 订阅 @param email: 用户邮箱 @return: 成功返回 True,失败返回 False """ try: payload = { "updateEnabled": True, "identifiers": [ { "type": "email", "id": email, "channels": { "email": { "status": "unsubscribed", "statusDate": datetime.utcnow().isoformat() + "Z", } } } ], "customProperties": { "source": "User Unsubscription", "updatedAt": datetime.utcnow().isoformat() + "Z", }, } headers = { "X-API-KEY": OMNI_API_KEY, "accept": "application/json", "content-type": "application/json" } response = requests.patch( f"{API_URL}/contacts?email={email}", headers=headers, json=payload ) LOGGER.info(f"Omnisend 取消订阅 {email}: {response.status_code}") if response.status_code in [200, 201]: return True else: LOGGER.error(f"Omnisend API 失败: {response.text}") return False except Exception as e: LOGGER.error(f"关闭订阅异常 {email}: {str(e)}") return False