# @Author : linhaohong
# @File : AppleInAppPurchaseSubscriptionObject.py
# @Time : 2024/9/4 11:58
from typing import Optional

from appstoreserverlibrary.api_client import AppStoreServerAPIClient
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.models.Status import Status
from appstoreserverlibrary.receipt_utility import ReceiptUtility
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier

from Ansjer.config import LOGGER, CONFIG_INFO, CONFIG_TEST, BASE_DIR, IN_APP_CONFIG
from Model.models import Order_Model, DeviceApplePackage

# Default App Store environment: sandbox for the test configuration,
# production for every other configuration.
ENV = Environment.SANDBOX if CONFIG_INFO == CONFIG_TEST else Environment.PRODUCTION

# Requests made for this user ID always use the sandbox environment,
# whatever the deployment configuration is.
# NOTE(review): presumably an internal test / App Review account -- confirm.
_FORCED_SANDBOX_USER_ID = "167083887153913800138000"


class InAppConfig:
    """In-app purchase settings for one application bundle."""

    def __init__(self, bundle_id: str, user_id: str = ""):
        """
        Load the configuration registered for ``bundle_id`` in IN_APP_CONFIG.

        :param bundle_id: Bundle ID of the app, used as the lookup key.
        :param user_id: ID of the user the request is made for; one specific
            ID forces the sandbox environment.
        :raises ValueError: If no (non-empty) configuration exists for
            ``bundle_id``.
        :raises KeyError: If the configuration lacks one of the required keys
            ``key_id``, ``issuer_id``, ``key_filename`` or ``cert_names``.
        """
        config = IN_APP_CONFIG.get(bundle_id)
        if not config:
            raise ValueError(f"没有找到与 bundle_id '{bundle_id}' 对应的配置")

        self.key_id = config['key_id']
        self.issuer_id = config['issuer_id']
        self.key_filename = config['key_filename']
        self.cert_names = config['cert_names']
        # Optional entry: stays None when the configuration does not define it.
        self.app_apple_id = config.get("app_apple_id")

        if user_id == _FORCED_SANDBOX_USER_ID:
            self.environment = Environment.SANDBOX
        else:
            self.environment = ENV


