| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- # -*- encoding: utf-8 -*-
- """
- @File : UserSubscriptionController.py
- @Time : 2024/5/21 16:31
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import threading
- import time
- from datetime import datetime
- import requests
- from django.http import QueryDict
- from django.views import View
- from Model.models import Device_User, CountryModel, UserEmailSubscriptions
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Object.YotpoCoreObject import YotpoCoreObject
- from Ansjer.config import LOGGER
- from django.conf import settings
- OMNI_API_KEY = settings.OMNI_API_KEY
- API_URL = "https://api.omnisend.com/v5/"
- class UserSubscriptionControllerView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def delete(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- delete = QueryDict(request.body)
- if not delete:
- delete = request.GET
- return self.validation(delete, request, operation)
- def put(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- put = QueryDict(request.body)
- return self.validation(put, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject('cn')
- tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
- if tko.code != 0:
- return response.json(tko.code)
- response.lang = tko.lang
- userID = tko.userID
- if operation == 'checkSubscriptionStatus':
- return self.check_subscription_status(userID, response)
- elif operation == 'switchSubscription':
- return self.switch_subscription(userID, request_dict, response)
- else:
- return response.json(414)
- @staticmethod
- def check_subscription_status(user_id, response):
- """
- 检查订阅状态
- @param user_id: str
- @param response: 响应
- @return: response
- """
- user_qs = Device_User.objects.filter(userID=user_id)
- if not user_qs.exists():
- return response.json(104)
- user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status', 'push_sub_status').first()
- if not user_sub:
- UserEmailSubscriptions.objects.create(user_id=user_id, status=0, push_sub_status=1,
- updated_time=int(time.time()), created_time=int(time.time()))
- return response.json(0, {"emailSubStatus": 0, 'pushSubStatus': 1})
- email_sub_status = 1 if user_sub["status"] == 1 else 0
- push_sub_status = 0 if user_sub["push_sub_status"] == 0 else 1
- user_sub = {"emailSubStatus": email_sub_status, 'pushSubStatus': push_sub_status}
- return response.json(0, user_sub)
- def switch_subscription(self, user_id, request_dict, response):
- """
- 邮件订阅开关
- @param user_id: str
- @param request_dict: dict
- @param response
- """
- user_qs = Device_User.objects.filter(userID=user_id)
- email_sub_status = request_dict.get('emailSubStatus', None)
- push_sub_status = request_dict.get('pushSubStatus', None)
- if not email_sub_status and not push_sub_status:
- return response.json(444, "emailSubStatus or pushSubStatus")
- if not user_qs.exists():
- return response.json(104)
- # 用户推送订阅
- if push_sub_status:
- push_sub_status = int(push_sub_status)
- user_sub_qs = UserEmailSubscriptions.objects.filter(user_id=user_id)
- if user_sub_qs.exists():
- user_sub_qs.update(push_sub_status=push_sub_status, updated_time=int(time.time()))
- else:
- user = user_qs.values('userEmail', 'phone').first()
- push_sub = {
- "user_id": user_id,
- "push_sub_status": push_sub_status,
- "created_time": int(time.time()),
- "updated_time": int(time.time())
- }
- if user["userEmail"]:
- push_sub["email"] = user["userEmail"]
- if user["phone"]:
- push_sub["phone"] = user["phone"]
- UserEmailSubscriptions.objects.create(**push_sub)
- else:
- user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status',
- 'push_sub_status').first()
- if not user_sub:
- push_sub_status = 1
- else:
- push_sub_status = user_sub["push_sub_status"]
- if email_sub_status:
- # 修改数据库中订阅状态
- email_sub_status = int(email_sub_status)
- email_scription = email_sub_status
- user_sub_qs = UserEmailSubscriptions.objects.filter(user_id=user_id)
- # 邮件订阅
- if email_sub_status == 1:
- subscribers = user_qs.values('NickName', 'userEmail', 'region_country').first()
- if not subscribers["userEmail"]:
- LOGGER.info(f'subscribers{user_id}邮箱为空,无法订阅')
- return response.json(183)
- if user_sub_qs.exists():
- user_sub_qs.update(email=subscribers["userEmail"], status=1, updated_time=int(time.time()))
- else:
- UserEmailSubscriptions.objects.create(user_id=user_id, status=1, email=subscribers["userEmail"],
- created_time=int(time.time()), updated_time=int(time.time()))
- subscription_thread = threading.Thread(target=self.subscription, args=(subscribers,email_scription))
- subscription_thread.start()
- # 取消订阅
- elif email_sub_status == 0:
- device_user = Device_User.objects.filter(userID=user_id).values('userEmail').first()
- email = device_user["userEmail"]
- omni_unsubscription_thread = threading.Thread(target=UserSubscriptionControllerView.close_omni_subscribers, args=(email,))
- omni_unsubscription_thread.start()
- if device_user:
- customer = {
- "email": device_user["userEmail"],
- "first_name": device_user["userEmail"],
- "last_name": "APP",
- }
- try:
- yotpo = YotpoCoreObject()
- list_id = 8589406
- subscription_thread = threading.Thread(target=yotpo.close_subscribers, args=(customer, list_id))
- subscription_thread.start()
- if user_sub_qs.exists():
- customer["status"] = "unsubscription"
- user_sub_qs.update(email=device_user["userEmail"], status=0, sub_result=customer,
- updated_time=int(time.time()))
- except Exception as e:
- return response.json(500,'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- else:
- user_sub = UserEmailSubscriptions.objects.filter(user_id=user_id).values('status').first()
- if not user_sub:
- email_sub_status = 0
- else:
- email_sub_status = user_sub["status"]
- return response.json(0, {"emailSubStatus": email_sub_status, "pushSubStatus": push_sub_status})
- @staticmethod
- def subscription(subscribers, email_scription):
- """
- 订阅
- @param subscribers: dict
- @return: boolean
- """
- yotpo = YotpoCoreObject()
- email = subscribers.get("userEmail")
- try:
- # 查询顾客所在地区
- if subscribers["region_country"]:
- country = CountryModel.objects.filter(id=subscribers["region_country"]).values('country_code').first()
- if country:
- country_code = country["country_code"]
- else:
- country_code = ''
- # 构建顾客订阅格式
- customer = {
- "email": subscribers["userEmail"],
- "first_name": subscribers["userEmail"],
- "last_name": "APP",
- 'address': {
- "country_code": country_code,
- },
- "custom_properties": {
- "subscription_office": "ZosiApp",
- }
- }
- else:
- customer = {
- "email": subscribers["userEmail"],
- "first_name": subscribers["userEmail"],
- "last_name": "APP",
- "custom_properties": {
- "subscription_office": "ZosiApp",
- }
- }
- result = UserSubscriptionControllerView.sync_on_register(email, email_scription, country_code)
- if result:
- LOGGER.info(f'{email}{result}')
- try:
- result = yotpo.creat_and_update_customers(customer)
- list_id = 8589406
- sub_status, sub_result = yotpo.create_subscribers(customer, list_id)
- if result and sub_status:
- # 创建结果写入数据库
- user_sub_qs = UserEmailSubscriptions.objects.filter(email=subscribers["userEmail"])
- if user_sub_qs.exists():
- user_sub_qs.update(email=subscribers["userEmail"], status=1, sub_result=sub_result,list_id=list_id,
- updated_time=int(time.time()))
- LOGGER.info(f'在yotpo创建客户并订阅成功,customer:{customer}')
- return True
- else:
- LOGGER.info(f'在yotpo创建客户并订阅失败,customer:{customer}')
- except Exception as yotpo_err:
- LOGGER.warning(f'Yotpo接口异常,已跳过:{yotpo_err}')
- return False
- except Exception as e:
- LOGGER.error(f'{subscribers["userEmail"]}订阅失败:{e}')
- return False
- def sync_on_register(email, email_scription, country_code):
- """
- 注册时调用,写入 Omnisend。当email_scription==1为订阅打app subscribed标签
- """
- email_status = "subscribed" if str(email_scription) == '1' else "unsubscribed"
- payload = {
- "updateEnabled": True,
- "country": country_code,
- "identifiers": [
- {
- "type": "email",
- "id": email,
- "channels": {
- "email": {
- "status": email_status,
- "statusDate": datetime.utcnow().isoformat() + "Z",
- }
- }
- }
- ],
- "customProperties": {
- "source": "User Registration",
- "createdAt": datetime.utcnow().isoformat() + "Z",
- }
- }
- if str(email_scription) == "1":
- payload["tags"] = ["app subscribed"]
- headers = {
- "X-API-KEY": OMNI_API_KEY,
- "accept": "application/json",
- "content-type": "application/json"
- }
- response = requests.post(f"{API_URL}/contacts", headers=headers, json=payload)
- LOGGER.info(f"{email} 注册同步到omni平台: ({response.status_code})")
- return response.status_code, response.json()
- @staticmethod
- def close_omni_subscribers(email):
- """
- 关闭 Omnisend 订阅
- @param email: 用户邮箱
- @return: 成功返回 True,失败返回 False
- """
- try:
- payload = {
- "updateEnabled": True,
- "identifiers": [
- {
- "type": "email",
- "id": email,
- "channels": {
- "email": {
- "status": "unsubscribed",
- "statusDate": datetime.utcnow().isoformat() + "Z",
- }
- }
- }
- ],
- "customProperties": {
- "source": "User Unsubscription",
- "updatedAt": datetime.utcnow().isoformat() + "Z",
- },
- }
- headers = {
- "X-API-KEY": OMNI_API_KEY,
- "accept": "application/json",
- "content-type": "application/json"
- }
- response = requests.patch(
- f"{API_URL}/contacts?email={email}",
- headers=headers,
- json=payload
- )
- LOGGER.info(f"Omnisend 取消订阅 {email}: {response.status_code}")
- if response.status_code in [200, 201]:
- return True
- else:
- LOGGER.error(f"Omnisend API 失败: {response.text}")
- return False
- except Exception as e:
- LOGGER.error(f"关闭订阅异常 {email}: {str(e)}")
- return False
|