# @Author    : linhaohong
# @File      : AppleInAppPurchaseSubscriptionObject.py
# @Time      : 2024/9/4 11:58
from typing import Optional

from appstoreserverlibrary.receipt_utility import ReceiptUtility
from Ansjer.config import LOGGER, CONFIG_INFO, CONFIG_TEST, BASE_DIR, IN_APP_CONFIG
from appstoreserverlibrary.api_client import AppStoreServerAPIClient
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier

from Model.models import Order_Model, DeviceApplePackage

# Sandbox when the project runs with the test configuration, production otherwise.
ENV = Environment.SANDBOX if CONFIG_INFO == CONFIG_TEST else Environment.PRODUCTION


class InAppConfig:
    def __init__(self, bundle_id: str):
        """
        Load the in-app purchase settings registered for ``bundle_id``.

        :param bundle_id: App bundle ID used as the key into ``IN_APP_CONFIG``.
        :raises ValueError: If ``IN_APP_CONFIG`` has no (or an empty) entry for ``bundle_id``.
        :raises KeyError: If the entry lacks ``key_id``, ``issuer_id``, ``key_filename``
            or ``cert_names``.
        """
        config = IN_APP_CONFIG.get(bundle_id)
        if not config:
            raise ValueError(f"没有找到与 bundle_id '{bundle_id}' 对应的配置")

        self.key_id = config['key_id']
        self.issuer_id = config['issuer_id']
        self.key_filename = config['key_filename']
        self.cert_names = config['cert_names']
        # Optional entry: stays None when the configuration does not define it.
        self.app_apple_id = config.get("app_apple_id")
        self.environment = ENV


