# -*- encoding: utf-8 -*-
"""
@File    : UIDBurnManageController.py
@Time    : 2025/7/30 08:57
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import json
import os
import random
import string
import threading
import time
from datetime import datetime
from typing import Dict, Any
from uuid import uuid4

import xlrd
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from django.db import transaction
from django.db.models import Q
from django.http import QueryDict
from django.views import View
from openpyxl import load_workbook

from AgentModel.models import BurnRecord, BurnEncryptedICUID, BurnBatch
from Ansjer.config import LOGGER
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject

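# A sketch of the Redis layout this module relies on (inferred from the code
# below, not a separate spec):
#   import_task:{task_id}  - JSON task state for batch UID imports (TTL 7200s)
#   burn_task:{task_id}    - JSON task state for burn-record tasks (TTL 7200s)
#   batch_stats:{batch_id} - cached burned/unburned counts per batch (TTL 3600s)
# Task-state JSON carries: status (pending/processing/completed/failed),
# progress (0-100), processed, total, start_time/end_time, success_count,
# plus task-specific fields such as batch_number or order_number.
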
class UIDBurnManageView(View):
    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        delete = QueryDict(request.body)
        if not delete:
            delete = request.GET
        return self.validation(delete, request, operation)

    def put(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        put = QueryDict(request.body)
        return self.validation(put, request, operation)
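
    # Django only populates request.GET and request.POST; for PUT and DELETE
    # the form-encoded body has to be parsed by hand, which is why the
    # handlers above wrap request.body in QueryDict (falling back to the
    # query string for body-less DELETE requests).
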
    def validation(self, request_dict, request, operation):
        """Validate the request token and route to the operation handler."""
        # Initialise the response object
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language, 'pc')
        # Token validation
        try:
            tko = TokenObject(
                request.META.get('HTTP_AUTHORIZATION'),
                returntpye='pc')  # keyword spelling matches TokenObject's signature
            if tko.code != 0:
                return response.json(tko.code)
            response.lang = tko.lang
            user_id = tko.userID
        except Exception as e:
            LOGGER.error(f"Token validation failed: {str(e)}")
            return response.json(444)
        if operation == 'getBurnRecordsPage':
            return self.get_burn_records_page(request_dict, response)
        elif operation == 'importBatchUids':
            return self.import_batch_uids(request, response)
        elif operation == 'addBurnRecord':
            return self.add_burn_record(request, request_dict, response)
        elif operation == 'batchPageUids':
            return self.batch_page_uids(request_dict, response)
        elif operation == 'getImportProgress':
            return self.get_import_progress(request_dict, response)
        elif operation == 'getImportTaskList':
            return self.get_import_task_list(request_dict, response)
        elif operation == 'getBatchRecordsPage':
            return self.get_batch_records_page(request_dict, response)
        else:
            return response.json(414)

    @classmethod
    def get_batch_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of batch records, with per-batch statistics.
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Parse and clamp pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build the query
        query = Q()
        batch_number = request_dict.get('batch_number', '').strip()
        if batch_number:
            query &= Q(batch_number__icontains=batch_number)
        # 3. Query and paginate
        batch_qs = BurnBatch.objects.filter(query).order_by('-created_time').values(
            'id', 'batch_number', 'purpose', 'manager', 'total_uid', 'created_time'
        )
        paginator = Paginator(batch_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # 4. Attach statistics to each batch on the page
        redis_obj = RedisObject()
        batch_list = []
        for batch in page_obj:
            batch_id = batch['id']
            cache_key = f"batch_stats:{batch_id}"
            # Try the cache first
            cached_stats = redis_obj.get_data(cache_key)
            if cached_stats:
                stats = json.loads(cached_stats)
            else:
                # Fall back to counting in the database
                burned_count = BurnEncryptedICUID.objects.filter(
                    batch_id=batch_id,
                    status=1
                ).count()
                unburned_count = BurnEncryptedICUID.objects.filter(
                    batch_id=batch_id,
                    status=0
                ).count()
                stats = {
                    'burned_count': burned_count,
                    'unburned_count': unburned_count
                }
                # Cache for one hour
                redis_obj.set_data(cache_key, json.dumps(stats), 3600)
            # Merge the batch row with its statistics
            batch_info = dict(batch)
            batch_info.update(stats)
            batch_list.append(batch_info)
        return response.json(
            0,
            {
                'list': batch_list,
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )
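
    # The per-batch statistics above follow a cache-aside pattern: read
    # batch_stats:{batch_id} from Redis, fall back to two COUNT queries on a
    # miss, then cache the result for an hour. Writers (_import_uids_batch and
    # _update_uids_batch below) delete the key after changing rows, so a stale
    # entry lives at most until the next write or the TTL expiry.
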
    @classmethod
    def get_burn_records_page(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of burn records.
        :param request_dict: request parameter dict (pagination and filters)
        :param response: response object (used to return JSON)
        :return: JSON response with the paged result
        """
        # 1. Parse and validate pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            # Clamp to sane bounds
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build the query
        query = Q()
        order_number = request_dict.get('orderNumber', '').strip()
        if order_number:
            query &= Q(order_number__icontains=order_number)
        # 3. Select only the fields we need
        burn_qs = BurnRecord.objects.filter(query).order_by('-created_time').values(
            'id', 'order_number', 'burn_count', 'purpose', 'created_time'
        )
        # 4. Paginate
        paginator = Paginator(burn_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # Materialise the current page
        burn_list = list(page_obj)
        return response.json(
            0,
            {
                'total': paginator.count,   # total number of records
                'list': burn_list,          # rows for the current page
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages
            }
        )

    @classmethod
    def import_batch_uids(cls, request, response) -> Any:
        """
        Import a batch of UIDs, processed asynchronously (new table layout).
        :param request: HttpRequest (carries the uploaded file)
        :param response: response object
        :return: JSON response
        """
        # 1. Validate the upload
        if 'file' not in request.FILES:
            return response.json(444, "Please upload an Excel file")
        excel_file = request.FILES['file']
        if not excel_file.name.endswith(('.xlsx', '.xls')):
            return response.json(444, "Only Excel files are supported (.xlsx/.xls)")
        try:
            # 2. Generate a task ID and batch number
            task_id = str(uuid4())
            # Batch number format: ENG + timestamp (to the second) + 3 random chars
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            random_chars = ''.join(random.choices(string.ascii_uppercase + string.digits, k=3))
            batch_number = f"ENG{timestamp}{random_chars}"
            # 3. Initialise the task state in Redis (expires after 2 hours)
            redis_key = f"import_task:{task_id}"
            redis_obj = RedisObject()
            task_data = {
                'status': 'pending',
                'batch_number': batch_number,
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time()),
                'success_count': 0
            }
            redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
            # 4. Save the upload under static/uploaded_files, keeping the
            # original extension so the async reader picks the right parser
            # (the previous hard-coded ".xls" broke .xlsx uploads)
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
            os.makedirs(upload_dir, exist_ok=True)
            file_ext = os.path.splitext(excel_file.name)[-1].lower()
            file_path = os.path.join(upload_dir, f"{task_id}{file_ext}")
            with open(file_path, 'wb+') as destination:
                for chunk in excel_file.chunks():
                    destination.write(chunk)
            # 5. Hand off to a background thread
            thread = threading.Thread(
                target=cls._process_import_batch_async,
                args=(task_id, file_path, redis_key, batch_number),
                daemon=True
            )
            thread.start()
            return response.json(0, {
                "task_id": task_id,
                "batch_number": batch_number,
                "message": "Import task submitted; processing in the background",
                "redis_key": redis_key
            })
        except Exception as e:
            LOGGER.error(f"Failed to create import task: {str(e)}")
            return response.json(500, "Failed to create import task")
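
    # A rough sketch of the immediate response a client sees (values are
    # illustrative, not real output):
    #   {
    #     "task_id": "7f3e...",
    #     "batch_number": "ENG20250730085700X1Z",
    #     "message": "Import task submitted; processing in the background",
    #     "redis_key": "import_task:7f3e..."
    #   }
    # The client is then expected to poll getImportProgress with this task_id.
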
    @classmethod
    def _process_import_batch_async(cls, task_id, file_path, redis_key, batch_number):
        """Background worker for a batch import (handles .xls and .xlsx, reads UIDs from column 3)."""
        redis_obj = RedisObject()
        try:
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['status'] = 'processing'
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Pick a reader based on the file extension
            file_ext = os.path.splitext(file_path)[-1].lower()
            uid_rows = []
            if file_ext == '.xls':
                # .xls files are read with xlrd
                workbook = xlrd.open_workbook(file_path)
                sheet = workbook.sheet_by_index(0)
                total_rows = sheet.nrows
                for row_idx in range(0, total_rows):  # start at row 0 (no header row expected)
                    row = sheet.row_values(row_idx)
                    if len(row) > 2 and row[2]:
                        uid = str(row[2]).strip()
                        if uid:
                            uid_rows.append(uid)
            elif file_ext == '.xlsx':
                # .xlsx files are read with openpyxl
                workbook = load_workbook(file_path, read_only=True)
                sheet = workbook.active
                for row in sheet.iter_rows(min_row=1, values_only=True):  # start at row 1 (no header row expected)
                    if len(row) > 2 and row[2]:
                        uid = str(row[2]).strip()
                        if uid:
                            uid_rows.append(uid)
            task_data['total'] = len(uid_rows)
            task_data['start_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Create the batch record
            current_time = int(time.time())
            with transaction.atomic():
                batch = BurnBatch(
                    batch_number=batch_number,
                    purpose='batch import',
                    created_time=current_time,
                    manager='system',
                    total_uid=0
                )
                batch.save()
                batch_id = batch.id
            # Insert the UIDs in chunks
            batch_size = 500
            processed = 0
            success_count = 0
            uids_batch = []
            for uid in uid_rows:
                uids_batch.append(uid)
                processed += 1
                # Refresh the progress every 1000 UIDs
                if processed % 1000 == 0:
                    progress = min(99, int((processed / len(uid_rows)) * 100))
                    task_data['progress'] = progress
                    task_data['processed'] = processed
                    task_data['last_update'] = int(time.time())
                    redis_obj.set_data(redis_key, json.dumps(task_data))
                if len(uids_batch) >= batch_size:
                    success = cls._import_uids_batch(
                        uids_batch,
                        batch_id,
                        current_time,
                        redis_key
                    )
                    success_count += success
                    uids_batch = []
            # Flush the final partial chunk
            if uids_batch:
                success = cls._import_uids_batch(
                    uids_batch,
                    batch_id,
                    current_time,
                    redis_key
                )
                success_count += success
            # Persist the final count on the batch
            with transaction.atomic():
                BurnBatch.objects.filter(id=batch_id).update(total_uid=success_count)
            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['success_count'] = success_count
            task_data['end_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            LOGGER.info(f"Batch import finished, task ID: {task_id}, UIDs processed: {processed}, imported: {success_count}")
            try:
                os.remove(file_path)
            except Exception as e:
                LOGGER.warning(f"Failed to delete temporary file: {str(e)}")
        except Exception as e:
            LOGGER.error(f"Import task failed: {str(e)}")
            task_data = {
                'status': 'failed',
                'error': str(e),
                'end_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data))
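
    # Note on the threading model (an observation, not project policy): the
    # worker runs on a daemon thread inside the web process, so an in-flight
    # import dies silently if the process restarts; the Redis entry would then
    # stay 'processing' until its 2-hour TTL expires. A task queue such as
    # Celery would survive restarts, at the cost of extra infrastructure.
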
    @classmethod
    def _import_uids_batch(cls, uids_batch, batch_id, current_time, redis_key):
        """Bulk-insert one chunk of UID records (new table layout)."""
        redis_obj = RedisObject()
        try:
            with transaction.atomic():
                # De-duplicate within this chunk
                unique_uids = list(set(uids_batch))
                # Build the rows
                records = [
                    BurnEncryptedICUID(
                        batch_id=batch_id,
                        uid=uid,
                        created_time=current_time,
                        updated_time=current_time,
                        status=0  # not yet burned
                    )
                    for uid in unique_uids
                ]
                BurnEncryptedICUID.objects.bulk_create(records)
            # Push the processed and success counters to Redis
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
            task_data['success_count'] = task_data.get('success_count', 0) + len(records)
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Invalidate the cached statistics for this batch
            cache_key = f"batch_stats:{batch_id}"
            redis_obj.del_data(cache_key)
            return len(records)
        except Exception as e:
            LOGGER.error(f"Bulk UID import failed: {str(e)}")
            # Record the progress anyway and keep going
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
            redis_obj.set_data(redis_key, json.dumps(task_data))
            return 0
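
    # Caveat worth knowing (inferred from the code, not a documented
    # guarantee): set(uids_batch) only de-duplicates within a single 500-UID
    # chunk. A UID repeated across chunks, or one already present from an
    # earlier batch, is inserted again unless the table enforces a unique
    # constraint on uid.
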
    @classmethod
    def add_burn_record(cls, request, request_dict, response) -> Any:
        """
        Create a burn record from an uploaded UID file (Redis string-state version).
        :param request: HttpRequest (carries the uploaded file and form data)
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Validate required fields
        required_fields = ['order_number', 'burn_count', 'purpose']
        for field in required_fields:
            if not request_dict.get(field):
                return response.json(444, f"Missing required parameter: {field}")
        # 2. Validate the upload (the async reader uses openpyxl, which only
        # handles .xlsx, so .xls uploads are rejected up front)
        if 'uid_file' not in request.FILES:
            return response.json(444, "Please upload an Excel file of burned UIDs")
        excel_file = request.FILES['uid_file']
        if not excel_file.name.endswith('.xlsx'):
            return response.json(444, "Only .xlsx Excel files are supported")
        try:
            burn_count = int(request_dict['burn_count'])
            if burn_count <= 0:
                return response.json(444, "Burn count must be greater than 0")
            # 3. Create a task ID and initialise the state in Redis (expires after 2 hours)
            task_id = str(uuid4())
            redis_key = f"burn_task:{task_id}"
            redis_obj = RedisObject()
            task_data = {
                'status': 'pending',
                'order_number': request_dict['order_number'].strip(),
                'burn_count': burn_count,
                'purpose': request_dict['purpose'].strip(),
                'progress': 0,
                'processed': 0,
                'total': 0,
                'start_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data), 7200)
            # 4. Save the upload under static/uploaded_files
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            upload_dir = os.path.join(base_dir, 'static', 'uploaded_files')
            os.makedirs(upload_dir, exist_ok=True)
            file_path = os.path.join(upload_dir, f"{task_id}.xlsx")
            with open(file_path, 'wb+') as destination:
                for chunk in excel_file.chunks():
                    destination.write(chunk)
            # 5. Hand off to a background thread
            thread = threading.Thread(
                target=cls._process_burn_record_async,
                args=(task_id, file_path, redis_key),
                daemon=True
            )
            thread.start()
            return response.json(0, {
                "task_id": task_id,
                "message": "Task submitted; processing in the background",
                "redis_key": redis_key
            })
        except ValueError:
            return response.json(444, "Burn count must be an integer")
        except Exception as e:
            LOGGER.error(f"Failed to create burn task: {str(e)}")
            return response.json(500, "Failed to create burn task")

    @classmethod
    def _process_burn_record_async(cls, task_id, file_path, redis_key):
        """Background worker for a burn-record task."""
        redis_obj = RedisObject()
        try:
            # Mark the task as processing
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['status'] = 'processing'
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 1. Open the workbook to get the row count
            wb = load_workbook(file_path)
            ws = wb.active
            total_rows = ws.max_row
            # Record the total and the start time
            task_data['total'] = total_rows
            task_data['start_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # 2. Create the burn record
            with transaction.atomic():
                burn_record = BurnRecord(
                    order_number=task_data['order_number'],
                    burn_count=task_data['burn_count'],
                    purpose=task_data['purpose'],
                    updated_time=int(time.time()),
                    created_time=int(time.time())
                )
                burn_record.save()
                task_data['burn_record_id'] = burn_record.id
                redis_obj.set_data(redis_key, json.dumps(task_data))
            # 3. Walk the UID file in chunks of 300
            batch_size = 300
            current_time = int(time.time())
            processed = 0
            uids_batch = []
            for row in ws.iter_rows(min_row=1, values_only=True):
                if row and row[0]:  # guard against empty rows as well as empty cells
                    uid = str(row[0]).strip()
                    if uid:
                        uids_batch.append(uid)
                        processed += 1
                        # Refresh the progress every 100 UIDs
                        if processed % 100 == 0:
                            progress = min(99, int((processed / total_rows) * 100))
                            task_data['progress'] = progress
                            task_data['processed'] = processed
                            task_data['last_update'] = int(time.time())
                            redis_obj.set_data(redis_key, json.dumps(task_data))
                        # Flush a full chunk
                        if len(uids_batch) >= batch_size:
                            cls._update_uids_batch(
                                uids_batch,
                                burn_record.id,
                                current_time,
                                redis_key
                            )
                            uids_batch = []
            # Flush the final partial chunk
            if uids_batch:
                cls._update_uids_batch(
                    uids_batch,
                    burn_record.id,
                    current_time,
                    redis_key
                )
            # Record the final state
            task_data['status'] = 'completed'
            task_data['progress'] = 100
            task_data['processed'] = processed
            task_data['end_time'] = int(time.time())
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Invalidate the cached statistics of every affected batch
            batch_ids = BurnEncryptedICUID.objects.filter(
                burn_id=burn_record.id
            ).values_list('batch_id', flat=True).distinct()
            for batch_id in batch_ids:
                cache_key = f"batch_stats:{batch_id}"
                redis_obj.del_data(cache_key)
            LOGGER.info(f"Burn record processed, task ID: {task_id}, UIDs handled: {processed}")
            # Remove the temporary file
            try:
                os.remove(file_path)
            except Exception as e:
                LOGGER.warning(f"Failed to delete temporary file: {str(e)}")
        except Exception as e:
            LOGGER.error(f"Burn record processing failed: {str(e)}")
            task_data = {
                'status': 'failed',
                'error': str(e),
                'end_time': int(time.time())
            }
            redis_obj.set_data(redis_key, json.dumps(task_data))
    @classmethod
    def _update_uids_batch(cls, uids_batch, burn_id, current_time, redis_key):
        """Bulk-update one chunk of UID records to burned status."""
        redis_obj = RedisObject()
        try:
            with transaction.atomic():
                # Materialise the affected batch IDs before the update runs
                batch_ids = list(BurnEncryptedICUID.objects.filter(
                    uid__in=uids_batch
                ).values_list('batch_id', flat=True).distinct())
                updated = BurnEncryptedICUID.objects.filter(
                    uid__in=uids_batch
                ).update(
                    burn_id=burn_id,
                    status=1,  # burned successfully
                    updated_time=current_time
                )
            # Push the processed count to Redis
            task_data = json.loads(redis_obj.get_data(redis_key))
            task_data['processed'] = task_data.get('processed', 0) + len(uids_batch)
            redis_obj.set_data(redis_key, json.dumps(task_data))
            # Invalidate the cached statistics of every affected batch
            for batch_id in batch_ids:
                cache_key = f"batch_stats:{batch_id}"
                redis_obj.del_data(cache_key)
        except Exception as e:
            LOGGER.error(f"Bulk UID update failed: {str(e)}")
            raise
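
    # The import path inserts rows (status=0) while the burn path above
    # updates them in place (status=1) by matching on uid across every batch,
    # so a single burn upload can touch several batches; that is why the cache
    # invalidation loops over all affected batch IDs rather than a single one.
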
    @classmethod
    def get_import_progress(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Query task progress (covers both import and burn tasks).
        :param request_dict: request parameter dict (task_id required, type optional)
        :param response: response object
        :return: JSON response
        The type parameter:
        - import: import task (default)
        - burn: burn task
        """
        # 1. Validate parameters
        task_id = request_dict.get('task_id')
        if not task_id:
            return response.json(444, "Missing task_id parameter")
        task_type = request_dict.get('type', 'import').lower()
        if task_type not in ['import', 'burn']:
            return response.json(444, "type must be 'import' or 'burn'")
        # 2. Build the Redis key
        redis_key = f"{task_type}_task:{task_id}"
        try:
            # 3. Fetch the task state from Redis
            redis_obj = RedisObject()
            task_data_str = redis_obj.get_data(redis_key)
            if not task_data_str:
                return response.json(173, "Task does not exist or has expired")
            # 4. Parse the task state
            if isinstance(task_data_str, bytes):
                task_data_str = task_data_str.decode('utf-8')
            task_data = json.loads(task_data_str)
            # 5. Compute the elapsed time in seconds
            current_time = int(time.time())
            start_time = task_data.get('start_time', current_time)
            elapsed = current_time - start_time
            if task_data.get('end_time'):
                elapsed = task_data['end_time'] - start_time
            # 6. Build the common response fields
            result = {
                'status': task_data.get('status', 'unknown'),
                'progress': task_data.get('progress', 0),
                'processed': task_data.get('processed', 0),
                'total': task_data.get('total', 0),
                'elapsed_seconds': elapsed,
                'start_time': start_time,
                'end_time': task_data.get('end_time'),
                'error': task_data.get('error'),
                'task_type': task_type
            }
            # 7. Add type-specific fields
            if task_type == 'import':
                # Look up the batch behind the batch number stored in Redis
                batch_number = task_data.get('batch_number', '')
                if batch_number:
                    try:
                        batch = BurnBatch.objects.filter(batch_number=batch_number).first()
                        if batch:
                            result.update({
                                'batch_number': batch_number,
                                'success_count': task_data.get('success_count', 0),
                                'purpose': batch.purpose,
                                'manager': batch.manager,
                                'total_uid': batch.total_uid
                            })
                        else:
                            result.update({
                                'batch_number': batch_number,
                                'success_count': task_data.get('success_count', 0)
                            })
                    except Exception as e:
                        LOGGER.error(f"Batch lookup failed: {str(e)}")
                        result.update({
                            'batch_number': batch_number,
                            'success_count': task_data.get('success_count', 0)
                        })
            else:  # burn task
                result.update({
                    'order_number': task_data.get('order_number', ''),
                    'purpose': task_data.get('purpose', ''),
                    'burn_count': task_data.get('burn_count', 0),
                    'burn_record_id': task_data.get('burn_record_id')
                })
            return response.json(0, result)
        except json.JSONDecodeError:
            LOGGER.error(f"Failed to parse task data, redis_key: {redis_key}")
            return response.json(500, "Malformed task data")
        except Exception as e:
            LOGGER.error(f"Progress query failed: {str(e)}")
            return response.json(500, "Progress query failed")
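
    # A sketch of how a client might poll (the URL prefix is hypothetical;
    # only the operation name and parameters come from this module):
    #   GET /<burn-manage-prefix>/getImportProgress?task_id=<uuid>&type=import
    # Poll until status is 'completed' or 'failed'; progress is capped at 99
    # while chunks are still flushing and only reaches 100 on completion.
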
    @classmethod
    def get_import_task_list(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        List every import task currently tracked in Redis.
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        try:
            redis_obj = RedisObject()
            # Collect every key with the import_task prefix
            keys = redis_obj.get_keys("import_task:*")
            if not keys:
                return response.json(0, {"tasks": []})
            tasks = []
            # Pull the summary of each task
            for key in keys:
                try:
                    task_data_str = redis_obj.get_data(key)
                    if task_data_str:
                        # Normalise bytes to str
                        if isinstance(task_data_str, bytes):
                            task_data_str = task_data_str.decode('utf-8')
                        task_data = json.loads(task_data_str)
                        # Keys may also come back as bytes
                        key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                        tasks.append({
                            'task_id': key_str.split(':')[1],  # extract the task_id from the key
                            'status': task_data.get('status', 'unknown'),
                            'progress': task_data.get('progress', 0),
                            'batch_number': task_data.get('batch_number', ''),
                            'start_time': task_data.get('start_time'),
                            'end_time': task_data.get('end_time'),
                            'processed': task_data.get('processed', 0),
                            'total': task_data.get('total', 0),
                            'redis_key': key_str
                        })
                except Exception as e:
                    LOGGER.error(f"Failed to parse task data, key: {key}, error: {str(e)}")
                    continue
            # Newest first; "or 0" avoids comparing None with int when
            # start_time is present but null
            tasks.sort(key=lambda x: x.get('start_time') or 0, reverse=True)
            # Cap the result at the most recent 100 tasks
            tasks = tasks[:100]
            return response.json(0, {"tasks": tasks})
        except Exception as e:
            LOGGER.error(f"Failed to list import tasks: {str(e)}")
            return response.json(500, "Failed to list tasks")

    @classmethod
    def batch_page_uids(cls, request_dict: Dict[str, Any], response) -> Any:
        """
        Paged query of burn UID records, filtered by batch_id.
        :param request_dict: request parameter dict
        :param response: response object
        :return: JSON response
        """
        # 1. Parse and clamp pagination parameters
        try:
            page = int(request_dict.get('page', 1))
            page_size = int(request_dict.get('pageSize', 10))
            page = max(page, 1)
            page_size = max(1, min(page_size, 100))
        except (ValueError, TypeError):
            return response.json(444, "Invalid pagination parameters (must be integers)")
        # 2. Build the query
        query = Q()
        # Optional batch_id filter
        batch_id = request_dict.get('batch_id')
        if batch_id:
            try:
                batch_id = int(batch_id)
                query &= Q(batch_id=batch_id)
            except ValueError:
                return response.json(444, "batch_id must be an integer")
        # 3. Query and paginate
        uid_qs = BurnEncryptedICUID.objects.filter(query).order_by('-created_time').values(
            'id', 'uid', 'batch_id', 'status', 'created_time', 'updated_time'
        )
        paginator = Paginator(uid_qs, page_size)
        try:
            page_obj = paginator.page(page)
        except PageNotAnInteger:
            page_obj = paginator.page(1)
        except EmptyPage:
            page_obj = paginator.page(paginator.num_pages)
        # Materialise the current page
        uid_list = list(page_obj)
        return response.json(
            0,
            {
                'list': uid_list,
                'total': paginator.count,
                'currentPage': page_obj.number,
                'totalPages': paginator.num_pages,
                'pageSize': page_size
            }
        )