123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- import json
- import time
- from django.db import transaction
- from django.views import View
- from Ansjer.test.util.email_log import response
- from Model.models import Device_Info, DeviceScheme, ProductTroubleshoot, AbnormalEventCode
- from Object.ResponseObject import ResponseObject
- class ProblemEntryView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- request_dict = request.GET
- return self.validation(request, request_dict, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- request_dict = request.POST
- return self.validation(request, request_dict, operation)
- def validation(self, request, request_dict, operation):
- language = request_dict.get('language', 'en')
- response = ResponseObject(language, 'pc')
- if operation == 'addProductProblem':
- return self.add_product_problem(request_dict, response)
- elif operation == 'queryEventSubcategory':
- return self.query_event_subcategory(request_dict, response)
- @staticmethod
- def add_product_problem(request_dict, response):
- try:
- # 解析请求数据
- date_times = json.loads(request_dict.get('dateTimes', '[]'))
- deivce_ids = json.loads(request_dict.get('deviceIds', '[]'))
- device_types = json.loads(request_dict.get('deviceTypes', '[]'))
- event_codes = json.loads(request_dict.get('eventCodes', '[]'))
- remarks = json.loads(request_dict.get('remarks', '[]'))
- # 基础数据校验
- list_lengths = {
- "dateTimes": len(date_times),
- "serialNumbers": len(deivce_ids),
- "deviceTypes": len(device_types),
- "eventCodes": len(event_codes),
- "remarks": len(remarks)
- }
- # 检查各字段数据长度一致性
- if len(set(list_lengths.values())) != 1:
- return response.json(400, f'参数长度不一致: {", ".join([f"{k}({v})" for k, v in list_lengths.items()])}')
- current_time = int(time.time())
- products = []
- # 数据预处理
- for idx in range(len(date_times)):
- # 字段值提取
- date_time = date_times[idx]
- deivce_id = deivce_ids[idx].strip() # 去除前后空格
- device_type = device_types[idx]
- event_code = event_codes[idx].strip()
- remark = remarks[idx].strip()
- # 构建对象
- products.append(
- ProductTroubleshoot(
- date_time=date_time,
- device_id=deivce_id,
- device_type=device_type,
- event_code=event_code,
- status=0,
- remark=remark,
- created_time=current_time,
- updated_time=current_time
- )
- )
- # 批量写入数据库
- with transaction.atomic():
- ProductTroubleshoot.objects.bulk_create(products)
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def query_event_subcategory(request_dict, response):
- event_type_code = request_dict.get('eventTypeCode', None)
- abnormal_event_code_qs = AbnormalEventCode.objects.all()
- if event_type_code:
- abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code)
- data = []
- for abnormal_event_code in abnormal_event_code_qs:
- # 第0个是大类 第一个是细分
- event_list = abnormal_event_code.event.split("-")
- data.append(
- {
- "eventTypeCode": abnormal_event_code.event_code[0:2],
- "eventType": abnormal_event_code.event_type,
- "eventSegCode": abnormal_event_code.event_code[2:4],
- "eventSeg": event_list[1] if len(event_list) > 1 else "",
- "eventDescriptionCode": abnormal_event_code.event_code[4:],
- "eventDescription": event_list[2] if len(event_list) > 2 else "",
- }
- )
- return response.json(0, data)
|