| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 | 
							- # @Author    : linhaohong
 
- # @File      : AppleInAppPurchaseSubscriptionObject.py
 
- # @Time      : 2024/9/4 11:58
 
- from appstoreserverlibrary.receipt_utility import ReceiptUtility
 
- from Ansjer.config import LOGGER, CONFIG_INFO, CONFIG_TEST, BASE_DIR, IN_APP_CONFIG
 
- from appstoreserverlibrary.api_client import AppStoreServerAPIClient
 
- from appstoreserverlibrary.models.Environment import Environment
 
- from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier
 
- from Model.models import Order_Model, DeviceApplePackage
 
- ENV = Environment.SANDBOX if CONFIG_INFO == CONFIG_TEST else Environment.PRODUCTION
 
- class InAppConfig:
 
-     def __init__(self, bundle_id: str, user_id=""):
 
-         """
 
-        初始化 AppConfig,加载与指定 bundle_id 对应的配置。
 
-        :param bundle_id: 应用的 bundle ID,用于检索相关配置。
 
-        """
 
-         config = IN_APP_CONFIG.get(bundle_id)
 
-         if not config:
 
-             raise ValueError(f"没有找到与 bundle_id '{bundle_id}' 对应的配置")
 
-         self.key_id = config['key_id']
 
-         self.issuer_id = config['issuer_id']
 
-         self.key_filename = config['key_filename']
 
-         self.cert_names = config['cert_names']
 
-         self.app_apple_id = config.get("app_apple_id")
 
-         if user_id == "167083887153913800138000":
 
-             self.environment = Environment.SANDBOX
 
-         else:
 
-             self.environment = ENV
 
- class InAppPurchase:
 
-     def __init__(self, bundle_id="com.ansjer.zccloud", user_id=""):
 
-         """
 
-         初始化 InAppSubscription,加载私钥并初始化客户端和解码器。
 
-         :param app_config: 包含内购相关配置的 AppConfig 实例。
 
-         """
 
-         app_config = InAppConfig(bundle_id, user_id)
 
-         self.bundle_id = bundle_id
 
-         self.config = app_config
 
-         self.private_key = self._load_private_key(self.config.key_filename)
 
-         self.client = self._initialize_client()
 
-         self.verifier = self._initialize_verifier()
 
-         self.receipt_util = self._initialize_receipt_util()
 
-     def _load_private_key(self, key_filename: str) -> bytes:
 
-         """
 
-         加载私钥文件并返回其内容。
 
-         :param key_filename: 私钥文件的名称。
 
-         :return: 返回私钥文件的内容,类型为 bytes。
 
-         """
 
-         key_path = f'{BASE_DIR}/Ansjer/file/in_app_purchase/{key_filename}'
 
-         try:
 
-             with open(key_path, 'rb') as file:
 
-                 return file.read()
 
-         except FileNotFoundError:
 
-             raise ValueError(f"Private key file not found: {key_path}")
 
-     def _initialize_client(self) -> AppStoreServerAPIClient:
 
-         """
 
-         初始化 AppStoreServerAPIClient 实例,用于与 App Store 进行通信。
 
-         :return: 返回初始化后的 AppStoreServerAPIClient 实例。
 
-         """
 
-         return AppStoreServerAPIClient(
 
-             self.private_key,
 
-             self.config.key_id,
 
-             self.config.issuer_id,
 
-             self.bundle_id,
 
-             self.config.environment
 
-         )
 
-     def _initialize_verifier(self) -> SignedDataVerifier:
 
-         """
 
-        初始化 SignedDataVerifier 实例,用于内购验证签名数据。
 
-        :return: 返回初始化后的 SignedDataVerifier 实例。
 
-        """
 
-         root_certificates = [
 
-             self._load_certificate(cert_name)
 
-             for cert_name in self.config.cert_names
 
-         ]
 
-         enable_online_checks = True  # 根据需要设定在线检查是否启用
 
-         app_apple_id = None if CONFIG_INFO == CONFIG_TEST else self.config.app_apple_id
 
-         return SignedDataVerifier(
 
-             root_certificates,
 
-             enable_online_checks,
 
-             self.config.environment,
 
-             self.bundle_id,
 
-             app_apple_id
 
-         )
 
-     def _load_certificate(self, cert_name: str) -> bytes:
 
-         """
 
-         加载指定的根证书文件并返回其内容。
 
-         :param cert_name: 根证书文件的名称。
 
-         :return: 返回根证书文件的内容,类型为 bytes。
 
-         """
 
-         cert_path = f'{BASE_DIR}/Ansjer/file/in_app_purchase/{cert_name}'
 
-         try:
 
-             with open(cert_path, 'rb') as file:
 
-                 return file.read()
 
-         except FileNotFoundError:
 
-             raise ValueError(f"Certificate file not found: {cert_path}")
 
-     def _initialize_receipt_util(self):
 
-         receipt_util = ReceiptUtility()
 
-         return receipt_util
 
-     def check_subscriptions(self, uid: str, subscription_group_id: str) -> bool:
 
-         try:
 
-             has_order = self._get_last_order(uid)
 
-             if not has_order:
 
-                 return False
 
-             subscription_status = self._get_subscription_status(has_order, subscription_group_id)
 
-             return subscription_status
 
-         except Exception as e:
 
-             LOGGER.error(f"Error checking subscriptions: {str(e)}")
 
-             return False
 
-     def _get_last_order(self, uid: str) -> dict:
 
-         return Order_Model.objects.filter(UID=uid, payType=5).exclude(
 
-             original_transaction_id=""
 
-         ).values(
 
-             'transaction_id', 'original_transaction_id', 'orderID', 'addTime'
 
-         ).order_by('-addTime').first()
 
-     def _get_subscription_status(self, order: dict, subscription_group_id: str) -> bool:
 
-         device_apple_package = DeviceApplePackage.objects.filter(
 
-             original_transaction_id=order['original_transaction_id']
 
-         ).values("subscription_status").first()
 
-         if not device_apple_package or device_apple_package['subscription_status'] != 1:
 
-             return False
 
-         subscription_statuses = self.client.get_all_subscription_statuses(order['transaction_id'])
 
-         if not subscription_statuses.data:
 
-             return False
 
-         for subscription_status in subscription_statuses.data:
 
-             if str(subscription_status.subscriptionGroupIdentifier) == subscription_group_id:
 
-                 if any(str(item.status) == "Status.ACTIVE" for item in subscription_status.lastTransactions):
 
-                     return True
 
-         return False
 
 
  |