class InAppPurchase:
    """Bundles the App Store Server API client, verifier and receipt utility."""

    def __init__(self, bundle_id: str = "com.ansjer.zccloud", user_id: str = ""):
        """
        Load the bundle configuration and private key, then build the API
        client, the signed-data verifier and the receipt utility.

        :param bundle_id: Bundle ID of the app whose configuration is used.
        :param user_id: Forwarded to InAppConfig to select the environment.
        :raises ValueError: If the bundle has no configuration, or if the
            private key or a root certificate file cannot be found.
        """
        app_config = InAppConfig(bundle_id, user_id)
        self.bundle_id = bundle_id
        self.config = app_config
        self.private_key = self._load_private_key(self.config.key_filename)
        self.client = self._initialize_client()
        self.verifier = self._initialize_verifier()
        self.receipt_util = self._initialize_receipt_util()

    def _load_private_key(self, key_filename: str) -> bytes:
        """
        Read the private key file and return its raw content.

        :param key_filename: File name of the private key inside the
            in_app_purchase directory.
        :return: Content of the private key file.
        :raises ValueError: If the file does not exist.
        """
        key_path = f'{BASE_DIR}/Ansjer/file/in_app_purchase/{key_filename}'
        try:
            with open(key_path, 'rb') as file:
                return file.read()
        except FileNotFoundError as err:
            # Chain the original error so the traceback keeps the OS details.
            raise ValueError(f"Private key file not found: {key_path}") from err

    def _initialize_client(self) -> AppStoreServerAPIClient:
        """
        Build the AppStoreServerAPIClient used to talk to the App Store.

        :return: Client created from the private key, key ID, issuer ID,
            bundle ID and the configured environment.
        """
        return AppStoreServerAPIClient(
            self.private_key,
            self.config.key_id,
            self.config.issuer_id,
            self.bundle_id,
            self.config.environment
        )

    def _initialize_verifier(self) -> SignedDataVerifier:
        """
        Build the SignedDataVerifier used to verify signed in-app purchase data.

        :return: Verifier created from the configured root certificates,
            environment and bundle ID.
        """
        root_certificates = [
            self._load_certificate(cert_name) for cert_name in self.config.cert_names
        ]
        enable_online_checks = True  # Toggle online checks here if required.
        # The app Apple ID is only passed outside the test configuration.
        app_apple_id = None if CONFIG_INFO == CONFIG_TEST else self.config.app_apple_id
        return SignedDataVerifier(
            root_certificates,
            enable_online_checks,
            self.config.environment,
            self.bundle_id,
            app_apple_id
        )

    def _load_certificate(self, cert_name: str) -> bytes:
        """
        Read one root certificate file and return its raw content.

        :param cert_name: File name of the certificate inside the
            in_app_purchase directory.
        :return: Content of the certificate file.
        :raises ValueError: If the file does not exist.
        """
        cert_path = f'{BASE_DIR}/Ansjer/file/in_app_purchase/{cert_name}'
        try:
            with open(cert_path, 'rb') as file:
                return file.read()
        except FileNotFoundError as err:
            # Chain the original error so the traceback keeps the OS details.
            raise ValueError(f"Certificate file not found: {cert_path}") from err

    def _initialize_receipt_util(self):
        """Create the ReceiptUtility instance stored on ``self.receipt_util``."""
        receipt_util = ReceiptUtility()
        return receipt_util

    def check_subscriptions(self, uid: str, subscription_group_id: str) -> bool:
        """
        Report whether ``uid`` has an active subscription in the given group.

        Any exception raised during the check is logged and reported as
        False, so this method never raises.

        :param uid: UID whose most recent in-app purchase order is inspected.
        :param subscription_group_id: Subscription group identifier to match.
        :return: True only if an active subscription was found.
        """
        try:
            has_order = self._get_last_order(uid)
            if not has_order:
                return False
            subscription_status = self._get_subscription_status(has_order, subscription_group_id)
            return subscription_status
        except Exception as e:
            # Deliberate best-effort boundary: a failed check means "not subscribed".
            LOGGER.error(f"Error checking subscriptions: {str(e)}")
            return False

    def _get_last_order(self, uid: str) -> Optional[dict]:
        """
        Fetch the newest order of ``uid`` with payType 5 and a non-empty
        original transaction ID.

        :param uid: UID to filter orders by.
        :return: Dict with ``transaction_id``, ``original_transaction_id``,
            ``orderID`` and ``addTime``, or None when no order matches.
        """
        return Order_Model.objects.filter(UID=uid, payType=5).exclude(
            original_transaction_id=""
        ).values(
            'transaction_id', 'original_transaction_id', 'orderID', 'addTime'
        ).order_by('-addTime').first()

    def _get_subscription_status(self, order: dict, subscription_group_id: str) -> bool:
        """
        Check the local record first, then ask the App Store whether the
        subscription behind ``order`` is active in the given group.

        :param order: Order dict providing ``original_transaction_id`` and
            ``transaction_id``.
        :param subscription_group_id: Subscription group identifier to match.
        :return: True if the local record has subscription_status 1 and the
            App Store reports an ACTIVE transaction in the matching group.
        """
        device_apple_package = DeviceApplePackage.objects.filter(
            original_transaction_id=order['original_transaction_id']
        ).values("subscription_status").first()
        # Skip the remote call unless the local record has status 1.
        if not device_apple_package or device_apple_package['subscription_status'] != 1:
            return False

        subscription_statuses = self.client.get_all_subscription_statuses(order['transaction_id'])
        if not subscription_statuses.data:
            return False

        for subscription_status in subscription_statuses.data:
            if str(subscription_status.subscriptionGroupIdentifier) != subscription_group_id:
                continue
            # Compare against the enum member, not its str() form: str() of an
            # IntEnum member is "1" on Python 3.11+, not "Status.ACTIVE".
            # lastTransactions may be None, so fall back to an empty list.
            if any(item.status == Status.ACTIVE
                   for item in (subscription_status.lastTransactions or [])):
                return True
        return False