ProblemEntryManagementController.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import json
  2. import time
  3. from django.db import transaction
  4. from django.views import View
  5. from Ansjer.test.util.email_log import response
  6. from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme
  7. from Object.ResponseObject import ResponseObject
  8. class ProblemEntryView(View):
  9. def get(self, request, *args, **kwargs):
  10. request.encoding = 'utf-8'
  11. operation = kwargs.get('operation')
  12. request_dict = request.GET
  13. return self.validation(request, request_dict, operation)
  14. def post(self, request, *args, **kwargs):
  15. request.encoding = 'utf-8'
  16. operation = kwargs.get('operation')
  17. request_dict = request.POST
  18. return self.validation(request, request_dict, operation)
  19. def validation(self, request, request_dict, operation):
  20. language = request_dict.get('language', 'en')
  21. response = ResponseObject(language, 'pc')
  22. if operation == 'addProductProblem':
  23. return self.add_product_problem(request_dict, response)
  24. elif operation == 'queryEventSubcategory':
  25. return self.query_event_subcategory(request_dict, response)
  26. elif operation == 'queryDeviceScheme':
  27. return self.query_device_scheme(request_dict, response)
  28. @staticmethod
  29. def add_product_problem(request_dict, response):
  30. try:
  31. # 解析请求数据
  32. date_times = json.loads(request_dict.get('dateTimes', '[]'))
  33. deivce_ids = json.loads(request_dict.get('deviceIds', '[]'))
  34. device_types = json.loads(request_dict.get('deviceTypes', '[]'))
  35. storage_codes = json.loads(request_dict.get('storageCodes', '[]'))
  36. event_codes = json.loads(request_dict.get('eventCodes', '[]'))
  37. remarks = json.loads(request_dict.get('remarks', '[]'))
  38. # 基础数据校验
  39. list_lengths = {
  40. "dateTimes": len(date_times),
  41. "serialNumbers": len(deivce_ids),
  42. "deviceTypes": len(device_types),
  43. "storageCodes": len(storage_codes),
  44. "eventCodes": len(event_codes),
  45. "remarks": len(remarks)
  46. }
  47. # 检查各字段数据长度一致性
  48. if len(set(list_lengths.values())) != 1:
  49. return response.json(400, f'参数长度不一致: {", ".join([f"{k}({v})" for k, v in list_lengths.items()])}')
  50. current_time = int(time.time())
  51. products = []
  52. # 数据预处理
  53. for idx in range(len(date_times)):
  54. # 字段值提取
  55. date_time = date_times[idx]
  56. deivce_id = deivce_ids[idx].strip() # 去除前后空格
  57. device_type = device_types[idx]
  58. storage_code = storage_codes[idx]
  59. event_code = event_codes[idx].strip()
  60. remark = remarks[idx].strip()
  61. # 构建对象
  62. products.append(
  63. ProductTroubleshoot(
  64. date_time=date_time,
  65. device_id=deivce_id,
  66. device_type=device_type,
  67. event_code=event_code,
  68. storage_code=storage_code,
  69. status=0,
  70. remark=remark,
  71. created_time=current_time,
  72. updated_time=current_time
  73. )
  74. )
  75. # 批量写入数据库
  76. with transaction.atomic():
  77. ProductTroubleshoot.objects.bulk_create(products)
  78. return response.json(0)
  79. except Exception as e:
  80. print(e)
  81. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
  82. @staticmethod
  83. def query_event_subcategory(request_dict, response):
  84. event_type_code = request_dict.get('eventTypeCode', None)
  85. abnormal_event_code_qs = AbnormalEventCode.objects.all()
  86. if event_type_code:
  87. abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code)
  88. data = []
  89. for abnormal_event_code in abnormal_event_code_qs:
  90. # 第0个是大类 第一个是细分
  91. event_list = abnormal_event_code.event.split("-")
  92. data.append(
  93. {
  94. "eventTypeCode": abnormal_event_code.event_code[0:2],
  95. "eventType": abnormal_event_code.event_type,
  96. "eventSegCode": abnormal_event_code.event_code[2:4],
  97. "eventSeg": event_list[1] if len(event_list) > 1 else "",
  98. "eventDescriptionCode": abnormal_event_code.event_code[4:],
  99. "eventDescription": event_list[2] if len(event_list) > 2 else "",
  100. }
  101. )
  102. return response.json(0, data)
  103. @staticmethod
  104. def query_device_scheme(request_dict, response):
  105. device_id = request_dict.get('deviceId', None)
  106. if not device_id:
  107. return response.json(0, msg="设备ID不能为空")
  108. try:
  109. # 获取设备信息
  110. device_scheme = DeviceScheme.objects.filter(serial_number=device_id).values("device_type",
  111. "storage_code").first()
  112. if not device_scheme:
  113. return response.json(0)
  114. # 获取产品方案信息
  115. product_scheme = ProductsScheme.objects.filter(
  116. storage_code=device_scheme["storage_code"]
  117. ).values(
  118. "order_quantity", "created_time", "storage_code", "flash",
  119. "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy"
  120. ).first()
  121. if not product_scheme:
  122. return response.json(0, {"deviceType": device_scheme["device_type"]})
  123. # 拼接方案字符串,过滤空值
  124. scheme_parts = [
  125. product_scheme["flash"],
  126. product_scheme["ddr"],
  127. product_scheme["main_controller"],
  128. product_scheme["wifi"],
  129. product_scheme["four_g"],
  130. product_scheme["ad"],
  131. product_scheme["sensor"],
  132. product_scheme["phy"]
  133. ]
  134. scheme = " + ".join(part for part in scheme_parts if part)
  135. return response.json(0, {
  136. "deviceType": device_scheme["device_type"],
  137. "quantity": product_scheme["order_quantity"],
  138. "createdTime": product_scheme["created_time"],
  139. "storageCode": product_scheme["storage_code"],
  140. "scheme": scheme,
  141. })
  142. except Exception as e:
  143. print(e)
  144. return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))