# -*- encoding: utf-8 -*- """ @File : AgentOrderController.py @Time : 2024/3/14 10:53 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import threading import time from datetime import datetime, timedelta from decimal import Decimal, ROUND_DOWN from django.http import QueryDict from django.views import View from AgentModel.models import AgentDevice, AgentCloudServicePackage, AgentCustomerPackage, AgentDeviceOrder, \ AgentDeviceOrderInstallment, AgentAccount from Ansjer.config import LOGGER from Model.models import Order_Model, Store_Meal, UnicomCombo from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Service.CommonService import CommonService class AgentOrderView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def delete(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') delete = QueryDict(request.body) if not delete: delete = request.GET return self.validation(delete, request, operation) def put(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') put = QueryDict(request.body) return self.validation(put, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation == 'settlementOrder': # 季度结算 asy = threading.Thread(target=AgentOrderView.update_periodic_settlement,args=()) asy.start() return response.json(0) tko = TokenObject( request.META.get('HTTP_AUTHORIZATION'), returntpye='pc') if operation == 'addOrder': # 添加代理商订单 order_id = request_dict.get('order_id', None) uid = request_dict.get('uid', None) order_type = request_dict.get('order_type', None) package_id = request_dict.get('package_id', None) self.check_agent_service_package(order_id, uid, int(package_id)) return response.json(0) else: return response.json(414) @classmethod def check_agent_service_package(cls, order_id, uid, package_id): """ 检查是否代理服务套餐 @param package_id: 套餐id @param order_id: 订单ID @param uid: UID @return: True | False """ try: serial_number = CommonService.get_serial_number_by_uid(uid) a_device_qs = AgentDevice.objects.filter(serial_number=serial_number) \ .values('ac_id', 'type', 'status') LOGGER.info(f'检查当前订单是否绑定代理*****orderID:{order_id},serialNumber:{serial_number}') if not a_device_qs.exists(): return False LOGGER.info(f'当前设备属于代理商orderID:{order_id},serialNumber:{serial_number}') asy = threading.Thread(target=cls.save_agent_package, args=(order_id, serial_number, a_device_qs[0]['ac_id'], package_id)) asy.start() return True except Exception as e: LOGGER.error('*****支付成功保存云服务代理订单异常orderID:{},errLine:{}, errMsg:{}' .format(order_id, e.__traceback__.tb_lineno, repr(e))) return False @classmethod def save_agent_package(cls, order_id, serial_number, ac_id, package_id): """ 保存代理套餐 """ try: order_qs = Order_Model.objects.filter(orderID=order_id, status=1).values('price', 'payTime', 'order_type') if not order_qs.exists(): LOGGER.info( f'******save_agent_package当前代理客户未添加此套餐******ac_id:{ac_id},package_id:{package_id}') return order_type = order_qs[0]['order_type'] package_type = 2 if order_type in [2, 3, 5] else 1 # 判断订单信息是云存还是4G package_id = int(package_id) agent_package_qs = AgentCloudServicePackage.objects.filter(type=package_type, package_id=package_id, status=1) if not agent_package_qs.exists(): LOGGER.info( f'******save_agent_package当前套餐未设置代理******order_id:{order_id},serial_number:{serial_number}') return agent_package = agent_package_qs.first() # 代理云服务套餐 LOGGER.info(f'******save_agent_package代理套餐******service_name:{agent_package_qs.first().service_name}') acp_qs = AgentCustomerPackage.objects.filter(ac_id=ac_id, cs_id=agent_package.id).values('id') if not acp_qs.exists(): LOGGER.info( f'******save_agent_package当前代理客户未添加此套餐******ac_id:{ac_id},package_id:{package_id}') return # 组装数据 now_time = int(time.time()) pay_price = Decimal(order_qs[0]['price']).quantize(Decimal('0.00')) profit = cls.calculate_order_profit(agent_package, pay_price) dict_data = {'ac_id': ac_id, 'serial_number': serial_number, 'csp_id': agent_package.id, 'order_id': order_id, 'status': 1, 'profit_amount': pay_price, 'profit': profit, 'pay_time': order_qs[0]['payTime'], 'created_time': now_time, 'updated_time': now_time} agent_order_obj = AgentDeviceOrder.objects.create(**dict_data) # 保存分期结算记录 cls.save_order_installment(agent_order_obj.id, package_type, package_id, profit, ac_id, order_qs[0]['payTime']) LOGGER.info(f'******save_agent_package代理订单存表结束:{dict_data}') except Exception as e: LOGGER.info('*****AgentOrderView.save_agent_package:errLine:{}, errMsg:{}' .format(e.__traceback__.tb_lineno, repr(e))) @classmethod def calculate_order_profit(cls, agent_package, price): """ 计算利润 @param agent_package: 套餐配置 @param price: 支付价格 @return: 利润 """ profit = 0 price = Decimal(price).quantize(Decimal('0.00')) if agent_package.profit_type == 1: profit = agent_package.profit elif agent_package.profit_type == 2: profit_value = Decimal(agent_package.profit).quantize(Decimal('0.00')) cost = Decimal(agent_package.cost).quantize(Decimal('0.00')) profit = (price - cost) * (profit_value / 100) profit = profit.quantize(Decimal('0.00')) return profit @classmethod def get_quarterly_settlement_dates(cls, start_date, months): """ 获取季度结算日期列表,按照以下规则: 1. 固定在四个季度结算日期(1月1日、4月1日、7月1日、10月1日)进行结算 2. 从购买时间到结算日不满1个月的不在当前季度结算,累积到下一个季度 3. 中间季度每季度计算3个月 4. 最后一个季度计算剩余的时间 :param start_date: 套餐开始日期(datetime) :param months: 套餐总月数 :return: 包含(结算日期timestamp, 该季度使用月数)的元组列表 """ # 固定的季度结算日期 QUARTER_DATES = [(1, 1), (4, 1), (7, 1), (10, 1)] # (月, 日) # 计算套餐结束日期 end_date = start_date + timedelta(days=int(months * 30.5)) # 初始化结果列表 result = [] # 找到开始日期后的第一个季度结算日 current_year = start_date.year current_quarter_idx = 0 # 找到开始日期之后的第一个季度结算日 for i, (month, day) in enumerate(QUARTER_DATES): quarter_date = datetime(current_year, month, day) if quarter_date > start_date: current_quarter_idx = i break else: # 如果当年没有更多季度结算日,则移到下一年的第一个季度结算日 current_year += 1 current_quarter_idx = 0 # 第一个季度的结算日期 month, day = QUARTER_DATES[current_quarter_idx] first_settlement_date = datetime(current_year, month, day) # 计算第一个季度的整月数 days_in_first_quarter = (first_settlement_date - start_date).days whole_months_first_quarter = int(days_in_first_quarter / 30.5) # 计算剩余的月数(总月数减去第一个季度的整月数,如果第一个季度有整月数) remaining_months = months # 如果第一个季度有整月数,则添加第一个季度的结算记录并减去已结算的月数 if whole_months_first_quarter >= 1: result.append((int(first_settlement_date.timestamp()), whole_months_first_quarter)) remaining_months -= whole_months_first_quarter else: # 即使不足1个月,也添加第一个季度的结算记录,但月数为0 # 这样可以确保在7月1日进行第一次结算 result.append((int(first_settlement_date.timestamp()), 0)) # 如果没有剩余月数,直接返回结果 if remaining_months <= 0: return result # 特殊处理年套餐(12个月)的情况 if months == 12 and whole_months_first_quarter == 0: # 确保总共有5次季度结算,最后一次是剩余的月数 # 第一次结算已经添加(7月1日,整月数为0) # 添加第二次结算(10月1日,整月数为3) current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) result.append((int(settlement_date.timestamp()), 3)) # 添加第三次结算(1月1日,整月数为3) current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) result.append((int(settlement_date.timestamp()), 3)) # 添加第四次结算(4月1日,整月数为3) current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) result.append((int(settlement_date.timestamp()), 3)) # 添加第五次结算(7月1日,整月数为剩余的月数) current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) # 剩余的月数为12减去前面已经结算的月数 remaining = 12 - (0 + 3 + 3 + 3) result.append((int(settlement_date.timestamp()), remaining)) return result # 非年套餐的处理逻辑 # 计算完整季度的数量(每季度3个月) full_quarters = int(remaining_months / 3) # 计算最后一个季度的剩余月数 last_quarter_months = remaining_months % 3 # 当前日期设置为第一个结算日期 current_date = first_settlement_date # 添加完整季度的结算记录 for _ in range(full_quarters): # 移到下一个季度结算日 current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) # 添加完整季度的结算记录(3个月) result.append((int(settlement_date.timestamp()), 3)) # 更新当前日期 current_date = settlement_date # 如果有剩余月数,添加最后一个季度的结算记录 if last_quarter_months > 0: # 移到下一个季度结算日 current_quarter_idx = (current_quarter_idx + 1) % len(QUARTER_DATES) if current_quarter_idx == 0: current_year += 1 month, day = QUARTER_DATES[current_quarter_idx] settlement_date = datetime(current_year, month, day) # 添加最后一个季度的结算记录(剩余月数) result.append((int(settlement_date.timestamp()), last_quarter_months)) return result @staticmethod def calculate_months_in_period(start_date, end_date): """ 计算两个日期之间的整月数,不足一个月不计入 :param start_date: 开始日期 :param end_date: 结束日期 :return: 整月数 """ # 计算天数差 days_diff = (end_date - start_date).days # 转换为整月数(按平均每月30.5天计算) whole_months = int(days_diff / 30.5) return whole_months @staticmethod def calculate_quarterly_profit(profit, settlement_dates_with_months): """ 计算季度利润分配,基于整月数 :param profit: 总利润 :param settlement_dates_with_months: 包含(结算日期, 整月数)的元组列表 :return: 每个季度的利润列表 """ profit = Decimal(str(profit)).quantize(Decimal('0.01')) # 计算总整月数 total_months = sum(months for _, months in settlement_dates_with_months) # 如果总月数为0,返回空列表 if total_months == 0: return [] # 计算每月利润 monthly_profit = profit / Decimal(total_months) # 计算每个季度的利润 quarterly_amounts = [] for _, months in settlement_dates_with_months: # 计算当前季度的利润(基于整月数) amount = (monthly_profit * Decimal(months)).quantize(Decimal('0.01'), rounding=ROUND_DOWN) quarterly_amounts.append(amount) # 处理舍入误差,确保总和等于总利润 total_allocated = sum(quarterly_amounts) remainder = profit - total_allocated # 将剩余的分配到第一个季度 if remainder > Decimal('0') and quarterly_amounts: quarterly_amounts[0] += remainder return quarterly_amounts @classmethod def save_order_installment(cls, agent_order_id, package_type, package_id, profit, ac_id=None, pay_time=0): """ 保存代理订单分期信息(季度结算逻辑),不满一个月的时间累积到下一个季度 :param cls: 类方法的约定参数 :param agent_order_id: 代理订单ID :param package_type: 套餐类型(1:云存, 2:4G) :param package_id: 套餐ID :param profit: 利润总额 :param ac_id: 代理客户ID :param pay_time: 订单支付时间 :return: 无返回值 """ try: # 转换支付时间为datetime对象 pay_time_dt = datetime.fromtimestamp(pay_time) # 获取套餐月数 if package_type == 1: # 云存 store = Store_Meal.objects.filter(id=package_id).first() if not store: LOGGER.info(f'云存套餐不存在: {package_id}') return months = store.expire else: # 4G combo = UnicomCombo.objects.filter(id=package_id).first() if not combo: LOGGER.info(f'4G套餐不存在: {package_id}') return months = int(combo.expiration_days / 30) if months <= 0 or profit <= 0: LOGGER.info(f'无效参数: months={months}, profit={profit}') return LOGGER.info( f'开始计算季度结算: 订单ID={agent_order_id}, 开始日期={pay_time_dt}, 套餐月数={months}, 总利润={profit}') # 获取季度结算日期和每个季度的整月数 settlement_dates_with_months = cls.get_quarterly_settlement_dates(pay_time_dt, months) # 记录季度结算日期和月数 for i, (date, months_used) in enumerate(settlement_dates_with_months): date_str = datetime.fromtimestamp(date).strftime('%Y-%m-%d') LOGGER.info(f'季度{i + 1}结算日期: {date_str}, 整月数: {months_used}') # 如果没有有效的结算日期,则退出 if not settlement_dates_with_months: LOGGER.info(f'没有有效的季度结算日期: start_date={pay_time_dt}, months={months}') return # 计算每个季度的利润分配 amounts = cls.calculate_quarterly_profit(profit, settlement_dates_with_months) # 记录每个季度的利润分配 for i, amount in enumerate(amounts): LOGGER.info(f'季度{i + 1}利润分配: {amount}') # 创建分期记录 n_time = int(time.time()) installment_list = [] for i, ((settlement_date, months_used), amount) in enumerate(zip(settlement_dates_with_months, amounts)): # 只有使用满一个月才创建结算记录 if months_used >= 1: installment_list.append(AgentDeviceOrderInstallment( ado_id=agent_order_id, period_number=len(settlement_dates_with_months), ac_id=ac_id, amount=amount, due_date=settlement_date, status=1, created_time=n_time, updated_time=n_time )) # 批量创建 if installment_list: batch_size = 100 for i in range(0, len(installment_list), batch_size): AgentDeviceOrderInstallment.objects.bulk_create(installment_list[i:i + batch_size]) LOGGER.info(f'季度分期结算记录创建完成: {len(installment_list)}条, 订单ID: {agent_order_id}') else: LOGGER.info(f'没有创建季度分期结算记录: 订单ID: {agent_order_id}') except Exception as e: LOGGER.error(f'保存季度分期结算记录异常: 行号:{e.__traceback__.tb_lineno}, 错误:{repr(e)}') @classmethod def update_periodic_settlement(cls): """ 更新周期结算信息 - 优化版 功能: 1. 使用分布式锁确保同一时间只有一个进程在处理结算 2. 添加对账机制,确保数据准确性 3. 增加详细的日志记录 4. 使用Redis缓存避免重复处理 返回值: - 无返回值 """ # 初始化Redis对象 redis_obj = RedisObject() # 生成唯一的请求ID用于锁 request_id = f"settlement_task_{int(time.time())}" lock_key = "lock:periodic_settlement" # 尝试获取分布式锁 if not redis_obj.try_lock(lock_key, request_id, expire=10, time_unit_second=60): LOGGER.info("周期结算任务已在其他进程中运行,本次跳过") return LOGGER.info("开始执行周期结算任务") try: n_time = int(time.time()) # 记录开始处理的时间 start_time = time.time() LOGGER.info(f"开始查询到期的分期结算记录,当前时间戳: {n_time}") # 根据条件查询需要更新结算信息的订单分期记录 adoi_qs = AgentDeviceOrderInstallment.objects.filter(status=1, due_date__lte=n_time) if not adoi_qs.exists(): LOGGER.info("没有找到需要结算的记录") return # 记录找到的记录数 record_count = adoi_qs.count() LOGGER.info(f"找到 {record_count} 条需要结算的记录") # 使用事务处理结算过程 from django.db import transaction # 准备数据 settlement_records = [] # 用于记录处理的结算记录,后续对账使用 ids = [] a_account_list = [] adoi_set = set() total_amount = Decimal('0.00') for item in adoi_qs: # 准备分期记录的id列表和账户记录列表 ids.append(item.id) adoi_set.add(item.ado_id) # 记录结算金额 amount = Decimal(str(item.amount)).quantize(Decimal('0.01')) total_amount += amount # 创建账户记录对象 a_account_list.append(AgentAccount( ac_id=item.ac_id, amount=amount, remark=f'周期结算 - 分期ID:{item.id}', status=1, created_time=n_time, updated_time=n_time )) # 记录处理的结算记录 settlement_records.append({ 'id': item.id, 'ac_id': item.ac_id, 'amount': str(amount), 'due_date': item.due_date }) # 缓存结算记录用于对账 settlement_cache_key = f"settlement:records:{n_time}" redis_obj.set_data(settlement_cache_key, str(settlement_records), expire=86400) # 缓存24小时 LOGGER.info(f"准备处理 {len(ids)} 条分期记录,总金额: {total_amount}") # 使用事务确保数据一致性 with transaction.atomic(): batch_size = 100 # 分批更新分期记录状态 updated_count = 0 for i in range(0, len(ids), batch_size): batch_ids = ids[i:i + batch_size] update_result = AgentDeviceOrderInstallment.objects.filter(id__in=batch_ids) \ .update(status=2, settlement_time=n_time, updated_time=n_time) updated_count += update_result LOGGER.info(f"已更新分期记录状态: {updated_count}/{len(ids)}") # 对账检查 - 确保所有记录都已更新 if updated_count != len(ids): LOGGER.error(f"对账失败: 应更新 {len(ids)} 条记录,实际更新 {updated_count} 条") # 在事务中抛出异常将触发回滚 raise Exception(f"结算记录更新不一致: 应更新 {len(ids)} 条,实际更新 {updated_count} 条") # 分批创建账户记录 created_accounts = 0 for i in range(0, len(a_account_list), batch_size): batch_accounts = a_account_list[i:i + batch_size] created_batch = AgentAccount.objects.bulk_create(batch_accounts) created_accounts += len(created_batch) LOGGER.info(f"已创建账户记录: {created_accounts}/{len(a_account_list)}") # 对账检查 - 确保所有账户记录都已创建 if created_accounts != len(a_account_list): LOGGER.error(f"对账失败: 应创建 {len(a_account_list)} 条账户记录,实际创建 {created_accounts} 条") raise Exception( f"账户记录创建不一致: 应创建 {len(a_account_list)} 条,实际创建 {created_accounts} 条") # 检查是否所有分期都已结算,如果是,则更新订单状态为已结算 updated_orders = 0 for ado_id in adoi_set: # 检查是否还有未结算的分期 if not AgentDeviceOrderInstallment.objects.filter(ado_id=ado_id, status=1).exists(): # 更新订单状态为已结算 update_result = AgentDeviceOrder.objects.filter(id=ado_id, status=1) \ .update(status=2, settlement_time=n_time, updated_time=n_time) if update_result > 0: updated_orders += 1 LOGGER.info(f"订单 {ado_id} 所有分期已结算,已更新订单状态") LOGGER.info(f"共更新了 {updated_orders} 个订单的状态为已结算") # 记录结算摘要到Redis,用于后续查询 summary_key = f"settlement:summary:{n_time}" summary_data = { 'timestamp': n_time, 'record_count': record_count, 'total_amount': str(total_amount), 'updated_records': updated_count, 'created_accounts': created_accounts, 'updated_orders': updated_orders, 'duration': round(time.time() - start_time, 2) } redis_obj.set_hash_data(summary_key, summary_data) redis_obj.set_expire(summary_key, 86400 * 7) # 保存7天 # 记录完成信息 end_time = time.time() duration = round(end_time - start_time, 2) LOGGER.info(f"周期结算任务完成,处理时间: {duration}秒,处理记录: {record_count}条,总金额: {total_amount}") except Exception as e: # 记录详细的异常信息 LOGGER.error( f'周期结算任务异常: 行号:{e.__traceback__.tb_lineno}, 错误类型:{type(e).__name__}, 错误信息:{repr(e)}') finally: # 释放分布式锁 redis_obj.release_lock(lock_key, request_id) LOGGER.info("周期结算任务锁已释放")