# -*- encoding: utf-8 -*-
"""
@File : UIDBurnManageController.py
@Time : 2025/7/30 08:57
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import os
import random
import string
import threading
import time
from datetime import datetime
from typing import Dict, Any
from uuid import uuid4

from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.db import transaction
from django.db.models import Q
from django.http import QueryDict
from django.views import View
from openpyxl import load_workbook

from AgentModel.models import BurnRecord, BurnEncryptedICUID, BurnBatch
from Ansjer.config import LOGGER
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject


class UIDBurnManageView(View):

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        delete = QueryDict(request.body)
        if not delete:
            delete = request.GET
        return self.validation(delete, request, operation)

    def put(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        put = QueryDict(request.body)
        return self.validation(put, request, operation)
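
    # The `operation` kwarg consumed by validation() below is expected to come
    # from the URL pattern. A minimal, hypothetical urls.py sketch (the real
    # route prefix is not visible in this file):
    #
    #     from django.urls import path
    #     from .UIDBurnManageController import UIDBurnManageView
    #
    #     urlpatterns = [
    #         path('uidBurn/<str:operation>', UIDBurnManageView.as_view()),
    #     ]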

    def validation(self, request_dict, request, operation):
        """Validate the request and route it to the requested operation."""
        # Initialize the response object
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language, 'pc')
        # Token validation
        try:
            tko = TokenObject(
                request.META.get('HTTP_AUTHORIZATION'),
                returntpye='pc')  # keyword spelling follows TokenObject's signature
            if tko.code != 0:
                return response.json(tko.code)
            response.lang = tko.lang
            user_id = tko.userID
        except Exception as e:
            LOGGER.error(f"Token validation failed: {str(e)}")
            return response.json(444)
        if operation == 'getBurnRecordsPage':
            return self.get_burn_records_page(request_dict, response)
        elif operation == 'importBatchUids':
            return self.import_batch_uids(request, response)
        elif operation == 'addBurnRecord':
            return self.add_burn_record(request, request_dict, response)
        elif operation == 'batchPageUids':
            return self.batch_page_uids(request_dict, response)
        elif operation == 'getImportProgress':
            return self.get_import_progress(request_dict, response)
        elif operation == 'getImportTaskList':
            return self.get_import_task_list(request_dict, response)
        elif operation == 'getBatchRecordsPage':
            return self.get_batch_records_page(request_dict, response)
        else:
            return response.json(414)

    @classmethod
    def get_batch_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of batch records (with burn statistics).
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Parse pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build query conditions
        query = Q()
        batch_number = request_dict.get('batch_number', '').strip()
        if batch_number:
            query &= Q(batch_number__icontains=batch_number)
        # 3. Query and paginate
        batch_qs = BurnBatch.objects.filter(query).order_by('-created_time').values(
            'id', 'batch_number', 'purpose', 'manager', 'total_uid', 'created_time'
        )
        paginator = Paginator(batch_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # 4. Fetch statistics and build the result
        redis_obj = RedisObject()
        batch_list = []
        for batch in page_obj:
            batch_id = batch['id']
            cache_key = f"batch_stats:{batch_id}"
            # Try the cache first
            cached_stats = redis_obj.get_data(cache_key)
            if cached_stats:
                stats = json.loads(cached_stats)
            else:
                # Fall back to counting in the database
                burned_count = BurnEncryptedICUID.objects.filter(
                    batch_id=batch_id,
                    status=1
                ).count()
                unburned_count = BurnEncryptedICUID.objects.filter(
                    batch_id=batch_id,
                    status=0
                ).count()
                stats = {
                    'burned_count': burned_count,
                    'unburned_count': unburned_count
                }
                # Cache the statistics for one hour
                redis_obj.set_data(cache_key, json.dumps(stats), 3600)
            # Merge the batch info with its statistics
            batch_info = dict(batch)
            batch_info.update(stats)
            batch_list.append(batch_info)
        return response.json(
            0,
            {
                'list': batch_list,
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )
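
    # Example exchange (shape inferred from the code above; the route prefix is
    # hypothetical and depends on the project's urls.py):
    #
    #     GET /uidBurn/getBatchRecordsPage?page=1&pageSize=10&batch_number=ENG2025
    #
    # returning data along the lines of:
    #
    #     {"list": [{"id": 1, "batch_number": "ENG20250730085700ABC",
    #                "burned_count": 120, "unburned_count": 380, ...}],
    #      "total": 1, "currentPage": 1, "totalPages": 1, "pageSize": 10}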

    @classmethod
    def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of burn records.
        :param request_dict: request parameter dict (pagination params and filters)
        :param response: response object (renders the JSON reply)
        :return: JSON response with the paged result
        """
        # 1. Parse and validate pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            # Clamp the pagination range
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build query conditions
        query = Q()
        order_number = request_dict.get('orderNumber', '').strip()
        if order_number:
            query &= Q(order_number__icontains=order_number)
        # 3. Fetch the queryset, selecting only the needed fields
        burn_qs = BurnRecord.objects.filter(query).order_by('-created_time').values(
            'id', 'order_number', 'burn_count', 'purpose', 'created_time'
        )
        # 4. Paginate
        paginator = Paginator(burn_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # Materialize the current page
        burn_list = list(page_obj)
        # Return the result
        return response.json(
            0,
            {
                'total': paginator.count,  # total record count
                'list': burn_list,  # records on the current page
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages
            }
        )

    @classmethod
    def import_batch_uids(cls, request, response) -> Any:
        """
        Import a batch of UIDs - async version (adapted to the new table schema).
        :param request: HttpRequest object (carries the uploaded file)
        :param response: response object
        :return: JSON response
        """
        # 1. Validate the file upload
        if 'file' not in request.FILES:
            return response.json(444, "Please upload an Excel file")
        excel_file = request.FILES['file']
        # Note: load_workbook (openpyxl) reads only .xlsx; a legacy .xls upload
        # passes this check but will fail later when the file is parsed
        if not excel_file.name.endswith(('.xlsx', '.xls')):
            return response.json(444, "Only Excel files are supported (.xlsx/.xls)")
        try:
            # 2. Generate a task ID and batch number
            task_id = str(uuid4())
            # The batch number carries a timestamp plus random characters
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')  # second precision
            random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3))  # 3 random characters
            batch_number = f"ENG{timestamp}{random_chars}"  # format: ENG + timestamp + random chars
            # 3. Initialize task state in Redis
            redis_key = f"import_task:{task_id}"
            redis_obj = RedisObject()
            # Store the basic task info in Redis (expires in 2 hours)
            task_data = {
                'status': 'pending',
                'batch_number': batch_number,
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time()),
                'success_count': 0
            }
            redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
            # 4. Save the file to the project's static/uploaded_files directory
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
            os.makedirs(upload_dir, exist_ok=True)
            file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
            with open(file_path, 'wb+') as destination:
                for chunk in excel_file.chunks():
                    destination.write(chunk)
            # 5. Kick off a background thread to do the processing
            thread = threading.Thread(
                target=cls._process_import_batch_async,
                args=(task_id, file_path, redis_key, batch_number),
                daemon=True
            )
            thread.start()
            return response.json(0, {
                "task_id": task_id,
                "batch_number": batch_number,
                "message": "Import task submitted; processing in the background",
                "redis_key": redis_key
            })
        except Exception as e:
            LOGGER.error(f"Failed to create import task: {str(e)}")
            return response.json(500, "Failed to create import task")
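
    # A minimal client-side sketch for this endpoint, assuming the hypothetical
    # route above and the `requests` library:
    #
    #     import requests
    #
    #     with open('uids.xlsx', 'rb') as f:
    #         resp = requests.post(
    #             'https://example.com/uidBurn/importBatchUids',
    #             headers={'Authorization': '<token>'},
    #             files={'file': f},
    #         )
    #     task_id = resp.json()['data']['task_id']  # response envelope assumed
    #
    # The returned task_id is then polled through getImportProgress.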

    @classmethod
    def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
        """Background thread that processes a batch import task (new table schema)."""
        redis_obj = RedisObject()
        try:
            # Mark the task as processing
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['status'] = 'processing'
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 1. Open the Excel file and get the total row count
            wb = load_workbook(file_path)
            ws = wb.active
            total_rows = ws.max_row
            # Record the total and the start time
            task_data['total'] = total_rows
            task_data['start_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 2. Create the batch record
            current_time = int(time.time())
            with transaction.atomic():
                batch = BurnBatch(
                    batch_number=batch_number,
                    purpose='Batch import',
                    created_time=current_time,
                    manager='system',  # imported by the system by default
                    total_uid=0  # starts at 0; updated once processing finishes
                )
                batch.save()
                batch_id = batch.id
            # 3. Process the UIDs in chunks (500 per chunk)
            batch_size = 500
            processed = 0
            success_count = 0
            uids_batch = []
            for row in ws.iter_rows(min_row=1, values_only=True):
                if row[0]:
                    uid = str(row[0]).strip()
                    if uid:
                        uids_batch.append(uid)
                        processed += 1
                        # Refresh the progress every 1000 rows
                        if processed % 1000 == 0:
                            progress = min(99, int((processed / total_rows) * 100))
                            task_data['progress'] = progress
                            task_data['processed'] = processed
                            task_data['last_update'] = int(time.time())
                            redis_obj.set_data(redis_key, json.dumps(task_data))
                        # Flush a full chunk
                        if len(uids_batch) >= batch_size:
                            success = cls._import_uids_batch(
                                uids_batch,
                                batch_id,
                                current_time,
                                redis_key
                            )
                            success_count += success
                            uids_batch = []
            # Flush the final partial chunk
            if uids_batch:
                success = cls._import_uids_batch(
                    uids_batch,
                    batch_id,
                    current_time,
                    redis_key
                )
                success_count += success
            # Update the batch's total UID count
            with transaction.atomic():
                BurnBatch.objects.filter(id=batch_id).update(total_uid=success_count)
            # Write the final state
            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['success_count'] = success_count
            task_data['end_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            LOGGER.info(f"Batch import finished, task ID: {task_id}, UIDs processed: {processed}, imported: {success_count}")
            # Clean up the temp file
            try:
                os.remove(file_path)
            except Exception as e:
                LOGGER.warning(f"Failed to delete temp file: {str(e)}")
        except Exception as e:
            LOGGER.error(f"Batch import failed: {str(e)}")
            task_data = {
                'status': 'failed',
                'error': str(e),
                'end_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data))

    @classmethod
    def _import_uids_batch(cls, uids_batch, batch_id, current_time, redis_key):
        """Bulk-insert UID records (new table schema)."""
        redis_obj = RedisObject()
        try:
            with transaction.atomic():
                # Deduplicate within this chunk only
                unique_uids = list(set(uids_batch))
                # Build the records
                records = [
                    BurnEncryptedICUID(
                        batch_id=batch_id,
                        uid=uid,
                        created_time=current_time,
                        updated_time=current_time,
                        status=0  # not yet burned
                    )
                    for uid in unique_uids
                ]
                BurnEncryptedICUID.objects.bulk_create(records)
            # Push the processed and success counters to Redis
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
            task_data['success_count'] = task_data.get('success_count', 0) + len(records)
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Invalidate the batch statistics cache
            cache_key = f"batch_stats:{batch_id}"
            redis_obj.del_data(cache_key)
            return len(records)
        except Exception as e:
            LOGGER.error(f"Bulk UID import failed: {str(e)}")
            # Record the failure but keep processing subsequent chunks
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
            redis_obj.set_data(redis_key, json.dumps(task_data))
            return 0
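
    # Note: set() above deduplicates only within a single 500-UID chunk. If the
    # uid column carries a unique constraint, a duplicate arriving in a later
    # chunk (or a re-imported file) makes bulk_create raise and the whole chunk
    # counts as failed. A sketch of one tolerant variant, assuming Django >= 2.2:
    #
    #     BurnEncryptedICUID.objects.bulk_create(records, ignore_conflicts=True)
    #
    # With ignore_conflicts=True conflicting rows are skipped silently, at the
    # cost of len(records) no longer matching the number actually inserted.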

    @classmethod
    def add_burn_record(cls, request, request_dict, response) -> Any:
        """
        Create a burn record (with a UID file) - Redis string optimized version.
        :param request: HttpRequest object (carries the uploaded file and form data)
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Validate required fields
        required_fields = ['order_number', 'burn_count', 'purpose']
        for field in required_fields:
            if not request_dict.get(field):
                return response.json(444, f"Missing required parameter: {field}")
        # 2. Validate the file upload
        if 'uid_file' not in request.FILES:
            return response.json(444, "Please upload an Excel file containing the burned UIDs")
        excel_file = request.FILES['uid_file']
        if not excel_file.name.endswith(('.xlsx', '.xls')):
            return response.json(444, "Only Excel files are supported (.xlsx/.xls)")
        try:
            burn_count = int(request_dict['burn_count'])
            if burn_count <= 0:
                return response.json(444, "Burn count must be greater than 0")
            # 3. Create a task ID and initialize state in Redis
            task_id = str(uuid4())
            redis_key = f"burn_task:{task_id}"
            redis_obj = RedisObject()
            # Store the basic task info in Redis (expires in 2 hours)
            task_data = {
                'status': 'pending',
                'order_number': request_dict['order_number'].strip(),
                'burn_count': burn_count,
                'purpose': request_dict['purpose'].strip(),
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
            # 4. Save the file to the project's static/uploaded_files directory
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
            os.makedirs(upload_dir, exist_ok=True)
            file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
            with open(file_path, 'wb+') as destination:
                for chunk in excel_file.chunks():
                    destination.write(chunk)
            # 5. Kick off a background thread to do the processing
            thread = threading.Thread(
                target=cls._process_burn_record_async,
                args=(task_id, file_path, redis_key),
                daemon=True
            )
            thread.start()
            return response.json(0, {
                "task_id": task_id,
                "message": "Task submitted; processing in the background",
                "redis_key": redis_key
            })
        except ValueError:
            return response.json(444, "Burn count must be an integer")
        except Exception as e:
            LOGGER.error(f"Failed to create burn task: {str(e)}")
            return response.json(500, "Failed to create burn task")
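
    # Client-side sketch for this endpoint (hypothetical route, `requests`
    # assumed); unlike importBatchUids it carries form fields plus the file:
    #
    #     import requests
    #
    #     with open('burned_uids.xlsx', 'rb') as f:
    #         resp = requests.post(
    #             'https://example.com/uidBurn/addBurnRecord',
    #             headers={'Authorization': '<token>'},
    #             data={'order_number': 'PO-001', 'burn_count': 500,
    #                   'purpose': 'production'},
    #             files={'uid_file': f},
    #         )
    #
    # Progress is then polled through getImportProgress with type=burn.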

    @classmethod
    def _process_burn_record_async(cls, task_id, file_path, redis_key):
        """Background thread that processes a burn record task."""
        redis_obj = RedisObject()
        try:
            # Mark the task as processing
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['status'] = 'processing'
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 1. Open the Excel file and get the total row count
            wb = load_workbook(file_path)
            ws = wb.active
            total_rows = ws.max_row
            # Record the total and the start time
            task_data['total'] = total_rows
            task_data['start_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 2. Create the burn record
            with transaction.atomic():
                burn_record = BurnRecord(
                    order_number=task_data['order_number'],
                    burn_count=task_data['burn_count'],
                    purpose=task_data['purpose'],
                    updated_time=int(time.time()),
                    created_time=int(time.time())
                )
                burn_record.save()
            task_data['burn_record_id'] = burn_record.id
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 3. Process the UID file in chunks (300 per chunk)
            batch_size = 300
            current_time = int(time.time())
            processed = 0
            uids_batch = []
            for row in ws.iter_rows(min_row=1, values_only=True):
                if row[0]:
                    uid = str(row[0]).strip()
                    if uid:
                        uids_batch.append(uid)
                        processed += 1
                        # Refresh the progress every 100 rows
                        if processed % 100 == 0:
                            progress = min(99, int((processed / total_rows) * 100))
                            task_data['progress'] = progress
                            task_data['processed'] = processed
                            task_data['last_update'] = int(time.time())
                            redis_obj.set_data(redis_key, json.dumps(task_data))
                        # Flush a full chunk
                        if len(uids_batch) >= batch_size:
                            cls._update_uids_batch(
                                uids_batch,
                                burn_record.id,
                                current_time,
                                redis_key
                            )
                            uids_batch = []
            # Flush the final partial chunk
            if uids_batch:
                cls._update_uids_batch(
                    uids_batch,
                    burn_record.id,
                    current_time,
                    redis_key
                )
            # Write the final state
            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['end_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Find the affected batch IDs and invalidate their statistics caches
            batch_ids = BurnEncryptedICUID.objects.filter(
                burn_id=burn_record.id
            ).values_list('batch_id', flat=True).distinct()
            for batch_id in batch_ids:
                cache_key = f"batch_stats:{batch_id}"
                redis_obj.del_data(cache_key)
            LOGGER.info(f"Burn record processing finished, task ID: {task_id}, UIDs processed: {processed}")
            # Clean up the temp file
            try:
                os.remove(file_path)
            except Exception as e:
                LOGGER.warning(f"Failed to delete temp file: {str(e)}")
        except Exception as e:
            LOGGER.error(f"Burn record processing failed: {str(e)}")
            task_data = {
                'status': 'failed',
                'error': str(e),
                'end_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data))

    @classmethod
    def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
        """Bulk-update UID records."""
        redis_obj = RedisObject()
        try:
            with transaction.atomic():
                # Collect the affected batch IDs first
                batch_ids = BurnEncryptedICUID.objects.filter(
                    uid__in=uids_batch
                ).values_list('batch_id', flat=True).distinct()
                # update() returns the number of rows touched
                updated = BurnEncryptedICUID.objects.filter(
                    uid__in=uids_batch
                ).update(
                    burn_id=burn_id,
                    status=1,  # burned successfully
                    updated_time=current_time
                )
                # Push the processed counter to Redis
                task_data = json.loads(redis_obj.get_data(redis_key))
                task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
                redis_obj.set_data(redis_key, json.dumps(task_data))
                # Invalidate the statistics cache of each affected batch
                for batch_id in batch_ids:
                    cache_key = f"batch_stats:{batch_id}"
                    redis_obj.del_data(cache_key)
        except Exception as e:
            LOGGER.error(f"Bulk UID update failed: {str(e)}")
            raise

    @classmethod
    def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Query task progress (supports both import and burn tasks).
        :param request_dict: request parameter dict (must contain task_id; type is optional)
        :param response: response object
        :return: JSON response
        The type parameter:
        - import: import task (default)
        - burn: burn task
        """
        # 1. Parameter validation
        task_id = request_dict.get('task_id')
        if not task_id:
            return response.json(444, "Missing task_id parameter")
        task_type = request_dict.get('type', 'import').lower()
        if task_type not in ['import', 'burn']:
            return response.json(444, "type must be 'import' or 'burn'")
        # 2. Build the Redis key
        redis_key = f"{task_type}_task:{task_id}"
        try:
            # 3. Fetch the task data from Redis
            redis_obj = RedisObject()
            task_data_str = redis_obj.get_data(redis_key)
            if not task_data_str:
                return response.json(173, "Task does not exist or has expired")
            # 4. Parse the task data
            if isinstance(task_data_str, bytes):
                task_data_str = task_data_str.decode('utf-8')
            task_data = json.loads(task_data_str)
            # 5. Compute the elapsed time (seconds)
            current_time = int(time.time())
            start_time = task_data.get('start_time', current_time)
            elapsed = current_time - start_time
            if task_data.get('end_time'):
                elapsed = task_data['end_time'] - start_time
            # 6. Build the base response payload
            result = {
                'status': task_data.get('status', 'unknown'),
                'progress': task_data.get('progress', 0),
                'processed': task_data.get('processed', 0),
                'total': task_data.get('total', 0),
                'elapsed_seconds': elapsed,
                'start_time': start_time,
                'end_time': task_data.get('end_time'),
                'error': task_data.get('error'),
                'task_type': task_type
            }
            # 7. Add type-specific fields
            if task_type == 'import':
                # Look up the batch by the batch number stored in Redis
                batch_number = task_data.get('batch_number', '')
                if batch_number:
                    try:
                        batch = BurnBatch.objects.filter(batch_number=batch_number).first()
                        if batch:
                            result.update({
                                'batch_number': batch_number,
                                'success_count': task_data.get('success_count', 0),
                                'purpose': batch.purpose,
                                'manager': batch.manager,
                                'total_uid': batch.total_uid
                            })
                        else:
                            result.update({
                                'batch_number': batch_number,
                                'success_count': task_data.get('success_count', 0)
                            })
                    except Exception as e:
                        LOGGER.error(f"Failed to query batch info: {str(e)}")
                        result.update({
                            'batch_number': batch_number,
                            'success_count': task_data.get('success_count', 0)
                        })
            else:  # burn task
                result.update({
                    'order_number': task_data.get('order_number', ''),
                    'purpose': task_data.get('purpose', ''),
                    'burn_count': task_data.get('burn_count', 0),
                    'burn_record_id': task_data.get('burn_record_id')
                })
            return response.json(0, result)
        except json.JSONDecodeError:
            LOGGER.error(f"Failed to parse task data, redis_key: {redis_key}")
            return response.json(500, "Malformed task data")
        except Exception as e:
            LOGGER.error(f"Failed to query task progress: {str(e)}")
            return response.json(500, "Failed to query progress")
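
    # A minimal polling sketch against this endpoint (hypothetical route and
    # response envelope, `requests` assumed):
    #
    #     import time
    #     import requests
    #
    #     def wait_for_task(base_url, token, task_id, task_type='import'):
    #         while True:
    #             resp = requests.get(
    #                 f'{base_url}/uidBurn/getImportProgress',
    #                 params={'task_id': task_id, 'type': task_type},
    #                 headers={'Authorization': token},
    #             )
    #             data = resp.json()['data']  # envelope key assumed
    #             if data['status'] in ('completed', 'failed'):
    #                 return data
    #             time.sleep(2)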

    @classmethod
    def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        List all import tasks.
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        try:
            redis_obj = RedisObject()
            # Fetch every key prefixed with import_task
            keys = redis_obj.get_keys("import_task:*")
            if not keys:
                return response.json(0, {"tasks": []})
            tasks = []
            # Collect the basic info of each task
            for key in keys:
                try:
                    task_data_str = redis_obj.get_data(key)
                    if task_data_str:
                        # Make sure task_data_str is a str
                        if isinstance(task_data_str, bytes):
                            task_data_str = task_data_str.decode('utf-8')
                        task_data = json.loads(task_data_str)
                        # The key itself may also be bytes
                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                        tasks.append({
                            'task_id': key_str.split(':')[1],  # extract task_id from the key
                            'status': task_data.get('status', 'unknown'),
                            'progress': task_data.get('progress', 0),
                            'batch_number': task_data.get('batch_number', ''),
                            'start_time': task_data.get('start_time'),
                            'end_time': task_data.get('end_time'),
                            'processed': task_data.get('processed', 0),
                            'total': task_data.get('total', 0),
                            'redis_key': key_str
                        })
                except Exception as e:
                    LOGGER.error(f"Failed to parse task data, key: {key}, error: {str(e)}")
                    continue
            # Sort by start time, newest first (start_time may be stored as None)
            tasks.sort(key=lambda x: x.get('start_time') or 0, reverse=True)
            # Cap the result at the most recent 100 tasks
            tasks = tasks[:100]
            return response.json(0, {"tasks": tasks})
        except Exception as e:
            LOGGER.error(f"Failed to fetch import task list: {str(e)}")
            return response.json(500, "Failed to fetch task list")

    @classmethod
    def batch_page_uids(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of burn UID records, filtered by batch_id.
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Parse pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build query conditions
        query = Q()
        # Optional batch_id filter
        batch_id = request_dict.get('batch_id')
        if batch_id:
            try:
                batch_id = int(batch_id)
                query &= Q(batch_id=batch_id)
            except ValueError:
                return response.json(444, "batch_id must be an integer")
        # 3. Query and paginate
        uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time').values(
            'id', 'uid', 'batch_id', 'status', 'created_time', 'updated_time'
        )
        paginator = Paginator(uid_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # Materialize the current page
        uid_list = list(page_obj)
        return response.json(
            0,
            {
                'list': uid_list,
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )