
Update and optimize UID burn management
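
This commit moves the Excel-based UID import onto a background thread and adds two query operations, 'getImportProgress' and 'getImportTaskList', for tracking import tasks through Redis. As a rough sketch of the intended client flow (the endpoint URL and the {"data": ...} response envelope below are assumptions; only the operation name and the task fields come from this commit), a caller submits the file, receives a task_id, and polls until the task completes or fails:

    # Hypothetical polling client; BASE_URL and the response envelope are assumed.
    import time

    import requests

    BASE_URL = "http://localhost:8000/admin/uid_burn_manage"  # assumed route

    def poll_import_progress(task_id: str, interval: float = 2.0) -> dict:
        """Poll the getImportProgress operation until the task finishes."""
        while True:
            resp = requests.get(BASE_URL, params={
                "operation": "getImportProgress",
                "task_id": task_id,
            })
            data = resp.json().get("data", {})
            print(f"{data.get('progress', 0)}% "
                  f"({data.get('processed', 0)}/{data.get('total', 0)} rows)")
            if data.get("status") in ("completed", "failed"):
                return data
            time.sleep(interval)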

zhangdongming, 3 weeks ago
commit 47a89b94ad
1 changed file with 340 additions and 100 deletions

+ 340 - 100
AdminController/UIDBurnManageController.py

@@ -6,17 +6,27 @@
 @Email   : zhangdongming@asj6.wecom.work
 @Software: PyCharm
 """
+import json
 import os
+import random
+import string
+import threading
+import time
+from datetime import datetime
 from typing import Dict, Any
+from uuid import uuid4
 
 from django.core import serializers
 from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
+from django.db import transaction
 from django.db.models import Q
 from django.http import QueryDict
 from django.views import View
+from openpyxl import load_workbook
 
 from AgentModel.models import BurnRecord, BurnEncryptedICUID
 from Ansjer.config import LOGGER
+from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
 from Object.TokenObject import TokenObject
 
@@ -73,6 +83,10 @@ class UIDBurnManageView(View):
             return self.add_burn_record(request, response)
         elif operation == 'getBurnUidsPage':
             return self.get_burn_uids_page(request_dict, response)
+        elif operation == 'getImportProgress':
+            return self.get_import_progress(request_dict, response)
+        elif operation == 'getImportTaskList':
+            return self.get_import_task_list(request_dict, response)
         else:
             return response.json(414)
 
@@ -135,69 +149,202 @@ class UIDBurnManageView(View):
     @classmethod
     def import_batch_uids(cls, request, response) -> Any:
         """
-        Import batch UIDs
+        Import batch UIDs (async optimized version)
         :param request: HttpRequest object (carries the uploaded file)
         :param response: response object
         :return: JSON response
         """
-        from openpyxl import load_workbook
-        from datetime import datetime
-        import time
-        
         # 1. Validate the file upload
         if 'file' not in request.FILES:
             return response.json(444, "Please upload an Excel file")
-        
+
         excel_file = request.FILES['file']
         if not excel_file.name.endswith(('.xlsx', '.xls')):
             return response.json(444, "Only Excel files (.xlsx/.xls) are supported")
-            
-        # 2. Generate the batch number
-        batch_number = f"ENG{datetime.now().strftime('%Y%m%d')}"
-        
-        # 3. Read the Excel file and process the UIDs
+
         try:
-            wb = load_workbook(excel_file)
+            # 2. Generate the task ID and batch number
+            task_id = str(uuid4())
+            # Build a batch number from a timestamp plus random characters
+            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')  # down to the second
+            random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3))  # 3 random characters
+            batch_number = f"ENG{timestamp}{random_chars}"  # format: ENG + timestamp + random suffix
+
+            # 3. Initialize the task state in Redis
+            redis_key = f"import_task:{task_id}"
+            redis_obj = RedisObject()
+
+            # Save basic task info to Redis (expires in 2 hours)
+            task_data = {
+                'status': 'pending',
+                'batch_number': batch_number,
+                'progress': 0,
+                'processed': 0,
+                'total': 0,
+                'start_time': int(time.time()),
+                'success_count': 0
+            }
+            redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
+
+            # 4. Save the file to the project's static/uploaded_files directory
+            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+            upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
+            os.makedirs(upload_dir, exist_ok=True)
+            file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
+
+            with open(file_path, 'wb+') as destination:
+                for chunk in excel_file.chunks():
+                    destination.write(chunk)
+
+            # 5. Start a background thread to process the import
+            thread = threading.Thread(
+                target=cls._process_import_batch_async,
+                args=(task_id, file_path, redis_key, batch_number),
+                daemon=True
+            )
+            thread.start()
+
+            return response.json(0, {
+                "task_id": task_id,
+                "batch_number": batch_number,
+                "message": "Import task submitted; processing in the background",
+                "redis_key": redis_key
+            })
+
+        except Exception as e:
+            LOGGER.error(f"Failed to create import task: {str(e)}")
+            return response.json(500, "Failed to create the import task")
+
+    @classmethod
+    def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
+        """Background-thread handler for a batch import task"""
+
+        redis_obj = RedisObject()
+
+        try:
+            # Fetch the task data and mark it as processing
+            task_data = json.loads(redis_obj.get_data(redis_key))
+            task_data['status'] = 'processing'
+            redis_obj.set_data(redis_key, json.dumps(task_data))
+
+            # 1. Read the Excel file to get the total row count
+            wb = load_workbook(file_path)
             ws = wb.active
-            uids = []
-            
-            for row in ws.iter_rows(min_row=1, values_only=True):
-                if row[0]:  # first column has a value
-                    uid = str(row[0]).strip()  # strip whitespace
-                    if uid:  # non-empty string
-                        uids.append(uid)
-            
-            if not uids:
-                return response.json(444, "No valid UID data found in the Excel file")
-                
-            # 4. Bulk-create the records (a transaction guarantees atomicity)
-            from django.db import transaction
+            total_rows = ws.max_row
+
+            # Record the total row count and the start time
+            task_data['total'] = total_rows
+            task_data['start_time'] = int(time.time())
+            redis_obj.set_data(redis_key, json.dumps(task_data))
+
+            # 2. Process the UID data in batches (500 rows per batch)
+            batch_size = 500
             current_time = int(time.time())
-            
+            processed = 0
+            success_count = 0
+            uids_batch = []
+
+            for row in ws.iter_rows(min_row=1, values_only=True):
+                if row[0]:
+                    uid = str(row[0]).strip()
+                    if uid:
+                        uids_batch.append(uid)
+                        processed += 1
+
+                        # Update the progress once every 1000 rows
+                        if processed % 1000 == 0:
+                            progress = min(99, int((processed / total_rows) * 100))
+                            task_data['progress'] = progress
+                            task_data['processed'] = processed
+                            task_data['last_update'] = int(time.time())
+                            redis_obj.set_data(redis_key, json.dumps(task_data))
+
+                        # Flush a full batch
+                        if len(uids_batch) >= batch_size:
+                            success = cls._import_uids_batch(
+                                uids_batch,
+                                batch_number,
+                                current_time,
+                                redis_key
+                            )
+                            success_count += success
+                            uids_batch = []
+
+            # Flush the final partial batch
+            if uids_batch:
+                success = cls._import_uids_batch(
+                    uids_batch,
+                    batch_number,
+                    current_time,
+                    redis_key
+                )
+                success_count += success
+
+            # Write the final task state
+            task_data['status'] = 'completed'
+            task_data['progress'] = 100
+            task_data['processed'] = processed
+            task_data['success_count'] = success_count
+            task_data['end_time'] = int(time.time())
+            redis_obj.set_data(redis_key, json.dumps(task_data))
+
+            LOGGER.info(f"Batch import finished, task ID: {task_id}, UIDs processed: {processed}, imported successfully: {success_count}")
+
+            # Clean up the temporary file
             try:
-                with transaction.atomic():
-                    records = [
-                        BurnEncryptedICUID(
-                            batch_number=batch_number,
-                            uid=uid,
-                            purpose='Batch import',
-                            created_time=current_time,
-                            updated_time=current_time,
-                            status=0  # not-yet-burned status
-                        )
-                        for uid in set(uids)  # deduplicate
-                    ]
-                    BurnEncryptedICUID.objects.bulk_create(records)
-                    
+                os.remove(file_path)
             except Exception as e:
-                LOGGER.error(f"Failed to import batch UIDs: {str(e)}")
-                return response.json(500, "Import failed, please check the data format")
-                
-            return response.json(0, {"batch_number": batch_number, "count": len(records)})
-            
+                LOGGER.warning(f"Failed to delete the temporary file: {str(e)}")
+
         except Exception as e:
-            LOGGER.error(f"Failed to process the Excel file: {str(e)}")
-            return response.json(500, "Failed to process the Excel file")
+            LOGGER.error(f"Batch import processing failed: {str(e)}")
+            task_data = {
+                'status': 'failed',
+                'error': str(e),
+                'end_time': int(time.time())
+            }
+            redis_obj.set_data(redis_key, json.dumps(task_data))
+
+    @classmethod
+    def _import_uids_batch(cls, uids_batch, batch_number, current_time, redis_key):
+        """Bulk-import one batch of UID records"""
+
+        redis_obj = RedisObject()
+
+        try:
+            with transaction.atomic():
+                # Deduplicate
+                unique_uids = list(set(uids_batch))
+
+                # Build the records
+                records = [
+                    BurnEncryptedICUID(
+                        batch_number=batch_number,
+                        uid=uid,
+                        purpose='Batch import',
+                        created_time=current_time,
+                        updated_time=current_time,
+                        status=0  # not-yet-burned status
+                    )
+                    for uid in unique_uids
+                ]
+                BurnEncryptedICUID.objects.bulk_create(records)
+
+                # Update the success count in Redis; the caller's loop already
+                # writes the authoritative 'processed' value, so it is not
+                # incremented here (doing both would double-count)
+                task_data = json.loads(redis_obj.get_data(redis_key))
+                task_data['success_count'] = task_data.get('success_count', 0) + len(records)
+                redis_obj.set_data(redis_key, json.dumps(task_data))
+
+                return len(records)
+
+        except Exception as e:
+            LOGGER.error(f"Failed to bulk-import UIDs: {str(e)}")
+            # Log the failure but let the remaining batches keep processing
+            return 0
 
     @classmethod
     def add_burn_record(cls, request, response) -> Any:
@@ -207,39 +354,32 @@ class UIDBurnManageView(View):
         :param response: response object
         :return: JSON response
         """
-        import threading
-        import time
-        import os
-        import json
-        from uuid import uuid4
-        from django.conf import settings
-        from Object.RedisObject import RedisObject
-        
+
         # 1. Validate parameters
         request_dict = request.POST
         required_fields = ['order_number', 'burn_count', 'purpose']
         for field in required_fields:
             if not request_dict.get(field):
                return response.json(444, f"Missing required parameter: {field}")
-                
+
         # 2. Validate the file upload
         if 'uid_file' not in request.FILES:
             return response.json(444, "Please upload an Excel file containing the burned UIDs")
-            
+
         excel_file = request.FILES['uid_file']
         if not excel_file.name.endswith(('.xlsx', '.xls')):
             return response.json(444, "Only Excel files (.xlsx/.xls) are supported")
-            
+
         try:
             burn_count = int(request_dict['burn_count'])
             if burn_count <= 0:
                 return response.json(444, "Burn count must be greater than 0")
-                
+
             # 3. Create the task ID and initialize state in Redis
             task_id = str(uuid4())
             redis_key = f"burn_task:{task_id}"
             redis_obj = RedisObject()
-            
+
             # Save basic task info to Redis (expires in 2 hours)
             task_data = {
                 'status': 'pending',
@@ -252,17 +392,17 @@ class UIDBurnManageView(View):
                 'start_time': int(time.time())
             }
             redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
-            
+
             # 4. Save the file to the project's static/uploaded_files directory
             base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
             upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
             os.makedirs(upload_dir, exist_ok=True)
             file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
-            
+
             with open(file_path, 'wb+') as destination:
                 for chunk in excel_file.chunks():
                     destination.write(chunk)
-            
+
             # 5. Start a background thread to process the task
             thread = threading.Thread(
                 target=cls._process_burn_record_async,
@@ -270,13 +410,13 @@ class UIDBurnManageView(View):
                 daemon=True
             )
             thread.start()
-            
+
             return response.json(0, {
                 "task_id": task_id,
                 "message": "Task submitted; processing in the background",
                 "redis_key": redis_key
             })
-            
+
         except ValueError:
             return response.json(444, "Burn count must be an integer")
         except Exception as e:
@@ -286,31 +426,24 @@ class UIDBurnManageView(View):
     @classmethod
     def _process_burn_record_async(cls, task_id, file_path, redis_key):
         """Background-thread handler for a burn record task"""
-        from openpyxl import load_workbook
-        from django.db import transaction
-        import time
-        import json
-        from AgentModel.models import BurnRecord, BurnEncryptedICUID
-        from Object.RedisObject import RedisObject
-        
         redis_obj = RedisObject()
-        
+
         try:
             # Fetch the task data and mark it as processing
             task_data = json.loads(redis_obj.get_data(redis_key))
             task_data['status'] = 'processing'
             redis_obj.set_data(redis_key, json.dumps(task_data))
-            
+
             # 1. Read the Excel file to get the total row count
             wb = load_workbook(file_path)
             ws = wb.active
             total_rows = ws.max_row
-            
+
             # Record the total row count and the start time
             task_data['total'] = total_rows
             task_data['start_time'] = int(time.time())
             redis_obj.set_data(redis_key, json.dumps(task_data))
-            
+
             # 2. Create the burn record
             with transaction.atomic():
                 burn_record = BurnRecord(
@@ -323,20 +456,20 @@ class UIDBurnManageView(View):
                 burn_record.save()
                 task_data['burn_record_id'] = burn_record.id
                 redis_obj.set_data(redis_key, json.dumps(task_data))
-            
+
             # 3. Process the UID file in batches (300 rows per batch)
             batch_size = 300
             current_time = int(time.time())
             processed = 0
             uids_batch = []
-            
+
             for row in ws.iter_rows(min_row=1, values_only=True):
                 if row[0]:
                     uid = str(row[0]).strip()
                     if uid:
                         uids_batch.append(uid)
                         processed += 1
-                        
+
                         # Update the progress once every 100 rows
                         if processed % 100 == 0:
                             progress = min(99, int((processed / total_rows) * 100))
@@ -344,17 +477,17 @@ class UIDBurnManageView(View):
                             task_data['processed'] = processed
                             task_data['last_update'] = int(time.time())
                             redis_obj.set_data(redis_key, json.dumps(task_data))
-                        
+
                         # Flush a full batch
                         if len(uids_batch) >= batch_size:
                             cls._update_uids_batch(
-                                uids_batch, 
+                                uids_batch,
                                 burn_record.id,
                                 current_time,
                                 redis_key
                             )
                             uids_batch = []
-            
+
             # Flush the final partial batch
             if uids_batch:
                 cls._update_uids_batch(
@@ -363,22 +496,22 @@ class UIDBurnManageView(View):
                     current_time,
                     redis_key
                 )
-            
+
             # Write the final task state
             task_data['status'] = 'completed'
             task_data['progress'] = 100
             task_data['processed'] = processed
             task_data['end_time'] = int(time.time())
             redis_obj.set_data(redis_key, json.dumps(task_data))
-            
+
             LOGGER.info(f"Burn record processing finished, task ID: {task_id}, UIDs processed: {processed}")
-            
+
             # Clean up the temporary file
             try:
                 os.remove(file_path)
             except Exception as e:
                 LOGGER.warning(f"Failed to delete the temporary file: {str(e)}")
-            
+
         except Exception as e:
             LOGGER.error(f"Failed to process the burn record: {str(e)}")
             task_data = {
@@ -391,12 +524,8 @@ class UIDBurnManageView(View):
     @classmethod
     def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
         """Bulk-update one batch of UID records"""
-        from django.db import transaction
-        import json
-        from Object.RedisObject import RedisObject
-        
         redis_obj = RedisObject()
-        
+
         try:
             with transaction.atomic():
                 updated = BurnEncryptedICUID.objects.filter(
@@ -410,11 +539,124 @@ class UIDBurnManageView(View):
                 task_data = json.loads(redis_obj.get_data(redis_key))
                 task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
                 redis_obj.set_data(redis_key, json.dumps(task_data))
-                
+
         except Exception as e:
             LOGGER.error(f"Failed to bulk-update UIDs: {str(e)}")
             raise
 
+    @classmethod
+    def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
+        """
+        Query the progress of an import task
+        :param request_dict: request parameter dict (must contain task_id)
+        :param response: response object
+        :return: JSON response
+        """
+
+        # 1. Validate parameters
+        task_id = request_dict.get('task_id')
+        if not task_id:
+            return response.json(444, "Missing task_id parameter")
+
+        # 2. Build the Redis key
+        redis_key = f"import_task:{task_id}"
+
+        try:
+            # 3. Fetch the task data from Redis
+            redis_obj = RedisObject()
+            task_data_str = redis_obj.get_data(redis_key)
+
+            if not task_data_str:
+                return response.json(404, "Task does not exist or has expired")
+
+            # 4. Parse the task data
+            task_data = json.loads(task_data_str)
+
+            # 5. Compute the elapsed time (seconds)
+            current_time = int(time.time())
+            start_time = task_data.get('start_time', current_time)
+            elapsed = current_time - start_time
+            if task_data.get('end_time'):
+                elapsed = task_data['end_time'] - start_time
+
+            # 6. Return normalized progress info
+            return response.json(0, {
+                'status': task_data.get('status', 'unknown'),
+                'progress': task_data.get('progress', 0),
+                'processed': task_data.get('processed', 0),
+                'total': task_data.get('total', 0),
+                'batch_number': task_data.get('batch_number', ''),
+                'success_count': task_data.get('success_count', 0),
+                'elapsed_seconds': elapsed,
+                'start_time': start_time,
+                'end_time': task_data.get('end_time'),
+                'error': task_data.get('error')
+            })
+
+        except json.JSONDecodeError:
+            LOGGER.error(f"Failed to parse task data, redis_key: {redis_key}")
+            return response.json(500, "Malformed task data")
+        except Exception as e:
+            LOGGER.error(f"Failed to query import progress: {str(e)}")
+            return response.json(500)
+
+    @classmethod
+    def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
+        """
+        获取所有导入任务列表
+        :param request_dict: 请求参数字典
+        :param response: 响应对象
+        :return: JSON响应
+        """
+
+        try:
+            redis_obj = RedisObject()
+            # Fetch all keys prefixed with "import_task:"
+            keys = redis_obj.get_keys("import_task:*")
+
+            if not keys:
+                return response.json(0, {"tasks": []})
+
+            tasks = []
+
+            # Collect basic info for each task
+            for key in keys:
+                try:
+                    task_data_str = redis_obj.get_data(key)
+                    if task_data_str:
+                        # Make sure task_data_str is a str
+                        if isinstance(task_data_str, bytes):
+                            task_data_str = task_data_str.decode('utf-8')
+                        task_data = json.loads(task_data_str)
+                        # Handle keys returned as bytes
+                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
+                        tasks.append({
+                            'task_id': key_str.split(':')[1],  # extract the task_id from the key
+                            'status': task_data.get('status', 'unknown'),
+                            'progress': task_data.get('progress', 0),
+                            'batch_number': task_data.get('batch_number', ''),
+                            'start_time': task_data.get('start_time'),
+                            'end_time': task_data.get('end_time'),
+                            'processed': task_data.get('processed', 0),
+                            'total': task_data.get('total', 0),
+                            'redis_key': key_str
+                        })
+                except Exception as e:
+                    LOGGER.error(f"Failed to parse task data, key: {key}, error: {str(e)}")
+                    continue
+
+            # Sort by start time, newest first ('or 0' keeps a None start_time sortable)
+            tasks.sort(key=lambda x: x.get('start_time') or 0, reverse=True)
+
+            # Cap the result set (most recent 100 tasks)
+            tasks = tasks[:100]
+
+            return response.json(0, {"tasks": tasks})
+
+        except Exception as e:
+            LOGGER.error(f"Failed to fetch the import task list: {str(e)}")
+            return response.json(500, "Failed to fetch the task list")
+
     @classmethod
     def get_burn_uids_page(cls, request_dict: Dict[str, Any], response) -> Any:
         """
@@ -427,12 +669,12 @@ class UIDBurnManageView(View):
         burn_id = request_dict.get('burn_id')
         if not burn_id:
             return response.json(444, "Missing burn_id parameter")
-            
+
         try:
             burn_id = int(burn_id)
         except ValueError:
             return response.json(444, "burn_id must be an integer")
-            
+
         # 2. Handle pagination parameters
         try:
             page = int(request_dict.get('page', 1))
@@ -441,11 +683,11 @@ class UIDBurnManageView(View):
             page_size = max(1, min(page_size, 100))
         except (ValueError, TypeError):
             return response.json(444, "Invalid pagination parameters (must be integers)")
-            
+
         # 3. Query and paginate
         query = Q(burn_id=burn_id)
         uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time')
-        
+
         paginator = Paginator(uid_qs, page_size)
         try:
             page_obj = paginator.page(page)
@@ -453,13 +695,13 @@ class UIDBurnManageView(View):
             page_obj = paginator.page(1)
         except EmptyPage:
             page_obj = paginator.page(paginator.num_pages)
-            
+
         uid_list = serializers.serialize(
             'python',
             page_obj,
             fields=['id', 'uid', 'batch_number', 'status', 'created_time', 'updated_time']
         )
-        
+
         return response.json(
             0,
             {
@@ -470,5 +712,3 @@ class UIDBurnManageView(View):
                 'pageSize': page_size
             }
         )
-
-
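
Note: the task-state handling in this diff goes through the project-internal Object.RedisObject wrapper, whose implementation is not shown in this commit. Below is a minimal sketch of a compatible wrapper, assuming redis-py and only the three methods the new code calls (set_data, get_data, get_keys); the connection defaults are assumptions:

    import redis

    class RedisObject:
        """Thin Redis wrapper matching the calls made by UIDBurnManageController."""

        def __init__(self, host="127.0.0.1", port=6379, db=0):  # assumed defaults
            self.client = redis.Redis(host=host, port=port, db=db,
                                      decode_responses=True)

        def set_data(self, key, value, expire=None):
            # A plain SET clears any existing TTL, so when no expire is given we
            # pass keepttl=True (Redis >= 6.0); otherwise the periodic progress
            # writes would silently make the 2-hour task keys permanent.
            if expire is None:
                self.client.set(key, value, keepttl=True)
            else:
                self.client.set(key, value, ex=expire)

        def get_data(self, key):
            return self.client.get(key)

        def get_keys(self, pattern):
            # SCAN instead of KEYS so a large keyspace does not block Redis
            return list(self.client.scan_iter(match=pattern))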