class InAppPurchase:
    def __init__(self, bundle_id="com.ansjer.zccloud"):
        """
        Load the configuration and private key for ``bundle_id`` and build the
        API client, signed-data verifier and receipt utility.

        :param bundle_id: App bundle ID whose configuration should be used.
        :raises ValueError: If no configuration exists for ``bundle_id`` or a
            key/certificate file is missing.
        """
        app_config = InAppConfig(bundle_id)
        self.bundle_id = bundle_id
        self.config = app_config
        self.private_key = self._load_private_key(self.config.key_filename)
        self.client = self._initialize_client()
        self.verifier = self._initialize_verifier()
        self.receipt_util = self._initialize_receipt_util()

    @staticmethod
    def _read_in_app_file(filename: str, label: str) -> bytes:
        """
        Read a file from the in-app purchase resource directory.

        :param filename: File name inside ``{BASE_DIR}/Ansjer/file/in_app_purchase``.
        :param label: Human-readable file kind, used as the error message prefix.
        :return: Raw file content.
        :raises ValueError: If the file does not exist (chained to the original error).
        """
        path = f'{BASE_DIR}/Ansjer/file/in_app_purchase/{filename}'
        try:
            with open(path, 'rb') as file:
                return file.read()
        except FileNotFoundError as err:
            raise ValueError(f"{label} file not found: {path}") from err

    def _load_private_key(self, key_filename: str) -> bytes:
        """
        Load the private key file and return its content.

        :param key_filename: Name of the private key file.
        :return: Private key file content as bytes.
        :raises ValueError: If the file does not exist.
        """
        return self._read_in_app_file(key_filename, "Private key")

    def _initialize_client(self) -> AppStoreServerAPIClient:
        """
        Build the AppStoreServerAPIClient from the loaded key and configuration.

        :return: The initialised AppStoreServerAPIClient instance.
        """
        return AppStoreServerAPIClient(
            self.private_key,
            self.config.key_id,
            self.config.issuer_id,
            self.bundle_id,
            self.config.environment
        )

    def _initialize_verifier(self) -> SignedDataVerifier:
        """
        Build the SignedDataVerifier from the configured root certificates.

        :return: The initialised SignedDataVerifier instance.
        :raises ValueError: If a configured certificate file does not exist.
        """
        root_certificates = [
            self._load_certificate(cert_name)
            for cert_name in self.config.cert_names
        ]
        enable_online_checks = True  # Toggle online checks as needed.
        # No app Apple ID is passed when running with the test configuration.
        app_apple_id = None if CONFIG_INFO == CONFIG_TEST else self.config.app_apple_id

        return SignedDataVerifier(
            root_certificates,
            enable_online_checks,
            self.config.environment,
            self.bundle_id,
            app_apple_id
        )

    def _load_certificate(self, cert_name: str) -> bytes:
        """
        Load a root certificate file and return its content.

        :param cert_name: Name of the root certificate file.
        :return: Root certificate file content as bytes.
        :raises ValueError: If the file does not exist.
        """
        return self._read_in_app_file(cert_name, "Certificate")

    def _initialize_receipt_util(self):
        """Create the ReceiptUtility instance stored on ``self.receipt_util``."""
        receipt_util = ReceiptUtility()
        return receipt_util

    def check_subscriptions(self, uid: str, subscription_group_id: str) -> bool:
        """
        Tell whether the latest in-app order of ``uid`` has an active subscription
        in ``subscription_group_id``.

        Best-effort by design: any exception is logged and reported as ``False``.

        :param uid: Value matched against ``Order_Model.UID``.
        :param subscription_group_id: Subscription group identifier to look for.
        :return: True only when an active subscription is found.
        """
        try:
            has_order = self._get_last_order(uid)
            if not has_order:
                return False

            subscription_status = self._get_subscription_status(has_order, subscription_group_id)
            return subscription_status
        except Exception as e:
            LOGGER.error(f"Error checking subscriptions: {str(e)}")
            return False

    def _get_last_order(self, uid: str) -> Optional[dict]:
        """
        Fetch the newest order (by ``addTime``) of ``uid`` with ``payType=5`` and a
        non-empty ``original_transaction_id``.

        :param uid: Value matched against ``Order_Model.UID``.
        :return: Dict with ``transaction_id``, ``original_transaction_id``,
            ``orderID`` and ``addTime``, or None when no such order exists.
        """
        return Order_Model.objects.filter(UID=uid, payType=5).exclude(
            original_transaction_id=""
        ).values(
            'transaction_id', 'original_transaction_id', 'orderID', 'addTime'
        ).order_by('-addTime').first()

    @staticmethod
    def _is_active_status(status) -> bool:
        """
        Tell whether a transaction status is the ``ACTIVE`` enum member.

        The member name is compared because ``str()`` of an int-backed enum is not
        stable across Python versions (3.11+ yields the plain number); the legacy
        ``"Status.ACTIVE"`` string form is still accepted.
        """
        return getattr(status, "name", None) == "ACTIVE" or str(status) == "Status.ACTIVE"

    def _get_subscription_status(self, order: dict, subscription_group_id: str) -> bool:
        """
        Check the local record and the App Store client for an active subscription.

        :param order: Order dict providing ``original_transaction_id`` and ``transaction_id``.
        :param subscription_group_id: Subscription group identifier to look for.
        :return: True when the local package has ``subscription_status == 1`` and the
            client reports an active last transaction in the requested group.
        """
        device_apple_package = DeviceApplePackage.objects.filter(
            original_transaction_id=order['original_transaction_id']
        ).values("subscription_status").first()

        # Skip the remote call unless the local record has subscription_status == 1.
        if not device_apple_package or device_apple_package['subscription_status'] != 1:
            return False

        subscription_statuses = self.client.get_all_subscription_statuses(order['transaction_id'])
        if not subscription_statuses.data:
            return False

        for subscription_status in subscription_statuses.data:
            if str(subscription_status.subscriptionGroupIdentifier) != subscription_group_id:
                continue
            # lastTransactions may be absent; treat that as "no transactions".
            last_transactions = subscription_status.lastTransactions or []
            if any(self._is_active_status(item.status) for item in last_transactions):
                return True

        return False