# -*- encoding: utf-8 -*-
"""
@File    : UIDBurnManageController.py
@Time    : 2025/7/30 08:57
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import os
import random
import string
import threading
import time
from datetime import datetime
from typing import Dict, Any
from uuid import uuid4

import xlrd
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.db import transaction
from django.db.models import Q
from django.http import QueryDict
from django.views import View
from openpyxl import load_workbook

from AgentModel.models import BurnRecord, BurnEncryptedICUID, BurnBatch
from Ansjer.config import LOGGER
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject

# Lifetime of a task-state key in Redis (2 hours).
TASK_TTL_SECONDS = 7200
# Lifetime of the cached per-batch burned/unburned counters (1 hour).
BATCH_STATS_TTL_SECONDS = 3600
# Number of UIDs written per bulk_create when importing a batch.
IMPORT_BATCH_SIZE = 500
# Number of UIDs updated per query when recording a burn.
BURN_BATCH_SIZE = 300
# Zero-based index of the spreadsheet column holding the UID (third column).
UID_COLUMN_INDEX = 2
# Upper bound for the ``pageSize`` request parameter.
MAX_PAGE_SIZE = 100
# Accepted upload extensions; '.xlsx' must be tested before '.xls'.
EXCEL_EXTENSIONS = ('.xlsx', '.xls')


class UIDBurnManageView(View):
    """Endpoints for UID batch import, burn records and task progress.

    Every HTTP verb forwards to :meth:`validation`, which authenticates the
    caller and dispatches on the ``operation`` URL keyword argument.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET: parameters come from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: parameters come from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        """Handle DELETE: parse the body, falling back to the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        params = QueryDict(request.body)
        if not params:
            params = request.GET
        return self.validation(params, request, operation)

    def put(self, request, *args, **kwargs):
        """Handle PUT: parameters are parsed from the raw request body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        params = QueryDict(request.body)
        return self.validation(params, request, operation)

    def validation(self, request_dict, request, operation):
        """Authenticate the request and route it to the operation handler.

        :param request_dict: request parameters (QueryDict)
        :param request: HttpRequest object
        :param operation: operation name taken from the URL
        :return: JSON response; code 414 for an unknown operation, 444 when
            token validation raises
        """
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language, 'pc')

        # Token validation. ``returntpye`` is the keyword name used by the
        # original call and is kept verbatim.
        try:
            tko = TokenObject(
                request.META.get('HTTP_AUTHORIZATION'),
                returntpye='pc')
            if tko.code != 0:
                return response.json(tko.code)
            response.lang = tko.lang
        except Exception as e:
            LOGGER.error(f"Token验证失败: {str(e)}")
            return response.json(444)

        if operation == 'getBurnRecordsPage':
            return self.get_burn_records_page(request_dict, response)
        if operation == 'importBatchUids':
            return self.import_batch_uids(request, response)
        if operation == 'addBurnRecord':
            return self.add_burn_record(request, request_dict, response)
        if operation == 'batchPageUids':
            return self.batch_page_uids(request_dict, response)
        if operation == 'getImportProgress':
            return self.get_import_progress(request_dict, response)
        if operation == 'getImportTaskList':
            return self.get_import_task_list(request_dict, response)
        if operation == 'getBatchRecordsPage':
            return self.get_batch_records_page(request_dict, response)
        return response.json(414)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_paging(request_dict):
        """Return ``(page, page_size)`` clamped to valid ranges.

        :raises ValueError, TypeError: when a parameter is not an integer
        """
        page = max(int(request_dict.get('page', 1)), 1)
        page_size = int(request_dict.get('pageSize', 10))
        page_size = max(1, min(page_size, MAX_PAGE_SIZE))
        return page, page_size

    @staticmethod
    def _paginate(queryset, page, page_size):
        """Return ``(paginator, page_obj)``; out-of-range pages map to the last page."""
        paginator = Paginator(queryset, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        return paginator, page_obj

    @staticmethod
    def _excel_extension(filename):
        """Return '.xlsx' or '.xls' for an Excel file name (case-insensitive), else None."""
        lowered = (filename or '').lower()
        for ext in EXCEL_EXTENSIONS:
            if lowered.endswith(ext):
                return ext
        return None

    @staticmethod
    def _save_upload(excel_file, task_id, ext):
        """Write the uploaded file to ``static/uploaded_files`` and return its path.

        The real extension is kept in the stored name so that the background
        worker can choose the matching reader.
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, f"{task_id}{ext}")
        with open(file_path, 'wb') as destination:
            for chunk in excel_file.chunks():
                destination.write(chunk)
        return file_path

    @staticmethod
    def _remove_temp_file(file_path):
        """Best-effort removal of an uploaded temp file; failures are only logged."""
        try:
            os.remove(file_path)
        except OSError as e:
            LOGGER.warning(f"删除临时文件失败: {str(e)}")

    @staticmethod
    def _read_uid_column(file_path):
        """Return the non-empty values of the third column of the first sheet.

        ``.xlsx`` files are read with openpyxl; anything else is read with
        xlrd. All rows are read, starting from the first one.

        :param file_path: path of the stored spreadsheet
        :return: list of stripped UID strings in file order
        """
        ext = os.path.splitext(file_path)[-1].lower()
        cells = []
        if ext == '.xlsx':
            workbook = load_workbook(file_path, read_only=True)
            try:
                sheet = workbook.active
                for row in sheet.iter_rows(min_row=1, values_only=True):
                    if len(row) > UID_COLUMN_INDEX:
                        cells.append(row[UID_COLUMN_INDEX])
            finally:
                # Read-only workbooks keep the file open until closed.
                workbook.close()
        else:
            sheet = xlrd.open_workbook(file_path).sheet_by_index(0)
            for row_idx in range(sheet.nrows):
                row = sheet.row_values(row_idx)
                if len(row) > UID_COLUMN_INDEX:
                    cells.append(row[UID_COLUMN_INDEX])

        uids = []
        for cell in cells:
            if not cell:
                continue
            uid = str(cell).strip()
            if uid:
                uids.append(uid)
        return uids

    @staticmethod
    def _load_task(redis_obj, redis_key):
        """Return the task-state dict stored at ``redis_key`` ({} when absent)."""
        raw = redis_obj.get_data(redis_key)
        if not raw:
            return {}
        if isinstance(raw, bytes):
            raw = raw.decode('utf-8')
        return json.loads(raw)

    @staticmethod
    def _save_task(redis_obj, redis_key, task_data):
        """Persist task state, always with the task TTL.

        NOTE(review): assumes ``RedisObject.set_data`` without an expiry maps
        to a plain Redis SET, which would drop the key's TTL - confirm.
        """
        redis_obj.set_data(redis_key, json.dumps(task_data), TASK_TTL_SECONDS)

    @classmethod
    def _mark_task_failed(cls, redis_obj, redis_key, task_data, error):
        """Record a failure while keeping the existing task fields.

        Keeping ``start_time``/``batch_number`` lets the progress and list
        endpoints still report and sort the failed task.
        """
        failed = dict(task_data or {})
        failed.update({
            'status': 'failed',
            'error': str(error),
            'end_time': int(time.time())
        })
        cls._save_task(redis_obj, redis_key, failed)

    @staticmethod
    def _clear_batch_stats(redis_obj, batch_ids):
        """Delete the cached burned/unburned counters of the given batches."""
        for batch_id in batch_ids:
            redis_obj.del_data(f"batch_stats:{batch_id}")

    # ------------------------------------------------------------------
    # Paged queries
    # ------------------------------------------------------------------

    @classmethod
    def get_batch_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Page through batch records, each with burned/unburned counters.

        :param request_dict: request parameters (page, pageSize, batch_number)
        :param response: response object
        :return: JSON response
        """
        try:
            page, page_size = cls._parse_paging(request_dict)
        except (ValueError, TypeError):
            return response.json(444, "分页参数错误(必须为整数)")

        query = Q()
        batch_number = request_dict.get('batch_number', '').strip()
        if batch_number:
            query &= Q(batch_number__icontains=batch_number)

        batch_qs = BurnBatch.objects.filter(query).order_by('-created_time').values(
            'id', 'batch_number', 'purpose', 'manager', 'total_uid', 'created_time'
        )
        paginator, page_obj = cls._paginate(batch_qs, page, page_size)

        redis_obj = RedisObject()
        batch_list = []
        for batch in page_obj:
            batch_id = batch['id']
            cache_key = f"batch_stats:{batch_id}"

            cached_stats = redis_obj.get_data(cache_key)
            if cached_stats:
                stats = json.loads(cached_stats)
            else:
                # Cache miss: count from the database, then cache the result.
                uid_qs = BurnEncryptedICUID.objects.filter(batch_id=batch_id)
                stats = {
                    'burned_count': uid_qs.filter(status=1).count(),
                    'unburned_count': uid_qs.filter(status=0).count()
                }
                redis_obj.set_data(cache_key, json.dumps(stats), BATCH_STATS_TTL_SECONDS)

            batch_info = dict(batch)
            batch_info.update(stats)
            batch_list.append(batch_info)

        return response.json(
            0,
            {
                'list': batch_list,
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )

    @classmethod
    def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Page through burn records.

        :param request_dict: request parameters (page, pageSize, orderNumber)
        :param response: response object
        :return: JSON response with the requested page
        """
        try:
            page, page_size = cls._parse_paging(request_dict)
        except (ValueError, TypeError):
            return response.json(444, "分页参数错误(必须为整数)")

        query = Q()
        order_number = request_dict.get('orderNumber', '').strip()
        if order_number:
            query &= Q(order_number__icontains=order_number)

        burn_qs = BurnRecord.objects.filter(query).order_by('-created_time').values(
            'id', 'order_number', 'burn_count', 'purpose', 'created_time'
        )
        paginator, page_obj = cls._paginate(burn_qs, page, page_size)

        return response.json(
            0,
            {
                'total': paginator.count,
                'list': list(page_obj),
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages
            }
        )

    @classmethod
    def batch_page_uids(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Page through UID records, optionally filtered by ``batch_id``.

        :param request_dict: request parameters (page, pageSize, batch_id)
        :param response: response object
        :return: JSON response
        """
        try:
            page, page_size = cls._parse_paging(request_dict)
        except (ValueError, TypeError):
            return response.json(444, "分页参数错误(必须为整数)")

        query = Q()
        batch_id = request_dict.get('batch_id')
        if batch_id:
            try:
                query &= Q(batch_id=int(batch_id))
            except ValueError:
                return response.json(444, "batch_id必须是整数")

        uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time').values(
            'id', 'uid', 'batch_id', 'status', 'created_time', 'updated_time'
        )
        paginator, page_obj = cls._paginate(uid_qs, page, page_size)

        return response.json(
            0,
            {
                'list': list(page_obj),
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )

    # ------------------------------------------------------------------
    # Batch import
    # ------------------------------------------------------------------

    @classmethod
    def import_batch_uids(cls, request, response) -> Any:
        """
        Accept an Excel upload and import its UIDs in a background thread.

        :param request: HttpRequest object (carries the uploaded ``file``)
        :param response: response object
        :return: JSON response with the task id and batch number
        """
        if 'file' not in request.FILES:
            return response.json(444, "请上传Excel文件")

        excel_file = request.FILES['file']
        ext = cls._excel_extension(excel_file.name)
        if ext is None:
            return response.json(444, "只支持Excel文件(.xlsx/.xls)")

        try:
            task_id = str(uuid4())
            # Batch number format: ENG + second-resolution timestamp + 3 random chars.
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3))
            batch_number = f"ENG{timestamp}{random_chars}"

            redis_key = f"import_task:{task_id}"
            redis_obj = RedisObject()
            task_data = {
                'status': 'pending',
                'batch_number': batch_number,
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time()),
                'success_count': 0
            }
            cls._save_task(redis_obj, redis_key, task_data)

            file_path = cls._save_upload(excel_file, task_id, ext)

            thread = threading.Thread(
                target=cls._process_import_batch_async,
                args=(task_id, file_path, redis_key, batch_number),
                daemon=True
            )
            thread.start()

            return response.json(0, {
                "task_id": task_id,
                "batch_number": batch_number,
                "message": "导入任务已提交,正在后台处理",
                "redis_key": redis_key
            })
        except Exception as e:
            LOGGER.error(f"创建导入任务失败: {str(e)}")
            return response.json(500, "创建导入任务失败")

    @classmethod
    def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
        """Background worker: create the batch and import the UIDs of the file.

        Progress is written to ``redis_key`` after every database batch. The
        uploaded file is removed whether the task succeeds or fails.

        :param task_id: task identifier (used for logging)
        :param file_path: path of the stored upload (.xls or .xlsx)
        :param redis_key: Redis key holding the task state
        :param batch_number: batch number generated for this import
        """
        redis_obj = RedisObject()
        task_data = {}
        try:
            task_data = cls._load_task(redis_obj, redis_key)
            task_data['status'] = 'processing'
            cls._save_task(redis_obj, redis_key, task_data)

            uid_rows = cls._read_uid_column(file_path)
            total = len(uid_rows)
            task_data['total'] = total
            task_data['start_time'] = int(time.time())
            cls._save_task(redis_obj, redis_key, task_data)

            current_time = int(time.time())
            with transaction.atomic():
                batch = BurnBatch(
                    batch_number=batch_number,
                    purpose='批次导入',
                    created_time=current_time,
                    manager='system',
                    total_uid=0
                )
                batch.save()
                batch_id = batch.id

            processed = 0
            success_count = 0
            for start in range(0, total, IMPORT_BATCH_SIZE):
                chunk = uid_rows[start:start + IMPORT_BATCH_SIZE]
                success_count += cls._import_uids_batch(
                    chunk, batch_id, current_time, redis_key
                )
                processed += len(chunk)
                # Cap at 99 until the final bookkeeping below has finished.
                task_data['progress'] = min(99, int((processed / total) * 100))
                task_data['processed'] = processed
                task_data['success_count'] = success_count
                task_data['last_update'] = int(time.time())
                cls._save_task(redis_obj, redis_key, task_data)

            with transaction.atomic():
                BurnBatch.objects.filter(id=batch_id).update(total_uid=success_count)

            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['success_count'] = success_count
            task_data['end_time'] = int(time.time())
            cls._save_task(redis_obj, redis_key, task_data)

            LOGGER.info(f"批次导入完成,任务ID: {task_id},处理UID: {processed},成功导入: {success_count}")
        except Exception as e:
            LOGGER.error(f"处理导入任务失败: {str(e)}")
            cls._mark_task_failed(redis_obj, redis_key, task_data, e)
        finally:
            cls._remove_temp_file(file_path)

    @classmethod
    def _import_uids_batch(cls, uids_batch, batch_id, current_time, redis_key):
        """Insert one batch of UIDs as unburned records.

        A failed insert is logged and reported as 0 so that the caller can
        continue with the next batch.

        :param uids_batch: UID strings to insert (duplicates are dropped)
        :param batch_id: id of the owning BurnBatch
        :param current_time: unix timestamp used for created/updated time
        :param redis_key: unused; kept for signature compatibility (task
            progress is written by the calling worker)
        :return: number of records inserted, 0 on failure
        """
        try:
            # Order-preserving de-duplication within this batch.
            unique_uids = list(dict.fromkeys(uids_batch))
            records = [
                BurnEncryptedICUID(
                    batch_id=batch_id,
                    uid=uid,
                    created_time=current_time,
                    updated_time=current_time,
                    status=0  # not burned yet
                )
                for uid in unique_uids
            ]
            with transaction.atomic():
                BurnEncryptedICUID.objects.bulk_create(records)
        except Exception as e:
            LOGGER.error(f"批量导入UID失败: {str(e)}")
            return 0

        # The rows are committed; a cache problem must not hide them from the count.
        try:
            cls._clear_batch_stats(RedisObject(), [batch_id])
        except Exception as e:
            LOGGER.warning(f"清除批次统计缓存失败: {str(e)}")
        return len(records)

    # ------------------------------------------------------------------
    # Burn records
    # ------------------------------------------------------------------

    @classmethod
    def add_burn_record(cls, request, request_dict, response) -> Any:
        """
        Create a burn record from form data plus an Excel file of burned UIDs.

        :param request: HttpRequest object (carries the uploaded ``uid_file``)
        :param request_dict: form parameters (order_number, burn_count, purpose)
        :param response: response object
        :return: JSON response with the background task id
        """
        required_fields = ['order_number', 'burn_count', 'purpose']
        for field in required_fields:
            if not request_dict.get(field):
                return response.json(444, f"缺少必填参数: {field}")

        if 'uid_file' not in request.FILES:
            return response.json(444, "请上传包含已烧录UID的Excel文件")

        excel_file = request.FILES['uid_file']
        ext = cls._excel_extension(excel_file.name)
        if ext is None:
            return response.json(444, "只支持Excel文件(.xlsx/.xls)")

        # Only the conversion is guarded, so unrelated ValueErrors are not
        # reported as a bad burn count.
        try:
            burn_count = int(request_dict['burn_count'])
        except ValueError:
            return response.json(444, "烧录数量必须是整数")
        if burn_count <= 0:
            return response.json(444, "烧录数量必须大于0")

        try:
            task_id = str(uuid4())
            redis_key = f"burn_task:{task_id}"
            redis_obj = RedisObject()
            task_data = {
                'status': 'pending',
                'order_number': request_dict['order_number'].strip(),
                'burn_count': burn_count,
                'purpose': request_dict['purpose'].strip(),
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time())
            }
            cls._save_task(redis_obj, redis_key, task_data)

            file_path = cls._save_upload(excel_file, task_id, ext)

            thread = threading.Thread(
                target=cls._process_burn_record_async,
                args=(task_id, file_path, redis_key),
                daemon=True
            )
            thread.start()

            return response.json(0, {
                "task_id": task_id,
                "message": "任务已提交,正在后台处理",
                "redis_key": redis_key
            })
        except Exception as e:
            LOGGER.error(f"创建烧录任务失败: {str(e)}")
            return response.json(500, "创建烧录任务失败")

    @classmethod
    def _process_burn_record_async(cls, task_id, file_path, redis_key):
        """Background worker: create the burn record and mark its UIDs as burned.

        ``total`` in the task state is the number of UIDs found in the file.
        The uploaded file is removed whether the task succeeds or fails.

        :param task_id: task identifier (used for logging)
        :param file_path: path of the stored upload (.xls or .xlsx)
        :param redis_key: Redis key holding the task state
        """
        redis_obj = RedisObject()
        task_data = {}
        try:
            task_data = cls._load_task(redis_obj, redis_key)
            if not task_data:
                # order_number / burn_count / purpose live only in the task state.
                raise RuntimeError(f"task state not found in Redis: {redis_key}")
            task_data['status'] = 'processing'
            cls._save_task(redis_obj, redis_key, task_data)

            uids = cls._read_uid_column(file_path)
            total = len(uids)
            task_data['total'] = total
            task_data['start_time'] = int(time.time())
            cls._save_task(redis_obj, redis_key, task_data)

            current_time = int(time.time())
            with transaction.atomic():
                burn_record = BurnRecord(
                    order_number=task_data['order_number'],
                    burn_count=task_data['burn_count'],
                    purpose=task_data['purpose'],
                    updated_time=current_time,
                    created_time=current_time
                )
                burn_record.save()
                task_data['burn_record_id'] = burn_record.id
            cls._save_task(redis_obj, redis_key, task_data)

            processed = 0
            for start in range(0, total, BURN_BATCH_SIZE):
                chunk = uids[start:start + BURN_BATCH_SIZE]
                cls._update_uids_batch(
                    chunk, burn_record.id, current_time, redis_key
                )
                processed += len(chunk)
                # Cap at 99 until the final bookkeeping below has finished.
                task_data['progress'] = min(99, int((processed / total) * 100))
                task_data['processed'] = processed
                task_data['last_update'] = int(time.time())
                cls._save_task(redis_obj, redis_key, task_data)

            # Drop cached counters of every batch touched by this burn record.
            batch_ids = list(
                BurnEncryptedICUID.objects.filter(
                    burn_id=burn_record.id
                ).values_list('batch_id', flat=True).distinct()
            )
            cls._clear_batch_stats(redis_obj, batch_ids)

            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['end_time'] = int(time.time())
            cls._save_task(redis_obj, redis_key, task_data)

            LOGGER.info(f"处理烧录记录完成,任务ID: {task_id}, 处理UID数量: {processed}")
        except Exception as e:
            LOGGER.error(f"处理烧录记录失败: {str(e)}")
            cls._mark_task_failed(redis_obj, redis_key, task_data, e)
        finally:
            cls._remove_temp_file(file_path)

    @classmethod
    def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
        """Mark one batch of UIDs as burned and link them to the burn record.

        :param uids_batch: UID strings to update
        :param burn_id: id of the BurnRecord to link
        :param current_time: unix timestamp stored as updated_time
        :param redis_key: unused; kept for signature compatibility (task
            progress is written by the calling worker)
        :raises Exception: any database/cache error is logged and re-raised
        """
        try:
            matching = BurnEncryptedICUID.objects.filter(uid__in=uids_batch)
            with transaction.atomic():
                # Collect the affected batches so their cached counters can be dropped.
                batch_ids = list(
                    matching.values_list('batch_id', flat=True).distinct()
                )
                matching.update(
                    burn_id=burn_id,
                    status=1,  # burned
                    updated_time=current_time
                )
            cls._clear_batch_stats(RedisObject(), batch_ids)
        except Exception as e:
            LOGGER.error(f"批量更新UID失败: {str(e)}")
            raise

    # ------------------------------------------------------------------
    # Task progress
    # ------------------------------------------------------------------

    @classmethod
    def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Report the progress of an import or burn task.

        :param request_dict: request parameters; ``task_id`` is required,
            ``type`` is ``import`` (default) or ``burn``
        :param response: response object
        :return: JSON response; code 173 when the task is missing or expired
        """
        task_id = request_dict.get('task_id')
        if not task_id:
            return response.json(444, "缺少task_id参数")

        task_type = request_dict.get('type', 'import').lower()
        if task_type not in ['import', 'burn']:
            return response.json(444, "type参数必须是'import'或'burn'")

        redis_key = f"{task_type}_task:{task_id}"

        try:
            task_data = cls._load_task(RedisObject(), redis_key)
            if not task_data:
                return response.json(173, "任务不存在或已过期")

            current_time = int(time.time())
            start_time = task_data.get('start_time') or current_time
            end_time = task_data.get('end_time')
            # Never report a negative duration, even for inconsistent task data.
            elapsed = max(0, (end_time or current_time) - start_time)

            result = {
                'status': task_data.get('status', 'unknown'),
                'progress': task_data.get('progress', 0),
                'processed': task_data.get('processed', 0),
                'total': task_data.get('total', 0),
                'elapsed_seconds': elapsed,
                'start_time': start_time,
                'end_time': end_time,
                'error': task_data.get('error'),
                'task_type': task_type
            }

            if task_type == 'import':
                batch_number = task_data.get('batch_number', '')
                if batch_number:
                    result.update({
                        'batch_number': batch_number,
                        'success_count': task_data.get('success_count', 0)
                    })
                    # Batch details are optional: the row may not exist yet.
                    try:
                        batch = BurnBatch.objects.filter(batch_number=batch_number).first()
                        if batch:
                            result.update({
                                'purpose': batch.purpose,
                                'manager': batch.manager,
                                'total_uid': batch.total_uid
                            })
                    except Exception as e:
                        LOGGER.error(f"查询批次信息失败: {str(e)}")
            else:
                result.update({
                    'order_number': task_data.get('order_number', ''),
                    'purpose': task_data.get('purpose', ''),
                    'burn_count': task_data.get('burn_count', 0),
                    'burn_record_id': task_data.get('burn_record_id')
                })

            return response.json(0, result)

        except json.JSONDecodeError:
            LOGGER.error(f"任务数据解析失败, redis_key: {redis_key}")
            return response.json(500, "任务数据格式错误")
        except Exception as e:
            LOGGER.error(f"查询任务进度失败: {str(e)}")
            return response.json(500, "查询进度失败")

    @classmethod
    def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        List the import tasks currently stored in Redis (newest first, max 100).

        :param request_dict: request parameters (unused)
        :param response: response object
        :return: JSON response
        """
        try:
            redis_obj = RedisObject()
            keys = redis_obj.get_keys("import_task:*")
            if not keys:
                return response.json(0, {"tasks": []})

            tasks = []
            for key in keys:
                try:
                    task_data = cls._load_task(redis_obj, key)
                    if not task_data:
                        continue
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    tasks.append({
                        'task_id': key_str.split(':', 1)[1],
                        'status': task_data.get('status', 'unknown'),
                        'progress': task_data.get('progress', 0),
                        'batch_number': task_data.get('batch_number', ''),
                        'start_time': task_data.get('start_time'),
                        'end_time': task_data.get('end_time'),
                        'processed': task_data.get('processed', 0),
                        'total': task_data.get('total', 0),
                        'redis_key': key_str
                    })
                except Exception as e:
                    LOGGER.error(f"解析任务数据失败, key: {key}, error: {str(e)}")
                    continue

            # 'start_time' is always present but may be None; treat None as 0
            # so that one incomplete task cannot break the sort.
            tasks.sort(key=lambda task: task.get('start_time') or 0, reverse=True)
            tasks = tasks[:100]

            return response.json(0, {"tasks": tasks})

        except Exception as e:
            LOGGER.error(f"获取导入任务列表失败: {str(e)}")
            return response.json(500, "获取任务列表失败")