# @Author    : Rocky
# @File      : InAppPurchaseController.py
# @Time      : 2024/6/21 9:10
import logging
import time
import json

from appstoreserverlibrary.api_client import AppStoreServerAPIClient, GetTransactionHistoryVersion
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.receipt_utility import ReceiptUtility
from appstoreserverlibrary.models.HistoryResponse import HistoryResponse
from appstoreserverlibrary.models.TransactionHistoryRequest import TransactionHistoryRequest, ProductType, Order
from appstoreserverlibrary.signed_data_verifier import SignedDataVerifier
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from django.db.models import Q
from django.views import View
from django.http import HttpResponse

from Ansjer.config import LOGGER, CONFIG_INFO, CONFIG_TEST, PAY_TYPE_IN_APP_PURCHASE, BASE_DIR
from Controller.CheckUserData import DataValid
from Model.models import Order_Model, Store_Meal, Device_Info, UID_Bucket, Unused_Uid_Meal, AiService, Device_User, \
    SysMsgModel
from Object.AWS.S3Email import S3Email
from Object.AliSmsObject import AliSmsObject
from Object.RedisObject import RedisObject
from Service.CommonService import CommonService

# Sandbox is used only when the project is configured as the test deployment.
ENV = Environment.SANDBOX if CONFIG_INFO == CONFIG_TEST else Environment.PRODUCTION

# App bundle identifier shared by the API client and the signed-data verifier.
_BUNDLE_ID = 'com.ansjer.zccloud'

# Apple root certificates shipped with the project, used to verify signed payloads.
_ROOT_CERTIFICATE_NAMES = (
    'AppleIncRootCertificate.cer',
    'AppleComputerRootCertificate.cer',
    'AppleRootCA-G2.cer',
    'AppleRootCA-G3.cer',
)


def _build_signed_data_verifier():
    """Load the bundled Apple root certificates and build a SignedDataVerifier.

    @return: SignedDataVerifier configured for ENV and the app bundle id
    """
    root_certificates = []
    for cert_name in _ROOT_CERTIFICATE_NAMES:
        cert_path = '{}/Ansjer/file/in_app_purchase/{}'.format(BASE_DIR, cert_name)
        with open(cert_path, 'rb') as file:
            root_certificates.append(file.read())
    enable_online_checks = True
    # NOTE(review): the original comment marks app_apple_id as required in
    # production, yet None is passed for every environment. Confirm against
    # the library documentation and supply the real App Apple ID.
    app_apple_id = None
    return SignedDataVerifier(
        root_certificates, enable_online_checks, ENV, _BUNDLE_ID, app_apple_id)


def _get_device_display_name(uid):
    """Return the name used for a device in user-facing messages.

    The full serial number is used when the device row has a serial number;
    otherwise (including when no device row is found) the UID itself is used.

    @param uid: device UID
    @return: display name string
    """
    device_info_qs = Device_Info.objects.filter(UID=uid).values('serial_number', 'Type')
    if device_info_qs.exists():
        device_info = device_info_qs[0]
        serial_number = device_info['serial_number']
        if serial_number:
            return CommonService.get_full_serial_number(uid, serial_number, device_info['Type'])
    return uid


def _payment_result_response(response, lang, status):
    """Build the JSON response carrying the payment status page URL.

    @param response: project response object
    @param lang: language code used to pick the status page
    @param status: 'success' or 'fail'
    @return: response
    """
    pay_result_url = CommonService.get_payment_status_url(lang, status)
    return response.json(0, {'url': pay_result_url})


