ProductsSchemeManageController.py 21 KB


  1. # -*- encoding: utf-8 -*-
  2. """
  3. @File : ProductsSchemeManageController.py
  4. @Time : 2025/5/13 11:52
  5. @Author : stephen
  6. @Email : zhangdongming@asj6.wecom.work
  7. @Software: PyCharm
  8. """
  9. import datetime
  10. import json
  11. import secrets
  12. import string
  13. import time
  14. from io import BytesIO
  15. import qrcode
  16. from django.core.paginator import Paginator, EmptyPage
  17. from django.db import transaction, IntegrityError
  18. from django.db.models import Q
  19. from django.http import QueryDict, HttpResponse
  20. from django.views import View
  21. from Ansjer.config import LOGGER
  22. from Model.models import ProductsScheme, DeviceScheme, DeviceTypeModel, CompanySerialModel
  23. from Object.ResponseObject import ResponseObject
  24. from Object.TokenObject import TokenObject
  25. class ProductsSchemeManageView(View):
  26. def get(self, request, *args, **kwargs):
  27. request.encoding = 'utf-8'
  28. operation = kwargs.get('operation')
  29. return self.validation(request.GET, request, operation)
  30. def post(self, request, *args, **kwargs):
  31. request.encoding = 'utf-8'
  32. operation = kwargs.get('operation')
  33. return self.validation(request.POST, request, operation)
  34. def delete(self, request, *args, **kwargs):
  35. request.encoding = 'utf-8'
  36. operation = kwargs.get('operation')
  37. delete = QueryDict(request.body)
  38. if not delete:
  39. delete = request.GET
  40. return self.validation(delete, request, operation)
  41. def put(self, request, *args, **kwargs):
  42. request.encoding = 'utf-8'
  43. operation = kwargs.get('operation')
  44. put = QueryDict(request.body)
  45. return self.validation(put, request, operation)
  46. def validation(self, request_dict, request, operation):
  47. """请求验证路由"""
  48. # 初始化响应对象
  49. language = request_dict.get('language', 'cn')
  50. response = ResponseObject(language, 'pc')
  51. # Token验证
  52. try:
  53. tko = TokenObject(
  54. request.META.get('HTTP_AUTHORIZATION'),
  55. returntpye='pc')
  56. if tko.code != 0:
  57. return response.json(tko.code)
  58. response.lang = tko.lang
  59. user_id = tko.userID
  60. except Exception as e:
  61. LOGGER.error(f"Token验证失败: {str(e)}")
  62. return response.json(444)
  63. # 操作路由映射
  64. operation_handlers = {
  65. 'queryList': self.query_list, # 查询列表
  66. 'add': self.add_scheme, # 添加方案
  67. 'edit': self.edit_scheme, # 编辑方案
  68. 'delete': self.delete_scheme, # 删除方案
  69. 'generateQR': self.generate_qr_code, # 生成二维码
  70. 'deviceSchemeList': self.device_scheme_list
  71. }
  72. handler = operation_handlers.get(operation)
  73. if not handler:
  74. return response.json(444, 'operation')
  75. try:
  76. return handler(user_id, request_dict, response)
  77. except Exception as e:
  78. LOGGER.error(f"操作{operation}执行异常:{repr(e)}")
  79. return response.json(500, "服务器内部错误")
  80. def query_list(self, user_id, request_dict, response):
  81. """查询方案列表(优化点:分页封装/字段映射)"""
  82. # 参数处理与验证
  83. try:
  84. page = int(request_dict.get('page', 1))
  85. page_size = min(int(request_dict.get('pageSize', 10)), 100) # 限制最大分页大小
  86. except ValueError:
  87. return response.json(444, "分页参数错误")
  88. # 构建查询条件
  89. query = Q(deleted=False)
  90. # 特殊字段处理示例(如果需要)
  91. if order_number := request_dict.get('orderNumber'):
  92. query &= Q(order_number__icontains=order_number)
  93. if storage_code := request_dict.get('storageCode'):
  94. query &= Q(storage_code__icontains=storage_code)
  95. if device_type := request_dict.get('deviceType'):
  96. query &= Q(device_type=int(device_type))
  97. if flash := request_dict.get('flash'):
  98. query &= Q(flash__icontains=flash)
  99. if ddr := request_dict.get('ddr'):
  100. query &= Q(ddr__icontains=ddr)
  101. if main_controller := request_dict.get('mainController'):
  102. query &= Q(main_controller__icontains=main_controller)
  103. if wifi := request_dict.get('wifi'):
  104. query &= Q(wifi__icontains=wifi)
  105. if four_g := request_dict.get('fourG'):
  106. query &= Q(four_g__icontains=four_g)
  107. if ad := request_dict.get('ad'):
  108. query &= Q(ad__icontains=ad)
  109. if sensor := request_dict.get('sensor'):
  110. query &= Q(sensor__icontains=sensor)
  111. # 使用分页器
  112. queryset = ProductsScheme.objects.filter(query).order_by('-created_time')
  113. paginator = Paginator(queryset, page_size)
  114. try:
  115. page_obj = paginator.page(page)
  116. except EmptyPage:
  117. return response.json(444, "页码超出范围")
  118. # 序列化数据
  119. data = [self._scheme_to_dict(scheme) for scheme in page_obj]
  120. return response.json(0, {
  121. 'list': data,
  122. 'total': paginator.count,
  123. 'currentPage': page_obj.number,
  124. 'totalPages': paginator.num_pages
  125. })
  126. @staticmethod
  127. def generate_timestamp_code(device_type: int = 0) -> str:
  128. """
  129. 方案1:基于时间戳的编码
  130. 格式:PC+设备类型+年月日+时分秒+6位随机大写字符
  131. 示例:PC020250519143022-ABCDEF
  132. """
  133. try:
  134. current_time = datetime.datetime.now()
  135. date_part = current_time.strftime("%Y%m%d%H%M%S")
  136. # 使用string.ascii_uppercase生成大写随机字符
  137. random_part = ''.join(secrets.choice(string.ascii_uppercase) for _ in range(3))
  138. return f"{date_part}-{device_type}-{random_part}"
  139. except Exception as e:
  140. LOGGER.error(f"生成编码失败: {repr(e)}")
  141. raise
  142. def add_scheme(self, user_id, request_dict, response):
  143. """新增方案(集成自动编码生成)"""
  144. try:
  145. device_type = int(request_dict.get('deviceType', 0))
  146. # 生成唯一storage_code
  147. storage_code = self.generate_timestamp_code(device_type)
  148. # 构造方案数据
  149. scheme_data = {
  150. 'storage_code': storage_code, # 使用生成的编码
  151. 'order_number': request_dict.get('orderNumber', ''),
  152. 'device_type': device_type,
  153. 'flash': request_dict.get('flash', ''),
  154. 'ddr': request_dict.get('ddr', ''),
  155. 'main_controller': request_dict.get('mainController', ''),
  156. 'wifi': request_dict.get('wifi', ''),
  157. 'four_g': request_dict.get('fourG', ''),
  158. 'ad': request_dict.get('ad', ''),
  159. 'sensor': request_dict.get('sensor', ''),
  160. 'order_quantity': int(request_dict.get('orderQuantity', 0)),
  161. 'customer_code': request_dict.get('customerCode', ''),
  162. 'phy': request_dict.get('phy', ''),
  163. 'remark': request_dict.get('remark', ''),
  164. 'created_time': int(time.time()),
  165. 'updated_time': int(time.time()),
  166. 'created_by': user_id,
  167. 'updated_by': user_id
  168. }
  169. scheme = ProductsScheme.objects.create(**scheme_data)
  170. return response.json(0, {
  171. 'id': scheme.id,
  172. 'storageCode': storage_code # 返回生成的编码
  173. })
  174. except IntegrityError as e:
  175. LOGGER.error(f"唯一性冲突: {str(e)}")
  176. return response.json(173, "数据已存在")
  177. except ValueError as e:
  178. return response.json(444, "参数类型错误")
  179. except Exception as e:
  180. LOGGER.exception(f"添加方案异常:{repr(e)}")
  181. return response.json(500, "生成编码失败")
  182. # 可编辑字段白名单(前端字段名: 模型字段名)
  183. EDITABLE_FIELDS = {
  184. 'orderNumber': 'order_number',
  185. 'deviceType': 'device_type',
  186. 'flash': 'flash',
  187. 'ddr': 'ddr',
  188. 'mainController': 'main_controller',
  189. 'wifi': 'wifi',
  190. 'fourG': 'four_g',
  191. 'ad': 'ad',
  192. 'sensor': 'sensor',
  193. 'orderQuantity': 'order_quantity',
  194. 'customerCode': 'customer_code',
  195. 'phy': 'phy',
  196. 'remark': 'remark'
  197. }
  198. # 需要类型转换的字段配置(字段名: 转换函数)
  199. FIELD_CONVERTERS = {
  200. 'deviceType': int,
  201. 'orderQuantity': int
  202. }
  203. @transaction.atomic
  204. def edit_scheme(self, user_id, request_dict, response):
  205. """
  206. 方案编辑接口(事务原子性保证)
  207. 优化特性:
  208. 1. 前端字段到模型字段的自动映射
  209. 2. 字段级白名单过滤
  210. 3. 智能类型转换
  211. 4. 最小化更新字段
  212. 5. 并发安全控制
  213. """
  214. # ================= 参数校验阶段 =================
  215. if 'id' not in request_dict:
  216. return response.json(444, "缺少方案ID")
  217. try:
  218. scheme_id = int(request_dict['id'])
  219. except (ValueError, TypeError):
  220. LOGGER.warning(f"非法方案ID: {request_dict.get('id')}")
  221. return response.json(444, "方案ID格式错误")
  222. # ================= 数据获取阶段 =================
  223. try:
  224. # 使用select_for_update锁定记录,防止并发修改
  225. scheme = ProductsScheme.objects.select_for_update().get(
  226. id=scheme_id,
  227. deleted=False
  228. )
  229. except ProductsScheme.DoesNotExist:
  230. LOGGER.info(f"方案不存在: {scheme_id}")
  231. return response.json(173, "方案不存在")
  232. except Exception as e:
  233. LOGGER.error(f"数据库查询异常: {str(e)}")
  234. return response.json(500, "系统错误")
  235. # ================= 数据处理阶段 =================
  236. update_fields = []
  237. update_data = {'updated_time': int(time.time()), 'updated_by': user_id}
  238. # 遍历所有请求参数
  239. for frontend_field, value in request_dict.items():
  240. # 1. 白名单校验
  241. if frontend_field not in self.EDITABLE_FIELDS:
  242. continue
  243. # 2. 获取对应的模型字段名
  244. model_field = self.EDITABLE_FIELDS[frontend_field]
  245. # 3. 类型转换处理
  246. if frontend_field in self.FIELD_CONVERTERS:
  247. try:
  248. value = self.FIELD_CONVERTERS[frontend_field](value)
  249. except (ValueError, TypeError):
  250. LOGGER.warning(f"字段类型错误 {frontend_field}={value}")
  251. return response.json(444, f"{frontend_field}类型不合法")
  252. # 4. 值变更检查(避免无意义更新)
  253. if getattr(scheme, model_field) != value:
  254. update_data[model_field] = value
  255. update_fields.append(model_field)
  256. # 无实际修改时快速返回
  257. if not update_fields:
  258. LOGGER.debug("无字段变更")
  259. return response.json(0)
  260. # ================= 数据持久化阶段 =================
  261. try:
  262. # 动态生成更新SQL:UPDATE ... SET field1=%s, field2=%s
  263. ProductsScheme.objects.filter(id=scheme_id).update(**update_data)
  264. LOGGER.info(f"方案更新成功: {scheme_id} 修改字段: {update_fields}")
  265. return response.json(0)
  266. except IntegrityError as e:
  267. LOGGER.error(f"数据唯一性冲突: {str(e)}")
  268. return response.json(177)
  269. except Exception as e:
  270. LOGGER.exception("方案更新异常")
  271. return response.json(500, "系统错误")
  272. def delete_scheme(self, user_id, request_dict, response):
  273. """删除方案(优化点:逻辑删除优化)"""
  274. # 参数校验
  275. if 'id' not in request_dict:
  276. return response.json(444, "缺少方案ID")
  277. try:
  278. # 使用update直接进行逻辑删除,提高效率
  279. rows = ProductsScheme.objects.filter(
  280. id=request_dict['id'],
  281. deleted=False
  282. ).update(
  283. deleted=True,
  284. updated_by=user_id,
  285. updated_time=int(time.time())
  286. )
  287. if rows == 0:
  288. return response.json(173, "方案不存在")
  289. return response.json(0)
  290. except ValueError:
  291. return response.json(500, "方案ID格式错误")
  292. def _scheme_to_dict(self, scheme):
  293. """方案对象序列化(优化点:集中管理序列化逻辑)"""
  294. return {
  295. 'id': scheme.id,
  296. 'orderNumber': scheme.order_number,
  297. 'storageCode': scheme.storage_code,
  298. 'deviceType': scheme.device_type,
  299. 'flash': scheme.flash,
  300. 'ddr': scheme.ddr,
  301. 'mainController': scheme.main_controller,
  302. 'wifi': scheme.wifi,
  303. 'fourG': scheme.four_g,
  304. 'ad': scheme.ad,
  305. 'sensor': scheme.sensor,
  306. 'orderQuantity': scheme.order_quantity,
  307. 'customerCode': scheme.customer_code,
  308. 'phy': scheme.phy,
  309. 'remark': scheme.remark,
  310. 'createdTime': scheme.created_time,
  311. 'createdBy': scheme.created_by
  312. }
  313. def generate_qr_code(self, user_id, request_dict, response):
  314. """生成方案二维码"""
  315. try:
  316. scheme_id = int(request_dict.get('scheme_id'))
  317. scheme = ProductsScheme.objects.get(id=scheme_id, deleted=False)
  318. # 创建二维码(示例生成包含方案ID+名称)
  319. qr = qrcode.QRCode(
  320. version=1,
  321. error_correction=qrcode.constants.ERROR_CORRECT_L,
  322. box_size=10,
  323. border=4,
  324. )
  325. # 组织二维码内容(根据业务需求自定义)
  326. qr_data = json.dumps({
  327. "sc": scheme.storage_code,
  328. "t": 'NVR' if scheme.device_type == 1 else 'IPC',
  329. "f": scheme.flash,
  330. "ddr": scheme.ddr,
  331. "mc": scheme.main_controller,
  332. "wifi": scheme.wifi,
  333. "mode4G": scheme.four_g,
  334. "ad": scheme.ad,
  335. "s": scheme.sensor,
  336. 'uc': scheme.customer_code,
  337. 'phy': scheme.phy,
  338. "num": scheme.order_quantity,
  339. "ct": scheme.created_time
  340. })
  341. qr.add_data(qr_data)
  342. qr.make(fit=True)
  343. # 生成图片
  344. img = qr.make_image(fill_color="black", back_color="white")
  345. # 将图片转为字节流
  346. buffer = BytesIO()
  347. img.save(buffer, format="PNG")
  348. # 返回文件响应
  349. return HttpResponse(
  350. buffer.getvalue(),
  351. content_type="image/png",
  352. headers={
  353. 'Content-Disposition': f'attachment; filename="scheme_{scheme.id}_qr.png"'
  354. }
  355. )
  356. except ProductsScheme.DoesNotExist:
  357. return response.json(173, "方案不存在")
  358. except Exception as e:
  359. LOGGER.error(f"生成二维码失败: {str(e)}")
  360. return response.json(500, "生成失败")
  361. @staticmethod
  362. def device_scheme_list(user_id, request_dict, response):
  363. """
  364. 查询设备方案列表
  365. @param request_dict: 请求参数
  366. @param response: 响应对象
  367. @return: 响应对象包含设备方案列表
  368. """
  369. # 提取请求参数并提供默认值
  370. filters = {
  371. 'serial_number': request_dict.get("serialNumber"),
  372. 'storage_code': request_dict.get("storageCode"),
  373. 'device_type': request_dict.get("deviceType"),
  374. 'phone_model': request_dict.get("phoneModel"),
  375. }
  376. page = int(request_dict.get("page", 1))
  377. page_size = int(request_dict.get("pageSize", 20))
  378. try:
  379. # 构建基础查询集
  380. queryset = DeviceScheme.objects.all().order_by('-created_time')
  381. # 应用过滤条件
  382. if filters['serial_number']:
  383. queryset = queryset.filter(serial_number=filters['serial_number'])
  384. if filters['storage_code']:
  385. queryset = queryset.filter(storage_code__icontains=filters['storage_code'])
  386. if filters['device_type']:
  387. queryset = queryset.filter(device_type=filters['device_type'])
  388. if filters['phone_model']:
  389. queryset = queryset.filter(phone_model__icontains=filters['phone_model'])
  390. # 分页处理
  391. paginator = Paginator(queryset, page_size)
  392. try:
  393. current_page = paginator.page(page)
  394. except EmptyPage:
  395. current_page = paginator.page(paginator.num_pages)
  396. # 批量预取关联数据
  397. device_list = []
  398. if current_page.object_list:
  399. # 准备批量查询所需数据
  400. storage_codes = {device.storage_code for device in current_page}
  401. serial_prefixes = {device.serial_number[:6] for device in current_page}
  402. device_types = {device.device_type for device in current_page}
  403. # 批量获取关联数据
  404. product_schemes_map = {
  405. ps.storage_code: ps
  406. for ps in ProductsScheme.objects.filter(storage_code__in=storage_codes)
  407. }
  408. device_type_names = {
  409. dt['type']: dt['name']
  410. for dt in DeviceTypeModel.objects.filter(type__in=device_types).values('type', 'name')
  411. }
  412. serial_statuses = {
  413. cs.serial_number: cs
  414. for cs in CompanySerialModel.objects.filter(serial_number__in=serial_prefixes)
  415. }
  416. # 构建设备数据
  417. device_list = [
  418. ProductsSchemeManageView.build_device_data(
  419. device,
  420. product_schemes_map.get(device.storage_code),
  421. device_type_names.get(device.device_type, device.device_type),
  422. serial_statuses.get(device.serial_number[:6])
  423. )
  424. for device in current_page
  425. ]
  426. return response.json(0, {
  427. 'list': device_list,
  428. 'total': paginator.count
  429. })
  430. except Exception as e:
  431. LOGGER.error("设备方案列表查询异常")
  432. return response.json(500, f'服务器错误: {str(e)}')
  433. @staticmethod
  434. def build_device_data(device, product_scheme, device_type_name, serial_status):
  435. """构建单个设备数据字典"""
  436. # 设备基础数据
  437. device_data = {
  438. 'id': device.id,
  439. 'serialNumber': device.serial_number,
  440. 'deviceType': device_type_name,
  441. 'phoneModel': device.phone_model,
  442. 'storageCode': device.storage_code,
  443. 'createdTime': device.created_time,
  444. 'updatedTime': device.updated_time,
  445. 'snStatus': 'N/A',
  446. 'aTime': 'N/A',
  447. }
  448. # 序列号激活信息
  449. if device.device_type != 0 and serial_status:
  450. device_data['snStatus'] = '已激活' if serial_status.status > 1 else '未激活'
  451. device_data['aTime'] = datetime.datetime.fromtimestamp(
  452. serial_status.update_time
  453. ).strftime('%Y-%m-%d %H:%M:%S')
  454. # 添加产品方案信息
  455. if product_scheme:
  456. # 构建方案数据字符串
  457. scheme_parts = [
  458. product_scheme.flash,
  459. product_scheme.ddr,
  460. product_scheme.main_controller,
  461. product_scheme.wifi,
  462. product_scheme.four_g,
  463. product_scheme.ad,
  464. product_scheme.phy,
  465. product_scheme.sensor
  466. ]
  467. scheme_data = '+'.join(filter(None, map(str, scheme_parts)))
  468. device_data.update({
  469. 'orderNumber': product_scheme.order_number or '',
  470. 'type': 'NVR' if product_scheme.device_type == 1 else 'IPC',
  471. 'flash': product_scheme.flash or '',
  472. 'ddr': product_scheme.ddr or '',
  473. 'mainController': product_scheme.main_controller or '',
  474. 'wifi': product_scheme.wifi or '',
  475. 'fourG': product_scheme.four_g or '',
  476. 'ad': product_scheme.ad or '',
  477. 'phy': product_scheme.phy or '',
  478. 'sensor': product_scheme.sensor or '',
  479. 'orderQuantity': product_scheme.order_quantity or '',
  480. 'customerCode': product_scheme.customer_code or '',
  481. 'schemeData': scheme_data,
  482. 'remark': product_scheme.remark or ''
  483. })
  484. return device_data