import json import time from django.db import transaction from django.views import View from Ansjer.test.util.email_log import response from Model.models import Device_Info, DeviceScheme, ProductTroubleshoot, AbnormalEventCode from Object.ResponseObject import ResponseObject class ProblemEntryView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.GET return self.validation(request, request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.POST return self.validation(request, request_dict, operation) def validation(self, request, request_dict, operation): language = request_dict.get('language', 'en') response = ResponseObject(language, 'pc') if operation == 'addProductProblem': return self.add_product_problem(request_dict, response) elif operation == 'queryEventSubcategory': return self.query_event_subcategory(request_dict, response) @staticmethod def add_product_problem(request_dict, response): try: # 解析请求数据 date_times = json.loads(request_dict.get('dateTimes', '[]')) deivce_ids = json.loads(request_dict.get('deviceIds', '[]')) device_types = json.loads(request_dict.get('deviceTypes', '[]')) event_codes = json.loads(request_dict.get('eventCodes', '[]')) remarks = json.loads(request_dict.get('remarks', '[]')) # 基础数据校验 list_lengths = { "dateTimes": len(date_times), "serialNumbers": len(deivce_ids), "deviceTypes": len(device_types), "eventCodes": len(event_codes), "remarks": len(remarks) } # 检查各字段数据长度一致性 if len(set(list_lengths.values())) != 1: return response.json(400, f'参数长度不一致: {", ".join([f"{k}({v})" for k, v in list_lengths.items()])}') current_time = int(time.time()) products = [] # 数据预处理 for idx in range(len(date_times)): # 字段值提取 date_time = date_times[idx] deivce_id = deivce_ids[idx].strip() # 去除前后空格 device_type = device_types[idx] event_code = event_codes[idx].strip() remark = remarks[idx].strip() # 构建对象 products.append( ProductTroubleshoot( date_time=date_time, device_id=deivce_id, device_type=device_type, event_code=event_code, status=0, remark=remark, created_time=current_time, updated_time=current_time ) ) # 批量写入数据库 with transaction.atomic(): ProductTroubleshoot.objects.bulk_create(products) return response.json(0) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def query_event_subcategory(request_dict, response): event_type_code = request_dict.get('eventTypeCode', None) abnormal_event_code_qs = AbnormalEventCode.objects.all() if event_type_code: abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code) data = [] for abnormal_event_code in abnormal_event_code_qs: # 第0个是大类 第一个是细分 event_list = abnormal_event_code.event.split("-") data.append( { "eventTypeCode": abnormal_event_code.event_code[0:2], "eventType": abnormal_event_code.event_type, "eventSegCode": abnormal_event_code.event_code[2:4], "eventSeg": event_list[1] if len(event_list) > 1 else "", "eventDescriptionCode": abnormal_event_code.event_code[4:], "eventDescription": event_list[2] if len(event_list) > 2 else "", } ) return response.json(0, data)