|
@@ -0,0 +1,884 @@
|
|
|
+# -*- encoding: utf-8 -*-
|
|
|
+"""
|
|
|
+@File : UIDBurnManageController.py
|
|
|
+@Time : 2025/7/30 08:57
|
|
|
+@Author : stephen
|
|
|
+@Email : zhangdongming@asj6.wecom.work
|
|
|
+@Software: PyCharm
|
|
|
+"""
|
|
|
+import json
|
|
|
+import os
|
|
|
+import random
|
|
|
+import string
|
|
|
+import threading
|
|
|
+import time
|
|
|
+from datetime import datetime
|
|
|
+from typing import Dict, Any
|
|
|
+from uuid import uuid4
|
|
|
+
|
|
|
+import xlrd
|
|
|
+from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
|
|
|
+from django.db import transaction
|
|
|
+from django.db.models import Q
|
|
|
+from django.http import QueryDict
|
|
|
+from django.views import View
|
|
|
+from openpyxl import load_workbook
|
|
|
+
|
|
|
+from AgentModel.models import BurnRecord, BurnEncryptedICUID, BurnBatch
|
|
|
+from Ansjer.config import LOGGER
|
|
|
+from Object.RedisObject import RedisObject
|
|
|
+from Object.ResponseObject import ResponseObject
|
|
|
+from Object.TokenObject import TokenObject
|
|
|
+
|
|
|
+
|
|
|
+class UIDBurnManageView(View):
|
|
|
+ def get(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ return self.validation(request.GET, request, operation)
|
|
|
+
|
|
|
+ def post(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ return self.validation(request.POST, request, operation)
|
|
|
+
|
|
|
+ def delete(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ delete = QueryDict(request.body)
|
|
|
+ if not delete:
|
|
|
+ delete = request.GET
|
|
|
+ return self.validation(delete, request, operation)
|
|
|
+
|
|
|
+ def put(self, request, *args, **kwargs):
|
|
|
+ request.encoding = 'utf-8'
|
|
|
+ operation = kwargs.get('operation')
|
|
|
+ put = QueryDict(request.body)
|
|
|
+ return self.validation(put, request, operation)
|
|
|
+
|
|
|
+ def validation(self, request_dict, request, operation):
|
|
|
+ """请求验证路由"""
|
|
|
+ # 初始化响应对象
|
|
|
+ language = request_dict.get('language', 'cn')
|
|
|
+ response = ResponseObject(language, 'pc')
|
|
|
+
|
|
|
+ # Token验证
|
|
|
+ try:
|
|
|
+ tko = TokenObject(
|
|
|
+ request.META.get('HTTP_AUTHORIZATION'),
|
|
|
+ returntpye='pc')
|
|
|
+ if tko.code != 0:
|
|
|
+ return response.json(tko.code)
|
|
|
+ response.lang = tko.lang
|
|
|
+ user_id = tko.userID
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"Token验证失败: {str(e)}")
|
|
|
+ return response.json(444)
|
|
|
+
|
|
|
+ if operation == 'getBurnRecordsPage':
|
|
|
+ return self.get_burn_records_page(request_dict, response)
|
|
|
+ elif operation == 'importBatchUids':
|
|
|
+ return self.import_batch_uids(request, response)
|
|
|
+ elif operation == 'addBurnRecord':
|
|
|
+ return self.add_burn_record(request, request_dict, response)
|
|
|
+ elif operation == 'batchPageUids':
|
|
|
+ return self.batch_page_uids(request_dict, response)
|
|
|
+ elif operation == 'getImportProgress':
|
|
|
+ return self.get_import_progress(request_dict, response)
|
|
|
+ elif operation == 'getImportTaskList':
|
|
|
+ return self.get_import_task_list(request_dict, response)
|
|
|
+ elif operation == 'getBatchRecordsPage':
|
|
|
+ return self.get_batch_records_page(request_dict, response)
|
|
|
+ else:
|
|
|
+ return response.json(414)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def get_batch_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
|
|
|
+ """
|
|
|
+ 分页查询批次记录(带统计信息)
|
|
|
+ :param request_dict: 请求参数字典
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ """
|
|
|
+ # 1. 分页参数处理
|
|
|
+ try:
|
|
|
+ page = int(request_dict.get('page', 1))
|
|
|
+ page_size = int(request_dict.get('pageSize', 10))
|
|
|
+ page = max(page, 1)
|
|
|
+ page_size = max(1, min(page_size, 100))
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ return response.json(444, "分页参数错误(必须为整数)")
|
|
|
+
|
|
|
+ # 2. 构建查询条件
|
|
|
+ query = Q()
|
|
|
+ batch_number = request_dict.get('batch_number', '').strip()
|
|
|
+ if batch_number:
|
|
|
+ query &= Q(batch_number__icontains=batch_number)
|
|
|
+
|
|
|
+ # 3. 查询并分页
|
|
|
+ batch_qs = BurnBatch.objects.filter(query).order_by('-created_time').values(
|
|
|
+ 'id', 'batch_number', 'purpose', 'manager', 'total_uid', 'created_time'
|
|
|
+ )
|
|
|
+
|
|
|
+ paginator = Paginator(batch_qs, page_size)
|
|
|
+ try:
|
|
|
+ page_obj = paginator.page(page)
|
|
|
+ except PageNotAnInteger:
|
|
|
+ page_obj = paginator.page(1)
|
|
|
+ except EmptyPage:
|
|
|
+ page_obj = paginator.page(paginator.num_pages)
|
|
|
+
|
|
|
+ # 4. 获取统计信息并构建结果
|
|
|
+ redis_obj = RedisObject()
|
|
|
+ batch_list = []
|
|
|
+
|
|
|
+ for batch in page_obj:
|
|
|
+ batch_id = batch['id']
|
|
|
+ cache_key = f"batch_stats:{batch_id}"
|
|
|
+
|
|
|
+ # 尝试从缓存获取统计信息
|
|
|
+ cached_stats = redis_obj.get_data(cache_key)
|
|
|
+ if cached_stats:
|
|
|
+ stats = json.loads(cached_stats)
|
|
|
+ else:
|
|
|
+ # 查询数据库统计
|
|
|
+ burned_count = BurnEncryptedICUID.objects.filter(
|
|
|
+ batch_id=batch_id,
|
|
|
+ status=1
|
|
|
+ ).count()
|
|
|
+ unburned_count = BurnEncryptedICUID.objects.filter(
|
|
|
+ batch_id=batch_id,
|
|
|
+ status=0
|
|
|
+ ).count()
|
|
|
+
|
|
|
+ stats = {
|
|
|
+ 'burned_count': burned_count,
|
|
|
+ 'unburned_count': unburned_count
|
|
|
+ }
|
|
|
+ # 设置缓存,过期时间1小时
|
|
|
+ redis_obj.set_data(cache_key, json.dumps(stats), 3600)
|
|
|
+
|
|
|
+ # 合并批次信息和统计信息
|
|
|
+ batch_info = dict(batch)
|
|
|
+ batch_info.update(stats)
|
|
|
+ batch_list.append(batch_info)
|
|
|
+
|
|
|
+ return response.json(
|
|
|
+ 0,
|
|
|
+ {
|
|
|
+ 'list': batch_list,
|
|
|
+ 'total': paginator.count,
|
|
|
+ 'currentPage': page_obj.number,
|
|
|
+ 'totalPages': paginator.num_pages,
|
|
|
+ 'pageSize': page_size
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
|
|
|
+ """
|
|
|
+ 分页查询烧录记录
|
|
|
+ :param cls:
|
|
|
+ :param request_dict: 请求参数字典(包含分页参数和查询条件)
|
|
|
+ :param response: 响应对象(用于返回JSON)
|
|
|
+ :return: 分页查询结果的JSON响应
|
|
|
+ """
|
|
|
+ # 1. 分页参数处理与验证
|
|
|
+ try:
|
|
|
+ page = int(request_dict.get('page', 1))
|
|
|
+ page_size = int(request_dict.get('pageSize', 10))
|
|
|
+ # 限制分页范围
|
|
|
+ page = max(page, 1)
|
|
|
+ page_size = max(1, min(page_size, 100))
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ return response.json(444, "分页参数错误(必须为整数)")
|
|
|
+
|
|
|
+ # 2. 构建查询条件
|
|
|
+ query = Q()
|
|
|
+ order_number = request_dict.get('orderNumber', '').strip()
|
|
|
+ if order_number:
|
|
|
+ query &= Q(order_number__icontains=order_number)
|
|
|
+
|
|
|
+ # 3. 获取查询集并指定需要的字段
|
|
|
+ burn_qs = BurnRecord.objects.filter(query).order_by('-created_time').values(
|
|
|
+ 'id', 'order_number', 'burn_count', 'purpose', 'created_time'
|
|
|
+ )
|
|
|
+
|
|
|
+ # 4. 分页处理
|
|
|
+ paginator = Paginator(burn_qs, page_size)
|
|
|
+ try:
|
|
|
+ page_obj = paginator.page(page)
|
|
|
+ except PageNotAnInteger:
|
|
|
+ page_obj = paginator.page(1)
|
|
|
+ except EmptyPage:
|
|
|
+ page_obj = paginator.page(paginator.num_pages)
|
|
|
+
|
|
|
+ # 转换为列表
|
|
|
+ burn_list = list(page_obj)
|
|
|
+ # 返回结果
|
|
|
+ return response.json(
|
|
|
+ 0,
|
|
|
+ {
|
|
|
+ 'total': paginator.count, # 总记录数
|
|
|
+ 'list': burn_list, # 当前页数据列表
|
|
|
+ 'currentPage': page_obj.number,
|
|
|
+ 'totalPages': paginator.num_pages
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def import_batch_uids(cls, request, response) -> Any:
|
|
|
+ """
|
|
|
+ 导入批次UID - 异步优化版(适配新表结构)
|
|
|
+ :param request: HttpRequest对象(包含上传文件)
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ """
|
|
|
+ # 1. 验证文件上传
|
|
|
+ if 'file' not in request.FILES:
|
|
|
+ return response.json(444, "请上传Excel文件")
|
|
|
+
|
|
|
+ excel_file = request.FILES['file']
|
|
|
+ if not excel_file.name.endswith(('.xlsx', '.xls')):
|
|
|
+ return response.json(444, "只支持Excel文件(.xlsx/.xls)")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 2. 生成任务ID和批次号
|
|
|
+ task_id = str(uuid4())
|
|
|
+ # 生成带时间戳和随机字符的批次号
|
|
|
+ timestamp = datetime.now().strftime('%Y%m%d%H%M%S') # 精确到秒
|
|
|
+ random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3)) # 3个随机字符
|
|
|
+ batch_number = f"ENG{timestamp}{random_chars}" # 格式: ENG+时间戳+随机字符
|
|
|
+
|
|
|
+ # 3. 初始化Redis状态
|
|
|
+ redis_key = f"import_task:{task_id}"
|
|
|
+ redis_obj = RedisObject()
|
|
|
+
|
|
|
+ # 保存任务基本信息到Redis (过期时间2小时)
|
|
|
+ task_data = {
|
|
|
+ 'status': 'pending',
|
|
|
+ 'batch_number': batch_number,
|
|
|
+ 'progress': 0,
|
|
|
+ 'processed': 0,
|
|
|
+ 'total': 0,
|
|
|
+ 'start_time': int(time.time()),
|
|
|
+ 'success_count': 0
|
|
|
+ }
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
|
|
|
+
|
|
|
+ # 4. 保存文件到项目static/uploadedfiles目录
|
|
|
+ base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
|
+ upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
|
|
|
+ os.makedirs(upload_dir, exist_ok=True)
|
|
|
+ file_path = os.path.join(upload_dir, f"{task_id}.xls")
|
|
|
+
|
|
|
+ with open(file_path, 'wb+') as destination:
|
|
|
+ for chunk in excel_file.chunks():
|
|
|
+ destination.write(chunk)
|
|
|
+
|
|
|
+ # 5. 启动后台线程处理
|
|
|
+ thread = threading.Thread(
|
|
|
+ target=cls._process_import_batch_async,
|
|
|
+ args=(task_id, file_path, redis_key, batch_number),
|
|
|
+ daemon=True
|
|
|
+ )
|
|
|
+ thread.start()
|
|
|
+
|
|
|
+ return response.json(0, {
|
|
|
+ "task_id": task_id,
|
|
|
+ "batch_number": batch_number,
|
|
|
+ "message": "导入任务已提交,正在后台处理",
|
|
|
+ "redis_key": redis_key
|
|
|
+ })
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"创建导入任务失败: {str(e)}")
|
|
|
+ return response.json(500, "创建导入任务失败")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
|
|
|
+ """后台线程处理批量导入任务(兼容 xls 和 xlsx,读取第3列UID)"""
|
|
|
+
|
|
|
+ redis_obj = RedisObject()
|
|
|
+ try:
|
|
|
+ task_data = json.loads(redis_obj.get_data(redis_key))
|
|
|
+ task_data['status'] = 'processing'
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 判断文件类型并读取数据
|
|
|
+ file_ext = os.path.splitext(file_path)[-1].lower()
|
|
|
+ uid_rows = []
|
|
|
+
|
|
|
+ if file_ext == '.xls':
|
|
|
+ # 使用 xlrd 读取 .xls 文件
|
|
|
+ workbook = xlrd.open_workbook(file_path)
|
|
|
+ sheet = workbook.sheet_by_index(0)
|
|
|
+ total_rows = sheet.nrows
|
|
|
+ for row_idx in range(0, total_rows): # 改为从第0行开始
|
|
|
+ row = sheet.row_values(row_idx)
|
|
|
+ if len(row) > 2 and row[2]:
|
|
|
+ uid = str(row[2]).strip()
|
|
|
+ if uid:
|
|
|
+ uid_rows.append(uid)
|
|
|
+
|
|
|
+ elif file_ext == '.xlsx':
|
|
|
+ # 使用 openpyxl 读取 .xlsx 文件
|
|
|
+ workbook = load_workbook(file_path, read_only=True)
|
|
|
+ sheet = workbook.active
|
|
|
+ for row in sheet.iter_rows(min_row=1, values_only=True): # 改为从第1行开始
|
|
|
+ if len(row) > 2 and row[2]:
|
|
|
+ uid = str(row[2]).strip()
|
|
|
+ if uid:
|
|
|
+ uid_rows.append(uid)
|
|
|
+ task_data['total'] = len(uid_rows)
|
|
|
+ task_data['start_time'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 创建批次记录
|
|
|
+ current_time = int(time.time())
|
|
|
+ with transaction.atomic():
|
|
|
+ batch = BurnBatch(
|
|
|
+ batch_number=batch_number,
|
|
|
+ purpose='批次导入',
|
|
|
+ created_time=current_time,
|
|
|
+ manager='system',
|
|
|
+ total_uid=0
|
|
|
+ )
|
|
|
+ batch.save()
|
|
|
+ batch_id = batch.id
|
|
|
+
|
|
|
+ # 分批处理UID
|
|
|
+ batch_size = 500
|
|
|
+ processed = 0
|
|
|
+ success_count = 0
|
|
|
+ uids_batch = []
|
|
|
+
|
|
|
+ for uid in uid_rows:
|
|
|
+ uids_batch.append(uid)
|
|
|
+ processed += 1
|
|
|
+
|
|
|
+ # 每处理1000条更新进度
|
|
|
+ if processed % 1000 == 0:
|
|
|
+ progress = min(99, int((processed / len(uid_rows)) * 100))
|
|
|
+ task_data['progress'] = progress
|
|
|
+ task_data['processed'] = processed
|
|
|
+ task_data['last_update'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ if len(uids_batch) >= batch_size:
|
|
|
+ success = cls._import_uids_batch(
|
|
|
+ uids_batch,
|
|
|
+ batch_id,
|
|
|
+ current_time,
|
|
|
+ redis_key
|
|
|
+ )
|
|
|
+ success_count += success
|
|
|
+ uids_batch = []
|
|
|
+
|
|
|
+ # 最后一批处理
|
|
|
+ if uids_batch:
|
|
|
+ success = cls._import_uids_batch(
|
|
|
+ uids_batch,
|
|
|
+ batch_id,
|
|
|
+ current_time,
|
|
|
+ redis_key
|
|
|
+ )
|
|
|
+ success_count += success
|
|
|
+
|
|
|
+ # 更新数据库记录
|
|
|
+ with transaction.atomic():
|
|
|
+ BurnBatch.objects.filter(id=batch_id).update(total_uid=success_count)
|
|
|
+
|
|
|
+ task_data['status'] = 'completed'
|
|
|
+ task_data['progress'] = 100
|
|
|
+ task_data['processed'] = processed
|
|
|
+ task_data['success_count'] = success_count
|
|
|
+ task_data['end_time'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ LOGGER.info(f"批次导入完成,任务ID: {task_id},处理UID: {processed},成功导入: {success_count}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ os.remove(file_path)
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.warning(f"删除临时文件失败: {str(e)}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"处理导入任务失败: {str(e)}")
|
|
|
+ task_data = {
|
|
|
+ 'status': 'failed',
|
|
|
+ 'error': str(e),
|
|
|
+ 'end_time': int(time.time())
|
|
|
+ }
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _import_uids_batch(cls, uids_batch, batch_id, current_time, redis_key):
|
|
|
+ """批量导入UID记录(适配新表结构)"""
|
|
|
+
|
|
|
+ redis_obj = RedisObject()
|
|
|
+
|
|
|
+ try:
|
|
|
+ with transaction.atomic():
|
|
|
+ # 去重处理
|
|
|
+ unique_uids = list(set(uids_batch))
|
|
|
+
|
|
|
+ # 创建记录
|
|
|
+ records = [
|
|
|
+ BurnEncryptedICUID(
|
|
|
+ batch_id=batch_id,
|
|
|
+ uid=uid,
|
|
|
+ created_time=current_time,
|
|
|
+ updated_time=current_time,
|
|
|
+ status=0 # 未烧录状态
|
|
|
+ )
|
|
|
+ for uid in unique_uids
|
|
|
+ ]
|
|
|
+ BurnEncryptedICUID.objects.bulk_create(records)
|
|
|
+
|
|
|
+ # 更新已处理数量和成功数量到Redis
|
|
|
+ task_data = json.loads(redis_obj.get_data(redis_key))
|
|
|
+ task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
|
|
|
+ task_data['success_count'] = task_data.get('success_count', 0) + len(records)
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 清除批次统计缓存
|
|
|
+ cache_key = f"batch_stats:{batch_id}"
|
|
|
+ redis_obj.del_data(cache_key)
|
|
|
+
|
|
|
+ return len(records)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"批量导入UID失败: {str(e)}")
|
|
|
+ # 更新失败状态但继续处理
|
|
|
+ task_data = json.loads(redis_obj.get_data(redis_key))
|
|
|
+ task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+ return 0
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def add_burn_record(cls, request, request_dict, response) -> Any:
|
|
|
+ """
|
|
|
+ 新增烧录记录(带UID文件) - Redis字符串优化版
|
|
|
+ :param request_dict:
|
|
|
+ :param request: HttpRequest对象(包含上传文件和表单数据)
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ """
|
|
|
+
|
|
|
+ required_fields = ['order_number', 'burn_count', 'purpose']
|
|
|
+ for field in required_fields:
|
|
|
+ if not request_dict.get(field):
|
|
|
+ return response.json(444, f"缺少必填参数: {field}")
|
|
|
+
|
|
|
+ # 2. 验证文件上传
|
|
|
+ if 'uid_file' not in request.FILES:
|
|
|
+ return response.json(444, "请上传包含已烧录UID的Excel文件")
|
|
|
+
|
|
|
+ excel_file = request.FILES['uid_file']
|
|
|
+ if not excel_file.name.endswith(('.xlsx', '.xls')):
|
|
|
+ return response.json(444, "只支持Excel文件(.xlsx/.xls)")
|
|
|
+
|
|
|
+ try:
|
|
|
+ burn_count = int(request_dict['burn_count'])
|
|
|
+ if burn_count <= 0:
|
|
|
+ return response.json(444, "烧录数量必须大于0")
|
|
|
+
|
|
|
+ # 3. 创建任务ID并初始化Redis状态
|
|
|
+ task_id = str(uuid4())
|
|
|
+ redis_key = f"burn_task:{task_id}"
|
|
|
+ redis_obj = RedisObject()
|
|
|
+
|
|
|
+ # 保存任务基本信息到Redis (过期时间2小时)
|
|
|
+ task_data = {
|
|
|
+ 'status': 'pending',
|
|
|
+ 'order_number': request_dict['order_number'].strip(),
|
|
|
+ 'burn_count': burn_count,
|
|
|
+ 'purpose': request_dict['purpose'].strip(),
|
|
|
+ 'progress': 0,
|
|
|
+ 'processed': 0,
|
|
|
+ 'total': 0,
|
|
|
+ 'start_time': int(time.time())
|
|
|
+ }
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
|
|
|
+
|
|
|
+ # 4. 保存文件到项目static/uploadedfiles目录
|
|
|
+ base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
|
+ upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
|
|
|
+ os.makedirs(upload_dir, exist_ok=True)
|
|
|
+ file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
|
|
|
+
|
|
|
+ with open(file_path, 'wb+') as destination:
|
|
|
+ for chunk in excel_file.chunks():
|
|
|
+ destination.write(chunk)
|
|
|
+
|
|
|
+ # 5. 启动后台线程处理
|
|
|
+ thread = threading.Thread(
|
|
|
+ target=cls._process_burn_record_async,
|
|
|
+ args=(task_id, file_path, redis_key),
|
|
|
+ daemon=True
|
|
|
+ )
|
|
|
+ thread.start()
|
|
|
+
|
|
|
+ return response.json(0, {
|
|
|
+ "task_id": task_id,
|
|
|
+ "message": "任务已提交,正在后台处理",
|
|
|
+ "redis_key": redis_key
|
|
|
+ })
|
|
|
+
|
|
|
+ except ValueError:
|
|
|
+ return response.json(444, "烧录数量必须是整数")
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"创建烧录任务失败: {str(e)}")
|
|
|
+ return response.json(500, "创建烧录任务失败")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _process_burn_record_async(cls, task_id, file_path, redis_key):
|
|
|
+ """后台线程处理烧录记录任务"""
|
|
|
+ redis_obj = RedisObject()
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 获取并更新任务状态为处理中
|
|
|
+ task_data = json.loads(redis_obj.get_data(redis_key))
|
|
|
+ task_data['status'] = 'processing'
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 1. 读取Excel文件获取总行数
|
|
|
+ wb = load_workbook(file_path)
|
|
|
+ ws = wb.active
|
|
|
+ total_rows = ws.max_row
|
|
|
+
|
|
|
+ # 更新总行数和开始时间
|
|
|
+ task_data['total'] = total_rows
|
|
|
+ task_data['start_time'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 2. 创建烧录记录
|
|
|
+ with transaction.atomic():
|
|
|
+ burn_record = BurnRecord(
|
|
|
+ order_number=task_data['order_number'],
|
|
|
+ burn_count=task_data['burn_count'],
|
|
|
+ purpose=task_data['purpose'],
|
|
|
+ updated_time=int(time.time()),
|
|
|
+ created_time=int(time.time())
|
|
|
+ )
|
|
|
+ burn_record.save()
|
|
|
+ task_data['burn_record_id'] = burn_record.id
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 3. 分批处理UID文件(每批300条)
|
|
|
+ batch_size = 300
|
|
|
+ current_time = int(time.time())
|
|
|
+ processed = 0
|
|
|
+ uids_batch = []
|
|
|
+
|
|
|
+ for row in ws.iter_rows(min_row=1, values_only=True):
|
|
|
+ if row[0]:
|
|
|
+ uid = str(row[0]).strip()
|
|
|
+ if uid:
|
|
|
+ uids_batch.append(uid)
|
|
|
+ processed += 1
|
|
|
+
|
|
|
+ # 每处理100条更新一次进度
|
|
|
+ if processed % 100 == 0:
|
|
|
+ progress = min(99, int((processed / total_rows) * 100))
|
|
|
+ task_data['progress'] = progress
|
|
|
+ task_data['processed'] = processed
|
|
|
+ task_data['last_update'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 处理批次
|
|
|
+ if len(uids_batch) >= batch_size:
|
|
|
+ cls._update_uids_batch(
|
|
|
+ uids_batch,
|
|
|
+ burn_record.id,
|
|
|
+ current_time,
|
|
|
+ redis_key
|
|
|
+ )
|
|
|
+ uids_batch = []
|
|
|
+
|
|
|
+ # 处理最后一批
|
|
|
+ if uids_batch:
|
|
|
+ cls._update_uids_batch(
|
|
|
+ uids_batch,
|
|
|
+ burn_record.id,
|
|
|
+ current_time,
|
|
|
+ redis_key
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新最终状态
|
|
|
+ task_data['status'] = 'completed'
|
|
|
+ task_data['progress'] = 100
|
|
|
+ task_data['processed'] = processed
|
|
|
+ task_data['end_time'] = int(time.time())
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 查询受影响的批次ID并清除缓存
|
|
|
+ batch_ids = BurnEncryptedICUID.objects.filter(
|
|
|
+ burn_id=burn_record.id
|
|
|
+ ).values_list('batch_id', flat=True).distinct()
|
|
|
+
|
|
|
+ for batch_id in batch_ids:
|
|
|
+ cache_key = f"batch_stats:{batch_id}"
|
|
|
+ redis_obj.del_data(cache_key)
|
|
|
+
|
|
|
+ LOGGER.info(f"处理烧录记录完成,任务ID: {task_id}, 处理UID数量: {processed}")
|
|
|
+
|
|
|
+ # 清理临时文件
|
|
|
+ try:
|
|
|
+ os.remove(file_path)
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.warning(f"删除临时文件失败: {str(e)}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"处理烧录记录失败: {str(e)}")
|
|
|
+ task_data = {
|
|
|
+ 'status': 'failed',
|
|
|
+ 'error': str(e),
|
|
|
+ 'end_time': int(time.time())
|
|
|
+ }
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
|
|
|
+ """批量更新UID记录"""
|
|
|
+ redis_obj = RedisObject()
|
|
|
+
|
|
|
+ try:
|
|
|
+ with transaction.atomic():
|
|
|
+ # 先查询出受影响的批次ID
|
|
|
+ batch_ids = BurnEncryptedICUID.objects.filter(
|
|
|
+ uid__in=uids_batch
|
|
|
+ ).values_list('batch_id', flat=True).distinct()
|
|
|
+
|
|
|
+ updated = BurnEncryptedICUID.objects.filter(
|
|
|
+ uid__in=uids_batch
|
|
|
+ ).update(
|
|
|
+ burn_id=burn_id,
|
|
|
+ status=1, # 烧录成功
|
|
|
+ updated_time=current_time
|
|
|
+ )
|
|
|
+ # 更新已处理数量到Redis
|
|
|
+ task_data = json.loads(redis_obj.get_data(redis_key))
|
|
|
+ task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
|
|
|
+ redis_obj.set_data(redis_key, json.dumps(task_data))
|
|
|
+
|
|
|
+ # 清除受影响批次的统计缓存
|
|
|
+ for batch_id in batch_ids:
|
|
|
+ cache_key = f"batch_stats:{batch_id}"
|
|
|
+ redis_obj.del_data(cache_key)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"批量更新UID失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
|
|
|
+ """
|
|
|
+ 查询任务进度(支持导入和烧录任务)
|
|
|
+ :param request_dict: 请求参数字典(必须包含task_id, 可选type)
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ type参数说明:
|
|
|
+ - import: 导入任务(默认)
|
|
|
+ - burn: 烧录任务
|
|
|
+ """
|
|
|
+ # 1. 参数验证
|
|
|
+ task_id = request_dict.get('task_id')
|
|
|
+ if not task_id:
|
|
|
+ return response.json(444, "缺少task_id参数")
|
|
|
+
|
|
|
+ task_type = request_dict.get('type', 'import').lower()
|
|
|
+ if task_type not in ['import', 'burn']:
|
|
|
+ return response.json(444, "type参数必须是'import'或'burn'")
|
|
|
+
|
|
|
+ # 2. 构建Redis key
|
|
|
+ redis_key = f"{task_type}_task:{task_id}"
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 3. 从Redis获取任务数据
|
|
|
+ redis_obj = RedisObject()
|
|
|
+ task_data_str = redis_obj.get_data(redis_key)
|
|
|
+
|
|
|
+ if not task_data_str:
|
|
|
+ return response.json(173, "任务不存在或已过期")
|
|
|
+
|
|
|
+ # 4. 解析任务数据
|
|
|
+ if isinstance(task_data_str, bytes):
|
|
|
+ task_data_str = task_data_str.decode('utf-8')
|
|
|
+ task_data = json.loads(task_data_str)
|
|
|
+
|
|
|
+ # 5. 计算耗时(秒)
|
|
|
+ current_time = int(time.time())
|
|
|
+ start_time = task_data.get('start_time', current_time)
|
|
|
+ elapsed = current_time - start_time
|
|
|
+ if task_data.get('end_time'):
|
|
|
+ elapsed = task_data['end_time'] - start_time
|
|
|
+
|
|
|
+ # 6. 构建基础响应数据
|
|
|
+ result = {
|
|
|
+ 'status': task_data.get('status', 'unknown'),
|
|
|
+ 'progress': task_data.get('progress', 0),
|
|
|
+ 'processed': task_data.get('processed', 0),
|
|
|
+ 'total': task_data.get('total', 0),
|
|
|
+ 'elapsed_seconds': elapsed,
|
|
|
+ 'start_time': start_time,
|
|
|
+ 'end_time': task_data.get('end_time'),
|
|
|
+ 'error': task_data.get('error'),
|
|
|
+ 'task_type': task_type
|
|
|
+ }
|
|
|
+
|
|
|
+ # 7. 根据任务类型添加特定字段
|
|
|
+ if task_type == 'import':
|
|
|
+ # 从Redis获取批次号并查询批次信息
|
|
|
+ batch_number = task_data.get('batch_number', '')
|
|
|
+ if batch_number:
|
|
|
+ try:
|
|
|
+ batch = BurnBatch.objects.filter(batch_number=batch_number).first()
|
|
|
+ if batch:
|
|
|
+ result.update({
|
|
|
+ 'batch_number': batch_number,
|
|
|
+ 'success_count': task_data.get('success_count', 0),
|
|
|
+ 'purpose': batch.purpose,
|
|
|
+ 'manager': batch.manager,
|
|
|
+ 'total_uid': batch.total_uid
|
|
|
+ })
|
|
|
+ else:
|
|
|
+ result.update({
|
|
|
+ 'batch_number': batch_number,
|
|
|
+ 'success_count': task_data.get('success_count', 0)
|
|
|
+ })
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"查询批次信息失败: {str(e)}")
|
|
|
+ result.update({
|
|
|
+ 'batch_number': batch_number,
|
|
|
+ 'success_count': task_data.get('success_count', 0)
|
|
|
+ })
|
|
|
+ else: # burn task
|
|
|
+ result.update({
|
|
|
+ 'order_number': task_data.get('order_number', ''),
|
|
|
+ 'purpose': task_data.get('purpose', ''),
|
|
|
+ 'burn_count': task_data.get('burn_count', 0),
|
|
|
+ 'burn_record_id': task_data.get('burn_record_id')
|
|
|
+ })
|
|
|
+
|
|
|
+ return response.json(0, result)
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ LOGGER.error(f"任务数据解析失败, redis_key: {redis_key}")
|
|
|
+ return response.json(500, "任务数据格式错误")
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"查询任务进度失败: {str(e)}")
|
|
|
+ return response.json(500, "查询进度失败")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
|
|
|
+ """
|
|
|
+ 获取所有导入任务列表
|
|
|
+ :param request_dict: 请求参数字典
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ """
|
|
|
+
|
|
|
+ try:
|
|
|
+ redis_obj = RedisObject()
|
|
|
+ # 获取所有import_task开头的key
|
|
|
+ keys = redis_obj.get_keys("import_task:*")
|
|
|
+
|
|
|
+ if not keys:
|
|
|
+ return response.json(0, {"tasks": []})
|
|
|
+
|
|
|
+ tasks = []
|
|
|
+
|
|
|
+ # 获取每个任务的基本信息
|
|
|
+ for key in keys:
|
|
|
+ try:
|
|
|
+ task_data_str = redis_obj.get_data(key)
|
|
|
+ if task_data_str:
|
|
|
+ # 确保task_data_str是字符串类型
|
|
|
+ if isinstance(task_data_str, bytes):
|
|
|
+ task_data_str = task_data_str.decode('utf-8')
|
|
|
+ task_data = json.loads(task_data_str)
|
|
|
+ # 处理key为bytes的情况
|
|
|
+ key_str = key.decode('utf-8') if isinstance(key, bytes) else key
|
|
|
+ tasks.append({
|
|
|
+ 'task_id': key_str.split(':')[1], # 从key中提取task_id
|
|
|
+ 'status': task_data.get('status', 'unknown'),
|
|
|
+ 'progress': task_data.get('progress', 0),
|
|
|
+ 'batch_number': task_data.get('batch_number', ''),
|
|
|
+ 'start_time': task_data.get('start_time'),
|
|
|
+ 'end_time': task_data.get('end_time'),
|
|
|
+ 'processed': task_data.get('processed', 0),
|
|
|
+ 'total': task_data.get('total', 0),
|
|
|
+ 'redis_key': key_str
|
|
|
+ })
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"解析任务数据失败, key: {key}, error: {str(e)}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 按开始时间倒序排列
|
|
|
+ tasks.sort(key=lambda x: x.get('start_time', 0), reverse=True)
|
|
|
+
|
|
|
+ # 限制返回数量(最近100条)
|
|
|
+ tasks = tasks[:100]
|
|
|
+
|
|
|
+ return response.json(0, {"tasks": tasks})
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ LOGGER.error(f"获取导入任务列表失败: {str(e)}")
|
|
|
+ return response.json(500, "获取任务列表失败")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def batch_page_uids(cls, request_dict: Dict[str, Any], response) -> Any:
|
|
|
+ """
|
|
|
+ 根据burn_id分页查询烧录UID记录
|
|
|
+ :param request_dict: 请求参数字典
|
|
|
+ :param response: 响应对象
|
|
|
+ :return: JSON响应
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ page = int(request_dict.get('page', 1))
|
|
|
+ page_size = int(request_dict.get('pageSize', 10))
|
|
|
+ page = max(page, 1)
|
|
|
+ page_size = max(1, min(page_size, 100))
|
|
|
+ except (ValueError, TypeError):
|
|
|
+ return response.json(444, "分页参数错误(必须为整数)")
|
|
|
+
|
|
|
+ # 3. 构建查询条件
|
|
|
+ query = Q()
|
|
|
+
|
|
|
+ # 添加batch_id筛选条件
|
|
|
+ batch_id = request_dict.get('batch_id')
|
|
|
+ if batch_id:
|
|
|
+ try:
|
|
|
+ batch_id = int(batch_id)
|
|
|
+ query &= Q(batch_id=batch_id)
|
|
|
+ except ValueError:
|
|
|
+ return response.json(444, "batch_id必须是整数")
|
|
|
+
|
|
|
+ # 4. 查询并分页
|
|
|
+ uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time').values(
|
|
|
+ 'id', 'uid', 'batch_id', 'status', 'created_time', 'updated_time'
|
|
|
+ )
|
|
|
+
|
|
|
+ paginator = Paginator(uid_qs, page_size)
|
|
|
+ try:
|
|
|
+ page_obj = paginator.page(page)
|
|
|
+ except PageNotAnInteger:
|
|
|
+ page_obj = paginator.page(1)
|
|
|
+ except EmptyPage:
|
|
|
+ page_obj = paginator.page(paginator.num_pages)
|
|
|
+
|
|
|
+ # 转换为列表
|
|
|
+ uid_list = list(page_obj)
|
|
|
+
|
|
|
+ return response.json(
|
|
|
+ 0,
|
|
|
+ {
|
|
|
+ 'list': uid_list,
|
|
|
+ 'total': paginator.count,
|
|
|
+ 'currentPage': page_obj.number,
|
|
|
+ 'totalPages': paginator.num_pages,
|
|
|
+ 'pageSize': page_size
|
|
|
+ }
|
|
|
+ )
|