class InAppPurchaseView(View):
    """Django view for Apple in-app purchase operations.

    Dispatches on the ``operation`` URL keyword argument:
    ``AppStoreServerNotifications`` (no user token required) and
    ``verifyTransaction`` (requires a valid user token).
    """

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching with the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching with the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route the request to the handler for ``operation``.

        @param request_dict: request parameters (GET or POST QueryDict)
        @param request: the Django request object
        @param operation: operation name taken from the URL
        @return: response
        """
        if operation == 'AppStoreServerNotifications':  # App Store server notification
            # The handler reads request.method / request.body, so it needs the
            # request object itself rather than the parameter dict.
            return self.app_store_server_notifications(request)
        token_code, user_id, response = CommonService.verify_token_get_user_id(request_dict, request)
        if token_code != 0:
            return response.json(token_code)
        if operation == 'verifyTransaction':  # verify a transaction
            return self.verify_transaction(user_id, request_dict, response)
        # Unknown operation: a view must not return None. Reuse the 444 code
        # this file already returns for invalid request parameters.
        return response.json(444)

    @classmethod
    def verify_transaction(cls, user_id, request_dict, response):
        """
        Verify an App Store transaction and activate the purchased package.

        Extracts the transaction id from the app receipt, fetches and verifies
        the signed transaction from Apple, looks up the matching Store_Meal by
        product id, then creates/updates the UID_Bucket (and AiService for AI
        packages), records the order and notifies the user.

        @param user_id: user id
        @param request_dict: request parameters
        @request_dict receipt: app receipt
        @request_dict uid: device UID
        @request_dict lang: language code, defaults to 'en'
        @request_dict channel: device channel
        @param response: response object
        @return: response (code 0 with a success/fail status page URL, or an
                 error code when validation fails)
        """
        receipt = request_dict.get('receipt', None)
        uid = request_dict.get('uid', None)
        lang = request_dict.get('lang', 'en')
        channel = request_dict.get('channel', None)
        if not all([receipt, uid, channel]):
            return response.json(444)

        # Redis marker intended to prevent duplicate orders; expires after 60s.
        # NOTE: the marker is set but not enforced - the branch rejecting a
        # request when the key already exists was disabled in the original code.
        redis_obj = RedisObject()
        redis_key = uid + 'in_app_purchase'
        redis_obj.CONN.setnx(redis_key, 1)
        redis_obj.CONN.expire(redis_key, 60)
        try:
            # The device must exist for this user and not be a shared device.
            device_info_qs = Device_Info.objects.filter(userID_id=user_id, UID=uid, isShare=False, isExist=1).values(
                'vodPrimaryUserID', 'vodPrimaryMaster')
            if not device_info_qs.exists():
                return response.json(12)

            # If the device already has a primary cloud-storage user, it must be this user.
            device_info_qs = Device_Info.objects.filter(Q(UID=uid), ~Q(vodPrimaryUserID='')).values('vodPrimaryUserID')
            if device_info_qs.exists():
                if device_info_qs[0]['vodPrimaryUserID'] != user_id:
                    return response.json(10033)

            # Resolve the product id from the transaction information.
            key_path = '{}/Ansjer/file/in_app_purchase/SubscriptionKey_N42WMFCV6A.p8'.format(BASE_DIR)
            with open(key_path, 'rb') as file:
                private_key = file.read()
            key_id = 'N42WMFCV6A'
            issuer_id = '69a6de8c-789b-47e3-e053-5b8c7c11a4d1'
            client = AppStoreServerAPIClient(private_key, key_id, issuer_id, _BUNDLE_ID, ENV)

            receipt_util = ReceiptUtility()
            transaction_id = receipt_util.extract_transaction_id_from_app_receipt(receipt)
            if transaction_id is None:
                return _payment_result_response(response, lang, 'fail')
            # NOTE(review): nothing here records transaction_id, so the same
            # receipt appears to be redeemable more than once - confirm whether
            # replay protection exists elsewhere.
            transaction_info = client.get_transaction_info(transaction_id)
            signed_transaction_info = transaction_info.signedTransactionInfo

            signed_data_verifier = _build_signed_data_verifier()
            payload = signed_data_verifier.verify_and_decode_signed_transaction(signed_transaction_info)
            product_id = None
            if payload and payload.productId:
                product_id = payload.productId
            if not product_id:
                return _payment_result_response(response, lang, 'fail')

            pay_type = PAY_TYPE_IN_APP_PURCHASE
            now_time = int(time.time())
            store_qs = Store_Meal.objects.filter(
                product_id=product_id, lang__lang=lang, is_show=0). \
                values(
                'id', 'currency', 'price', 'lang__content', 'day', 'commodity_type', 'lang__title',
                'expire', 'commodity_code', 'discount_price', 'bucket_id', 'bucket__mold',
                'cycle_config_id', 'is_ai')
            if not store_qs.exists():
                return response.json(173)

            # Fetch the row once instead of re-evaluating the queryset per field.
            store_meal = store_qs[0]
            order_id = CommonService.createOrderID()
            rank_id = store_meal['id']
            bucket_id = store_meal['bucket_id']
            currency = store_meal['currency']
            price = store_meal['price']
            is_ai = store_meal['is_ai']
            expire = store_meal['expire']
            end_time = CommonService.calcMonthLater(expire)
            content = store_meal['lang__content']
            commodity_code = store_meal['commodity_code']
            commodity_type = store_meal['commodity_type']
            order_type = 1 if is_ai else 0

            # The stored package name always uses the 'cn' title/content.
            store_meal_qs = Store_Meal.objects.filter(id=rank_id, lang__lang='cn', is_show=0). \
                values('lang__title', 'lang__content')
            if store_meal_qs.exists():
                store_meal_name = store_meal_qs[0]['lang__title'] + '-' + store_meal_qs[0]['lang__content']
            else:
                store_meal_name = '未知套餐'

            # Check whether the device already has a cloud-storage bucket.
            use_flag = True
            uid_bucket_qs = UID_Bucket.objects.filter(uid=uid). \
                values('id', 'bucket_id', 'bucket__region', 'endTime', 'use_status')
            if uid_bucket_qs.exists():
                uid_bucket = uid_bucket_qs.first()
                uid_bucket_id = uid_bucket['id']
                if uid_bucket['use_status'] == 1 and uid_bucket['endTime'] > now_time:
                    # A package is still active: queue the new one as unused.
                    Unused_Uid_Meal.objects.create(
                        uid=uid, channel=channel, addTime=now_time, order_id=order_id, expire=expire,
                        is_ai=is_ai, bucket_id=bucket_id)
                    UID_Bucket.objects.filter(id=uid_bucket_id).update(has_unused=1)
                    use_flag = False
                else:
                    # No active package: switch the bucket to the new package now.
                    UID_Bucket.objects.filter(id=uid_bucket_id).update(
                        channel=channel, bucket_id=bucket_id, endTime=end_time, updateTime=now_time,
                        use_status=1, orderId=order_id)
            else:
                uid_bucket = UID_Bucket.objects.create(
                    uid=uid, channel=channel, bucket_id=bucket_id, endTime=end_time, use_status=1,
                    orderId=order_id, addTime=now_time, updateTime=now_time)
                uid_bucket_id = uid_bucket.id

            # Activate the AI service when the package takes effect immediately.
            if is_ai and use_flag:
                ai_service = AiService.objects.filter(uid=uid, channel=channel)
                if ai_service.exists():
                    ai_service.update(updTime=now_time, use_status=1, orders_id=order_id, endTime=end_time)
                else:
                    AiService.objects.create(
                        uid=uid, channel=channel, detect_status=1, use_status=1, orders_id=order_id,
                        addTime=now_time, updTime=now_time, endTime=end_time)

            Order_Model.objects.create(
                orderID=order_id, UID=uid, channel=channel, userID_id=user_id, desc=content,
                payType=pay_type, payTime=now_time, price=price, currency=currency, addTime=now_time,
                updTime=now_time, order_type=order_type, commodity_code=commodity_code,
                commodity_type=commodity_type, rank_id=rank_id, ai_rank_id=1, status=1, create_vod=1,
                store_meal_name=store_meal_name, uid_bucket_id=uid_bucket_id)

            # Send the cloud-storage activation notice; prefer the serial number as device name.
            date_time = time.strftime("%Y-%m-%d", time.localtime())
            device_name = _get_device_display_name(uid)
            sys_msg_text_list = [
                '温馨提示:尊敬的客户,您的{}设备在{}已成功购买云存套餐'.format(device_name, date_time),
                # The trailing placeholder carries the purchase date.
                'Dear customer,you already subscribed the cloud storage package successfully for device {} on {}'.
                format(device_name, time.strftime('%b %dth,%Y', time.localtime()))]
            cls.do_vod_msg_notice(uid, user_id, lang, sys_msg_text_list)

            return _payment_result_response(response, lang, 'success')
        except Exception as e:
            LOGGER.info('苹果内购认证交易接口异常:{}'.
                        format('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))))
            return _payment_result_response(response, lang, 'fail')
        finally:
            # Release the Redis marker on every exit path, including early returns.
            redis_obj.del_data(redis_key)

    @classmethod
    def do_vod_msg_notice(cls, uid, user_id, lang, sys_msg_text_list):
        """
        Send the cloud-storage activation notice to the user.

        Always stores a SysMsgModel record; then, depending on whether the
        username validates as an email or a mobile number, sends an email or
        an SMS.

        @param uid: device UID
        @param user_id: user id
        @param lang: language code; 'cn' selects the first text, anything else the second
        @param sys_msg_text_list: [chinese_text, english_text]
        @return: None
        """
        sys_msg_text = sys_msg_text_list[0] if lang == 'cn' else sys_msg_text_list[1]
        now_time = int(time.time())
        create_data = {
            'userID_id': user_id,
            'msg': sys_msg_text,
            'addTime': now_time,
            'updTime': now_time,
            'uid': uid,
            'eventType': 0
        }
        SysMsgModel.objects.create(**create_data)

        # This user does not receive email/SMS notices.
        if user_id == '167015836969813800138000':
            return

        user_qs = Device_User.objects.filter(userID=user_id)
        if not user_qs.exists():
            return
        username = user_qs.first().username
        data_valid = DataValid()
        if data_valid.email_validate(username):
            S3Email().faEmail(sys_msg_text, username)
        elif data_valid.mobile_validate(username):
            # Prefer the serial number as the device name in the SMS.
            device_name = _get_device_display_name(uid)
            params = '{"devname":"%s","submittime":"%s"}' % (
                device_name, time.strftime("%Y-%m-%d", time.localtime()))
            cls.send_message(username, params, 'SMS_219738485')

    @staticmethod
    def send_message(phone, params, temp_msg):
        """
        Send an SMS through AliSmsObject.

        @param phone: recipient phone number (the username)
        @param params: JSON string of template parameters
        @param temp_msg: SMS template code
        """
        sign_ms = '周视'
        ali_sms = AliSmsObject()
        ali_sms.send_code_sms_cloud(phone=phone, params=params, sign_name=sign_ms, temp_msg=temp_msg)

    @staticmethod
    def app_store_server_notifications(request):
        """
        Receive an App Store server notification.

        Reads ``signedPayload`` from the JSON request body, then verifies and
        decodes it. DID_RENEW handling is still a placeholder.

        @param request: the Django request object
        @return: HttpResponse - 200 on success, 400 when signedPayload is
                 missing, 405 for non-POST requests, 500 on any exception
        """
        logger = logging.getLogger('apple_pay')
        logger.info('App Store服务器通知参数:{}'.format(request.POST))
        if request.method != 'POST':
            # A view must return a response; reject anything other than POST.
            return HttpResponse(status=405)
        try:
            payload = json.loads(request.body.decode('utf-8'))
            signed_payload = payload.get('signedPayload')
            if not signed_payload:
                return HttpResponse(status=400)

            # Verify the signature and decode the payload.
            verifier = _build_signed_data_verifier()
            decoded_payload = verifier.verify_and_decode_notification(signed_payload)
            LOGGER.info(f"打印decoded_payload{decoded_payload}")
            if decoded_payload.notificationType == "DID_RENEW":
                # TODO: handle subscription renewal
                pass
            return HttpResponse(status=200)
        except Exception as e:
            LOGGER.info('App Store服务器通知异常:{}'.
                        format('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))))
            return HttpResponse(status=500)