# -*- encoding: utf-8 -*- """ @File : UIDBurnManageController.py @Time : 2025/7/30 08:57 @Author : stephen @Email : zhangdongming@asj6.wecom.work @Software: PyCharm """ import os from typing import Dict, Any from django.core import serializers from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger from django.db.models import Q from django.http import QueryDict from django.views import View from AgentModel.models import BurnRecord, BurnEncryptedICUID from Ansjer.config import LOGGER from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject class UIDBurnManageView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def delete(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') delete = QueryDict(request.body) if not delete: delete = request.GET return self.validation(delete, request, operation) def put(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') put = QueryDict(request.body) return self.validation(put, request, operation) def validation(self, request_dict, request, operation): """请求验证路由""" # 初始化响应对象 language = request_dict.get('language', 'cn') response = ResponseObject(language, 'pc') # Token验证 try: tko = TokenObject( request.META.get('HTTP_AUTHORIZATION'), returntpye='pc') if tko.code != 0: return response.json(tko.code) response.lang = tko.lang user_id = tko.userID except Exception as e: LOGGER.error(f"Token验证失败: {str(e)}") return response.json(444) if operation == 'getBurnRecordsPage': return self.get_burn_records_page(request_dict, response) elif operation == 'importBatchUids': return self.import_batch_uids(request, response) elif operation == 'addBurnRecord': return self.add_burn_record(request, response) elif operation == 'getBurnUidsPage': return self.get_burn_uids_page(request_dict, response) else: return response.json(414) @classmethod def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any: """ 分页查询烧录记录 :param cls: :param request_dict: 请求参数字典(包含分页参数和查询条件) :param response: 响应对象(用于返回JSON) :return: 分页查询结果的JSON响应 """ # 1. 分页参数处理与验证(严格类型转换+边界控制) try: page = int(request_dict.get('page', 1)) page_size = int(request_dict.get('pageSize', 10)) # 限制分页范围(避免页码为0或负数,页大小控制在1-100) page = max(page, 1) page_size = max(1, min(page_size, 100)) except (ValueError, TypeError): return response.json(444, "分页参数错误(必须为整数)") # 2. 构建查询条件(解决原代码中query可能未定义的问题) query = Q() # 初始化空条件(无条件查询) order_number = request_dict.get('orderNumber', '').strip() # 去除首尾空格,避免空字符串查询 if order_number: query &= Q(order_number__icontains=order_number) # 3. 获取查询集(延迟执行,不立即查询数据库) burn_qs = BurnRecord.objects.filter(query).order_by('-created_time') # 4. 分页处理(完善异常捕获) paginator = Paginator(burn_qs, page_size) try: page_obj = paginator.page(page) except PageNotAnInteger: # 若页码不是整数,返回第一页 page_obj = paginator.page(1) except EmptyPage: # 若页码超出范围,返回最后一页(或空列表,根据业务需求调整) page_obj = paginator.page(paginator.num_pages) burn_list = serializers.serialize( 'python', # 输出Python字典格式 page_obj, fields=['id', 'order_number', 'burn_count', 'purpose', 'created_time'] # 指定需要的字段 ) return response.json( 0, { 'list': burn_list, 'total': paginator.count, # 总记录数 'currentPage': page_obj.number, # 当前页码 'totalPages': paginator.num_pages, # 总页数 'pageSize': page_size # 返回实际使用的页大小(便于前端同步) } ) @classmethod def import_batch_uids(cls, request, response) -> Any: """ 导入批次UID :param request: HttpRequest对象(包含上传文件) :param response: 响应对象 :return: JSON响应 """ from openpyxl import load_workbook from datetime import datetime import time # 1. 验证文件上传 if 'file' not in request.FILES: return response.json(444, "请上传Excel文件") excel_file = request.FILES['file'] if not excel_file.name.endswith(('.xlsx', '.xls')): return response.json(444, "只支持Excel文件(.xlsx/.xls)") # 2. 生成批次号 batch_number = f"ENG{datetime.now().strftime('%Y%m%d')}" # 3. 读取Excel并处理UID try: wb = load_workbook(excel_file) ws = wb.active uids = [] for row in ws.iter_rows(min_row=1, values_only=True): if row[0]: # 第一列有值 uid = str(row[0]).strip() # 去空格 if uid: # 非空字符串 uids.append(uid) if not uids: return response.json(444, "Excel中没有有效的UID数据") # 4. 批量创建记录(使用事务保证原子性) from django.db import transaction current_time = int(time.time()) try: with transaction.atomic(): records = [ BurnEncryptedICUID( batch_number=batch_number, uid=uid, purpose='批次导入', created_time=current_time, updated_time=current_time, status=0 # 未烧录状态 ) for uid in set(uids) # 去重 ] BurnEncryptedICUID.objects.bulk_create(records) except Exception as e: LOGGER.error(f"导入批次UID失败: {str(e)}") return response.json(500, "导入失败,请检查数据格式") return response.json(0, {"batch_number": batch_number, "count": len(records)}) except Exception as e: LOGGER.error(f"处理Excel文件失败: {str(e)}") return response.json(500, "处理Excel文件失败") @classmethod def add_burn_record(cls, request, response) -> Any: """ 新增烧录记录(带UID文件) - Redis字符串优化版 :param request: HttpRequest对象(包含上传文件和表单数据) :param response: 响应对象 :return: JSON响应 """ import threading import time import os import json from uuid import uuid4 from django.conf import settings from Object.RedisObject import RedisObject # 1. 参数验证 request_dict = request.POST required_fields = ['order_number', 'burn_count', 'purpose'] for field in required_fields: if not request_dict.get(field): return response.json(444, f"缺少必填参数: {field}") # 2. 验证文件上传 if 'uid_file' not in request.FILES: return response.json(444, "请上传包含已烧录UID的Excel文件") excel_file = request.FILES['uid_file'] if not excel_file.name.endswith(('.xlsx', '.xls')): return response.json(444, "只支持Excel文件(.xlsx/.xls)") try: burn_count = int(request_dict['burn_count']) if burn_count <= 0: return response.json(444, "烧录数量必须大于0") # 3. 创建任务ID并初始化Redis状态 task_id = str(uuid4()) redis_key = f"burn_task:{task_id}" redis_obj = RedisObject() # 保存任务基本信息到Redis (过期时间2小时) task_data = { 'status': 'pending', 'order_number': request_dict['order_number'].strip(), 'burn_count': burn_count, 'purpose': request_dict['purpose'].strip(), 'progress': 0, 'processed': 0, 'total': 0, 'start_time': int(time.time()) } redis_obj.set_data(redis_key, json.dumps(task_data), 7200) # 4. 保存文件到项目static/uploadedfiles目录 base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) upload_dir = os.path.join(base_dir, 'static', 'uploaded_files') os.makedirs(upload_dir, exist_ok=True) file_path = os.path.join(upload_dir, f"{task_id}.xlsx") with open(file_path, 'wb+') as destination: for chunk in excel_file.chunks(): destination.write(chunk) # 5. 启动后台线程处理 thread = threading.Thread( target=cls._process_burn_record_async, args=(task_id, file_path, redis_key), daemon=True ) thread.start() return response.json(0, { "task_id": task_id, "message": "任务已提交,正在后台处理", "redis_key": redis_key }) except ValueError: return response.json(444, "烧录数量必须是整数") except Exception as e: LOGGER.error(f"创建烧录任务失败: {str(e)}") return response.json(500, "创建烧录任务失败") @classmethod def _process_burn_record_async(cls, task_id, file_path, redis_key): """后台线程处理烧录记录任务""" from openpyxl import load_workbook from django.db import transaction import time import json from AgentModel.models import BurnRecord, BurnEncryptedICUID from Object.RedisObject import RedisObject redis_obj = RedisObject() try: # 获取并更新任务状态为处理中 task_data = json.loads(redis_obj.get_data(redis_key)) task_data['status'] = 'processing' redis_obj.set_data(redis_key, json.dumps(task_data)) # 1. 读取Excel文件获取总行数 wb = load_workbook(file_path) ws = wb.active total_rows = ws.max_row # 更新总行数和开始时间 task_data['total'] = total_rows task_data['start_time'] = int(time.time()) redis_obj.set_data(redis_key, json.dumps(task_data)) # 2. 创建烧录记录 with transaction.atomic(): burn_record = BurnRecord( order_number=task_data['order_number'], burn_count=task_data['burn_count'], purpose=task_data['purpose'], updated_time=int(time.time()), created_time=int(time.time()) ) burn_record.save() task_data['burn_record_id'] = burn_record.id redis_obj.set_data(redis_key, json.dumps(task_data)) # 3. 分批处理UID文件(每批300条) batch_size = 300 current_time = int(time.time()) processed = 0 uids_batch = [] for row in ws.iter_rows(min_row=1, values_only=True): if row[0]: uid = str(row[0]).strip() if uid: uids_batch.append(uid) processed += 1 # 每处理100条更新一次进度 if processed % 100 == 0: progress = min(99, int((processed / total_rows) * 100)) task_data['progress'] = progress task_data['processed'] = processed task_data['last_update'] = int(time.time()) redis_obj.set_data(redis_key, json.dumps(task_data)) # 处理批次 if len(uids_batch) >= batch_size: cls._update_uids_batch( uids_batch, burn_record.id, current_time, redis_key ) uids_batch = [] # 处理最后一批 if uids_batch: cls._update_uids_batch( uids_batch, burn_record.id, current_time, redis_key ) # 更新最终状态 task_data['status'] = 'completed' task_data['progress'] = 100 task_data['processed'] = processed task_data['end_time'] = int(time.time()) redis_obj.set_data(redis_key, json.dumps(task_data)) LOGGER.info(f"处理烧录记录完成,任务ID: {task_id}, 处理UID数量: {processed}") # 清理临时文件 try: os.remove(file_path) except Exception as e: LOGGER.warning(f"删除临时文件失败: {str(e)}") except Exception as e: LOGGER.error(f"处理烧录记录失败: {str(e)}") task_data = { 'status': 'failed', 'error': str(e), 'end_time': int(time.time()) } redis_obj.set_data(redis_key, json.dumps(task_data)) @classmethod def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key): """批量更新UID记录""" from django.db import transaction import json from Object.RedisObject import RedisObject redis_obj = RedisObject() try: with transaction.atomic(): updated = BurnEncryptedICUID.objects.filter( uid__in=uids_batch ).update( burn_id=burn_id, status=1, # 烧录成功 updated_time=current_time ) # 更新已处理数量到Redis task_data = json.loads(redis_obj.get_data(redis_key)) task_data['processed'] = task_data.get('processed', 0) + len(uids_batch) redis_obj.set_data(redis_key, json.dumps(task_data)) except Exception as e: LOGGER.error(f"批量更新UID失败: {str(e)}") raise @classmethod def get_burn_uids_page(cls, request_dict: Dict[str, Any], response) -> Any: """ 根据burn_id分页查询烧录UID记录 :param request_dict: 请求参数字典 :param response: 响应对象 :return: JSON响应 """ # 1. 参数验证 burn_id = request_dict.get('burn_id') if not burn_id: return response.json(444, "缺少burn_id参数") try: burn_id = int(burn_id) except ValueError: return response.json(444, "burn_id必须是整数") # 2. 分页参数处理 try: page = int(request_dict.get('page', 1)) page_size = int(request_dict.get('pageSize', 10)) page = max(page, 1) page_size = max(1, min(page_size, 100)) except (ValueError, TypeError): return response.json(444, "分页参数错误(必须为整数)") # 3. 查询并分页 query = Q(burn_id=burn_id) uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time') paginator = Paginator(uid_qs, page_size) try: page_obj = paginator.page(page) except PageNotAnInteger: page_obj = paginator.page(1) except EmptyPage: page_obj = paginator.page(paginator.num_pages) uid_list = serializers.serialize( 'python', page_obj, fields=['id', 'uid', 'batch_number', 'status', 'created_time', 'updated_time'] ) return response.json( 0, { 'list': uid_list, 'total': paginator.count, 'currentPage': page_obj.number, 'totalPages': paginator.num_pages, 'pageSize': page_size } )