- # -*- encoding: utf-8 -*-
- """
- @File : UIDBurnManageController.py
- @Time : 2025/7/30 08:57
- @Author : stephen
- @Email : zhangdongming@asj6.wecom.work
- @Software: PyCharm
- """
- import json
- import os
- import random
- import string
- import threading
- import time
- from datetime import datetime
- from typing import Dict, Any
- from uuid import uuid4
- from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
- from django.db import transaction
- from django.db.models import Q
- from django.http import QueryDict
- from django.views import View
- from openpyxl import load_workbook
- from AgentModel.models import BurnRecord, BurnEncryptedICUID, BurnBatch
- from Ansjer.config import LOGGER
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- class UIDBurnManageView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def delete(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- delete = QueryDict(request.body)
- if not delete:
- delete = request.GET
- return self.validation(delete, request, operation)
- def put(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- put = QueryDict(request.body)
- return self.validation(put, request, operation)
- def validation(self, request_dict, request, operation):
- """请求验证路由"""
- # 初始化响应对象
- language = request_dict.get('language', 'cn')
- response = ResponseObject(language, 'pc')
- # Token validation
- try:
- tko = TokenObject(
- request.META.get('HTTP_AUTHORIZATION'),
- returntpye='pc')
- if tko.code != 0:
- return response.json(tko.code)
- response.lang = tko.lang
- user_id = tko.userID
- except Exception as e:
- LOGGER.error(f"Token验证失败: {str(e)}")
- return response.json(444)
- if operation == 'getBurnRecordsPage':
- return self.get_burn_records_page(request_dict, response)
- elif operation == 'importBatchUids':
- return self.import_batch_uids(request, response)
- elif operation == 'addBurnRecord':
- return self.add_burn_record(request, request_dict, response)
- elif operation == 'batchPageUids':
- return self.batch_page_uids(request_dict, response)
- elif operation == 'getImportProgress':
- return self.get_import_progress(request_dict, response)
- elif operation == 'getImportTaskList':
- return self.get_import_task_list(request_dict, response)
- elif operation == 'getBatchRecordsPage':
- return self.get_batch_records_page(request_dict, response)
- else:
- return response.json(414)
- @classmethod
- def get_batch_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
- """
- 分页查询批次记录(带统计信息)
- :param request_dict: 请求参数字典
- :param response: 响应对象
- :return: JSON响应
- """
- # 1. Pagination parameter handling
- try:
- page = int(request_dict.get('page', 1))
- page_size = int(request_dict.get('pageSize', 10))
- page = max(page, 1)
- page_size = max(1, min(page_size, 100))
- except (ValueError, TypeError):
- return response.json(444, "分页参数错误(必须为整数)")
- # 2. 构建查询条件
- query = Q()
- batch_number = request_dict.get('batch_number', '').strip()
- if batch_number:
- query &= Q(batch_number__icontains=batch_number)
- # 3. Query and paginate
- batch_qs = BurnBatch.objects.filter(query).order_by('-created_time').values(
- 'id', 'batch_number', 'purpose', 'manager', 'total_uid', 'created_time'
- )
- paginator = Paginator(batch_qs, page_size)
- try:
- page_obj = paginator.page(page)
- except PageNotAnInteger:
- page_obj = paginator.page(1)
- except EmptyPage:
- page_obj = paginator.page(paginator.num_pages)
- # 4. Fetch statistics and build the result list
- redis_obj = RedisObject()
- batch_list = []
-
- for batch in page_obj:
- batch_id = batch['id']
- cache_key = f"batch_stats:{batch_id}"
-
- # Try to fetch the statistics from cache
- cached_stats = redis_obj.get_data(cache_key)
- if cached_stats:
- stats = json.loads(cached_stats)
- else:
- # Fall back to counting in the database
- burned_count = BurnEncryptedICUID.objects.filter(
- batch_id=batch_id,
- status=1
- ).count()
- unburned_count = BurnEncryptedICUID.objects.filter(
- batch_id=batch_id,
- status=0
- ).count()
-
- stats = {
- 'burned_count': burned_count,
- 'unburned_count': unburned_count
- }
- # Cache the stats with a 1-hour expiry
- redis_obj.set_data(cache_key, json.dumps(stats), 3600)
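- # Note: this cache is invalidated (del_data) in _import_uids_batch and _update_uids_batch
- # whenever a batch's UIDs are inserted or marked as burned.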
-
- # Merge batch info with its statistics
- batch_info = dict(batch)
- batch_info.update(stats)
- batch_list.append(batch_info)
- return response.json(
- 0,
- {
- 'list': batch_list,
- 'total': paginator.count,
- 'currentPage': page_obj.number,
- 'totalPages': paginator.num_pages,
- 'pageSize': page_size
- }
- )
- @classmethod
- def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
- """
- 分页查询烧录记录
- :param cls:
- :param request_dict: 请求参数字典(包含分页参数和查询条件)
- :param response: 响应对象(用于返回JSON)
- :return: 分页查询结果的JSON响应
- """
- # 1. Pagination parameter handling and validation
- try:
- page = int(request_dict.get('page', 1))
- page_size = int(request_dict.get('pageSize', 10))
- # Clamp the pagination range
- page = max(page, 1)
- page_size = max(1, min(page_size, 100))
- except (ValueError, TypeError):
- return response.json(444, "分页参数错误(必须为整数)")
- # 2. 构建查询条件
- query = Q()
- order_number = request_dict.get('orderNumber', '').strip()
- if order_number:
- query &= Q(order_number__icontains=order_number)
- # 3. Build the queryset, selecting only the required fields
- burn_qs = BurnRecord.objects.filter(query).order_by('-created_time').values(
- 'id', 'order_number', 'burn_count', 'purpose', 'created_time'
- )
- # 4. Paginate
- paginator = Paginator(burn_qs, page_size)
- try:
- page_obj = paginator.page(page)
- except PageNotAnInteger:
- page_obj = paginator.page(1)
- except EmptyPage:
- page_obj = paginator.page(paginator.num_pages)
- # Convert the page to a list
- burn_list = list(page_obj)
- # Return the result
- return response.json(
- 0,
- {
- 'total': paginator.count, # total number of records
- 'list': burn_list, # records for the current page
- 'currentPage': page_obj.number,
- 'totalPages': paginator.num_pages
- }
- )
- @classmethod
- def import_batch_uids(cls, request, response) -> Any:
- """
- 导入批次UID - 异步优化版(适配新表结构)
- :param request: HttpRequest对象(包含上传文件)
- :param response: 响应对象
- :return: JSON响应
- """
- # 1. Validate the uploaded file
- if 'file' not in request.FILES:
- return response.json(444, "请上传Excel文件")
- excel_file = request.FILES['file']
- if not excel_file.name.endswith(('.xlsx', '.xls')):
- return response.json(444, "只支持Excel文件(.xlsx/.xls)")
- try:
- # 2. Generate the task ID and batch number
- task_id = str(uuid4())
- # Build a batch number from a timestamp plus random characters
- timestamp = datetime.now().strftime('%Y%m%d%H%M%S') # second precision
- random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3)) # 3 random characters
- batch_number = f"ENG{timestamp}{random_chars}" # format: ENG + timestamp + random chars
- # 3. Initialize the task state in Redis
- redis_key = f"import_task:{task_id}"
- redis_obj = RedisObject()
- # Store basic task info in Redis (expires in 2 hours)
- task_data = {
- 'status': 'pending',
- 'batch_number': batch_number,
- 'progress': 0,
- 'processed': 0,
- 'total': 0,
- 'start_time': int(time.time()),
- 'success_count': 0
- }
- redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
- # 4. Save the file to the project's static/uploaded_files directory
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
- os.makedirs(upload_dir, exist_ok=True)
- file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
- with open(file_path, 'wb+') as destination:
- for chunk in excel_file.chunks():
- destination.write(chunk)
- # 5. Start a background thread to process the import
- thread = threading.Thread(
- target=cls._process_import_batch_async,
- args=(task_id, file_path, redis_key, batch_number),
- daemon=True
- )
- thread.start()
- return response.json(0, {
- "task_id": task_id,
- "batch_number": batch_number,
- "message": "导入任务已提交,正在后台处理",
- "redis_key": redis_key
- })
- except Exception as e:
- LOGGER.error(f"创建导入任务失败: {str(e)}")
- return response.json(500, "创建导入任务失败")
- @classmethod
- def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
- """后台线程处理批量导入任务(适配新表结构)"""
- redis_obj = RedisObject()
- try:
- # Fetch the task data and mark it as processing
- task_data = json.loads(redis_obj.get_data(redis_key))
- task_data['status'] = 'processing'
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # 1. Read the Excel file to get the total row count
- wb = load_workbook(file_path)
- ws = wb.active
- total_rows = ws.max_row
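- # Note: ws.max_row is the highest populated row index, so it may also count a header row
- # or trailing formatted-but-empty rows; the progress percentage below is approximate.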
- # Update the total row count and start time
- task_data['total'] = total_rows
- task_data['start_time'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # 2. Create the batch record
- current_time = int(time.time())
- with transaction.atomic():
- batch = BurnBatch(
- batch_number=batch_number,
- purpose='Batch import',
- created_time=current_time,
- manager='system', # imported by the system by default
- total_uid=0 # starts at 0; updated once processing completes
- )
- batch.save()
- batch_id = batch.id
- # 3. Process the UID data in chunks (500 per chunk)
- batch_size = 500
- processed = 0
- success_count = 0
- uids_batch = []
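- # UIDs are buffered and flushed to the database in chunks of batch_size via
- # _import_uids_batch, which keeps each transaction and bulk insert small.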
- for row in ws.iter_rows(min_row=1, values_only=True):
- if row[0]:
- uid = str(row[0]).strip()
- if uid:
- uids_batch.append(uid)
- processed += 1
- # Update the progress every 1000 rows
- if processed % 1000 == 0:
- progress = min(99, int((processed / total_rows) * 100))
- task_data['progress'] = progress
- task_data['processed'] = processed
- task_data['last_update'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # Flush a full chunk
- if len(uids_batch) >= batch_size:
- success = cls._import_uids_batch(
- uids_batch,
- batch_id,
- current_time,
- redis_key
- )
- success_count += success
- uids_batch = []
- # Flush the final partial chunk
- if uids_batch:
- success = cls._import_uids_batch(
- uids_batch,
- batch_id,
- current_time,
- redis_key
- )
- success_count += success
- # Update the batch's total UID count
- with transaction.atomic():
- BurnBatch.objects.filter(id=batch_id).update(total_uid=success_count)
- # Write the final task state
- task_data['status'] = 'completed'
- task_data['progress'] = 100
- task_data['processed'] = processed
- task_data['success_count'] = success_count
- task_data['end_time'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- LOGGER.info(f"处理批量导入完成,任务ID: {task_id}, 处理UID数量: {processed}, 成功导入: {success_count}")
- # 清理临时文件
- try:
- os.remove(file_path)
- except Exception as e:
- LOGGER.warning(f"删除临时文件失败: {str(e)}")
- except Exception as e:
- LOGGER.error(f"处理批量导入失败: {str(e)}")
- task_data = {
- 'status': 'failed',
- 'error': str(e),
- 'end_time': int(time.time())
- }
- redis_obj.set_data(redis_key, json.dumps(task_data))
- @classmethod
- def _import_uids_batch(cls, uids_batch, batch_id, current_time, redis_key):
- """批量导入UID记录(适配新表结构)"""
- redis_obj = RedisObject()
- try:
- with transaction.atomic():
- # De-duplicate within this chunk
- unique_uids = list(set(uids_batch))
- # Build the records
- records = [
- BurnEncryptedICUID(
- batch_id=batch_id,
- uid=uid,
- created_time=current_time,
- updated_time=current_time,
- status=0 # not yet burned
- )
- for uid in unique_uids
- ]
- BurnEncryptedICUID.objects.bulk_create(records)
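- # bulk_create issues a single INSERT per chunk and does not call save() or send signals.
- # The set() above only de-duplicates within this chunk; UIDs already present in the table
- # are not filtered here and would rely on a DB unique constraint, if one is defined.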
- # Update the processed and success counts in Redis
- task_data = json.loads(redis_obj.get_data(redis_key))
- task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
- task_data['success_count'] = task_data.get('success_count', 0) + len(records)
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # Invalidate the batch statistics cache
- cache_key = f"batch_stats:{batch_id}"
- redis_obj.del_data(cache_key)
- return len(records)
- except Exception as e:
- LOGGER.error(f"批量导入UID失败: {str(e)}")
- # 更新失败状态但继续处理
- task_data = json.loads(redis_obj.get_data(redis_key))
- task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
- redis_obj.set_data(redis_key, json.dumps(task_data))
- return 0
- @classmethod
- def add_burn_record(cls, request, request_dict, response) -> Any:
- """
- 新增烧录记录(带UID文件) - Redis字符串优化版
- :param request_dict:
- :param request: HttpRequest对象(包含上传文件和表单数据)
- :param response: 响应对象
- :return: JSON响应
- """
- required_fields = ['order_number', 'burn_count', 'purpose']
- for field in required_fields:
- if not request_dict.get(field):
- return response.json(444, f"缺少必填参数: {field}")
- # 2. 验证文件上传
- if 'uid_file' not in request.FILES:
- return response.json(444, "请上传包含已烧录UID的Excel文件")
- excel_file = request.FILES['uid_file']
- if not excel_file.name.endswith(('.xlsx', '.xls')):
- return response.json(444, "只支持Excel文件(.xlsx/.xls)")
- try:
- burn_count = int(request_dict['burn_count'])
- if burn_count <= 0:
- return response.json(444, "烧录数量必须大于0")
- # 3. 创建任务ID并初始化Redis状态
- task_id = str(uuid4())
- redis_key = f"burn_task:{task_id}"
- redis_obj = RedisObject()
- # Store basic task info in Redis (expires in 2 hours)
- task_data = {
- 'status': 'pending',
- 'order_number': request_dict['order_number'].strip(),
- 'burn_count': burn_count,
- 'purpose': request_dict['purpose'].strip(),
- 'progress': 0,
- 'processed': 0,
- 'total': 0,
- 'start_time': int(time.time())
- }
- redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
- # 4. Save the file to the project's static/uploaded_files directory
- base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
- os.makedirs(upload_dir, exist_ok=True)
- file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
- with open(file_path, 'wb+') as destination:
- for chunk in excel_file.chunks():
- destination.write(chunk)
- # 5. Start a background thread to process the burn record
- thread = threading.Thread(
- target=cls._process_burn_record_async,
- args=(task_id, file_path, redis_key),
- daemon=True
- )
- thread.start()
- return response.json(0, {
- "task_id": task_id,
- "message": "任务已提交,正在后台处理",
- "redis_key": redis_key
- })
- except ValueError:
- return response.json(444, "烧录数量必须是整数")
- except Exception as e:
- LOGGER.error(f"创建烧录任务失败: {str(e)}")
- return response.json(500, "创建烧录任务失败")
- @classmethod
- def _process_burn_record_async(cls, task_id, file_path, redis_key):
- """后台线程处理烧录记录任务"""
- redis_obj = RedisObject()
- try:
- # Fetch the task data and mark it as processing
- task_data = json.loads(redis_obj.get_data(redis_key))
- task_data['status'] = 'processing'
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # 1. Read the Excel file to get the total row count
- wb = load_workbook(file_path)
- ws = wb.active
- total_rows = ws.max_row
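- # As in the import path, ws.max_row may include a header or empty rows, so progress is approximate.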
- # Update the total row count and start time
- task_data['total'] = total_rows
- task_data['start_time'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # 2. Create the burn record
- with transaction.atomic():
- burn_record = BurnRecord(
- order_number=task_data['order_number'],
- burn_count=task_data['burn_count'],
- purpose=task_data['purpose'],
- updated_time=int(time.time()),
- created_time=int(time.time())
- )
- burn_record.save()
- task_data['burn_record_id'] = burn_record.id
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # 3. Process the UID file in chunks (300 per chunk)
- batch_size = 300
- current_time = int(time.time())
- processed = 0
- uids_batch = []
- for row in ws.iter_rows(min_row=1, values_only=True):
- if row[0]:
- uid = str(row[0]).strip()
- if uid:
- uids_batch.append(uid)
- processed += 1
- # Update the progress every 100 rows
- if processed % 100 == 0:
- progress = min(99, int((processed / total_rows) * 100))
- task_data['progress'] = progress
- task_data['processed'] = processed
- task_data['last_update'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # Flush a full chunk
- if len(uids_batch) >= batch_size:
- cls._update_uids_batch(
- uids_batch,
- burn_record.id,
- current_time,
- redis_key
- )
- uids_batch = []
- # Flush the final partial chunk
- if uids_batch:
- cls._update_uids_batch(
- uids_batch,
- burn_record.id,
- current_time,
- redis_key
- )
- # Write the final task state
- task_data['status'] = 'completed'
- task_data['progress'] = 100
- task_data['processed'] = processed
- task_data['end_time'] = int(time.time())
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # Find the affected batch IDs and invalidate their statistics caches
- batch_ids = BurnEncryptedICUID.objects.filter(
- burn_id=burn_record.id
- ).values_list('batch_id', flat=True).distinct()
-
- for batch_id in batch_ids:
- cache_key = f"batch_stats:{batch_id}"
- redis_obj.del_data(cache_key)
- LOGGER.info(f"处理烧录记录完成,任务ID: {task_id}, 处理UID数量: {processed}")
- # 清理临时文件
- try:
- os.remove(file_path)
- except Exception as e:
- LOGGER.warning(f"删除临时文件失败: {str(e)}")
- except Exception as e:
- LOGGER.error(f"处理烧录记录失败: {str(e)}")
- task_data = {
- 'status': 'failed',
- 'error': str(e),
- 'end_time': int(time.time())
- }
- redis_obj.set_data(redis_key, json.dumps(task_data))
- @classmethod
- def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
- """批量更新UID记录"""
- redis_obj = RedisObject()
- try:
- with transaction.atomic():
- # First, look up the batch IDs that will be affected
- batch_ids = BurnEncryptedICUID.objects.filter(
- uid__in=uids_batch
- ).values_list('batch_id', flat=True).distinct()
- updated = BurnEncryptedICUID.objects.filter(
- uid__in=uids_batch
- ).update(
- burn_id=burn_id,
- status=1, # burned successfully
- updated_time=current_time
- )
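- # QuerySet.update() runs a single SQL UPDATE and bypasses model save() and signals;
- # it returns the number of matched rows, assigned to `updated` but not used further.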
- # Update the processed count in Redis
- task_data = json.loads(redis_obj.get_data(redis_key))
- task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
- redis_obj.set_data(redis_key, json.dumps(task_data))
- # Invalidate the statistics caches of the affected batches
- for batch_id in batch_ids:
- cache_key = f"batch_stats:{batch_id}"
- redis_obj.del_data(cache_key)
- except Exception as e:
- LOGGER.error(f"批量更新UID失败: {str(e)}")
- raise
- @classmethod
- def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
- """
- 查询任务进度(支持导入和烧录任务)
- :param request_dict: 请求参数字典(必须包含task_id, 可选type)
- :param response: 响应对象
- :return: JSON响应
- type参数说明:
- - import: 导入任务(默认)
- - burn: 烧录任务
- """
- # 1. Parameter validation
- task_id = request_dict.get('task_id')
- if not task_id:
- return response.json(444, "Missing task_id parameter")
- task_type = request_dict.get('type', 'import').lower()
- if task_type not in ['import', 'burn']:
- return response.json(444, "type must be 'import' or 'burn'")
- # 2. Build the Redis key
- redis_key = f"{task_type}_task:{task_id}"
- try:
- # 3. Fetch the task data from Redis
- redis_obj = RedisObject()
- task_data_str = redis_obj.get_data(redis_key)
- if not task_data_str:
- return response.json(173, "任务不存在或已过期")
- # 4. 解析任务数据
- if isinstance(task_data_str, bytes):
- task_data_str = task_data_str.decode('utf-8')
- task_data = json.loads(task_data_str)
- # 5. Compute the elapsed time (seconds)
- current_time = int(time.time())
- start_time = task_data.get('start_time', current_time)
- elapsed = current_time - start_time
- if task_data.get('end_time'):
- elapsed = task_data['end_time'] - start_time
- # 6. Build the base response payload
- result = {
- 'status': task_data.get('status', 'unknown'),
- 'progress': task_data.get('progress', 0),
- 'processed': task_data.get('processed', 0),
- 'total': task_data.get('total', 0),
- 'elapsed_seconds': elapsed,
- 'start_time': start_time,
- 'end_time': task_data.get('end_time'),
- 'error': task_data.get('error'),
- 'task_type': task_type
- }
- # 7. Add type-specific fields
- if task_type == 'import':
- # Get the batch number from Redis and look up the batch info
- batch_number = task_data.get('batch_number', '')
- if batch_number:
- try:
- batch = BurnBatch.objects.filter(batch_number=batch_number).first()
- if batch:
- result.update({
- 'batch_number': batch_number,
- 'success_count': task_data.get('success_count', 0),
- 'purpose': batch.purpose,
- 'manager': batch.manager,
- 'total_uid': batch.total_uid
- })
- else:
- result.update({
- 'batch_number': batch_number,
- 'success_count': task_data.get('success_count', 0)
- })
- except Exception as e:
- LOGGER.error(f"查询批次信息失败: {str(e)}")
- result.update({
- 'batch_number': batch_number,
- 'success_count': task_data.get('success_count', 0)
- })
- else: # burn task
- result.update({
- 'order_number': task_data.get('order_number', ''),
- 'purpose': task_data.get('purpose', ''),
- 'burn_count': task_data.get('burn_count', 0),
- 'burn_record_id': task_data.get('burn_record_id')
- })
- return response.json(0, result)
- except json.JSONDecodeError:
- LOGGER.error(f"任务数据解析失败, redis_key: {redis_key}")
- return response.json(500, "任务数据格式错误")
- except Exception as e:
- LOGGER.error(f"查询任务进度失败: {str(e)}")
- return response.json(500, "查询进度失败")
- @classmethod
- def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
- """
- 获取所有导入任务列表
- :param request_dict: 请求参数字典
- :param response: 响应对象
- :return: JSON响应
- """
- try:
- redis_obj = RedisObject()
- # Get every key starting with import_task
- keys = redis_obj.get_keys("import_task:*")
- if not keys:
- return response.json(0, {"tasks": []})
- tasks = []
- # Collect basic info for each task
- for key in keys:
- try:
- task_data_str = redis_obj.get_data(key)
- if task_data_str:
- # Make sure task_data_str is a str
- if isinstance(task_data_str, bytes):
- task_data_str = task_data_str.decode('utf-8')
- task_data = json.loads(task_data_str)
- # Handle keys returned as bytes
- key_str = key.decode('utf-8') if isinstance(key, bytes) else key
- tasks.append({
- 'task_id': key_str.split(':')[1], # extract the task_id from the key
- 'status': task_data.get('status', 'unknown'),
- 'progress': task_data.get('progress', 0),
- 'batch_number': task_data.get('batch_number', ''),
- 'start_time': task_data.get('start_time'),
- 'end_time': task_data.get('end_time'),
- 'processed': task_data.get('processed', 0),
- 'total': task_data.get('total', 0),
- 'redis_key': key_str
- })
- except Exception as e:
- LOGGER.error(f"解析任务数据失败, key: {key}, error: {str(e)}")
- continue
- # Sort by start time, newest first (failed tasks may have lost start_time, so fall back to 0)
- tasks.sort(key=lambda x: x.get('start_time') or 0, reverse=True)
- # Limit the result to the most recent 100 tasks
- tasks = tasks[:100]
- return response.json(0, {"tasks": tasks})
- except Exception as e:
- LOGGER.error(f"获取导入任务列表失败: {str(e)}")
- return response.json(500, "获取任务列表失败")
- @classmethod
- def batch_page_uids(cls, request_dict: Dict[str, Any], response) -> Any:
- """
- 根据burn_id分页查询烧录UID记录
- :param request_dict: 请求参数字典
- :param response: 响应对象
- :return: JSON响应
- """
- try:
- page = int(request_dict.get('page', 1))
- page_size = int(request_dict.get('pageSize', 10))
- page = max(page, 1)
- page_size = max(1, min(page_size, 100))
- except (ValueError, TypeError):
- return response.json(444, "分页参数错误(必须为整数)")
- # 3. 构建查询条件
- query = Q()
-
- # Filter by batch_id when provided
- batch_id = request_dict.get('batch_id')
- if batch_id:
- try:
- batch_id = int(batch_id)
- query &= Q(batch_id=batch_id)
- except ValueError:
- return response.json(444, "batch_id必须是整数")
- # 4. 查询并分页
- uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time').values(
- 'id', 'uid', 'batch_id', 'status', 'created_time', 'updated_time'
- )
- paginator = Paginator(uid_qs, page_size)
- try:
- page_obj = paginator.page(page)
- except PageNotAnInteger:
- page_obj = paginator.page(1)
- except EmptyPage:
- page_obj = paginator.page(paginator.num_pages)
- # Convert the page to a list
- uid_list = list(page_obj)
- return response.json(
- 0,
- {
- 'list': uid_list,
- 'total': paginator.count,
- 'currentPage': page_obj.number,
- 'totalPages': paginator.num_pages,
- 'pageSize': page_size
- }
- )