import json import time from django.core.paginator import Paginator from django.db import transaction from django.views import View from Ansjer.test.util.email_log import response from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme, AbnormalEvent from Object.ResponseObject import ResponseObject from Service.CommonService import CommonService class ProblemEntryView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.GET return self.validation(request, request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.POST return self.validation(request, request_dict, operation) def validation(self, request, request_dict, operation): language = request_dict.get('language', 'en') response = ResponseObject(language, 'pc') if operation == 'addProductProblem': return self.add_product_problem(request_dict, response) elif operation == 'queryEventSubcategory': return self.query_event_subcategory(request_dict, response) elif operation == 'queryDeviceScheme': return self.query_device_scheme(request_dict, response) elif operation == 'ProductProblemList': return self.product_problem_list(request_dict, response) elif operation == 'editProductProblemStatus': return self.edit_product_problem_status(request_dict, response) else: return response.json(414) @staticmethod def add_product_problem(request_dict, response): try: # 解析请求数据 date_times = json.loads(request_dict.get('dateTimes', '[]')) deivce_ids = json.loads(request_dict.get('deviceIds', '[]')) device_types = json.loads(request_dict.get('deviceTypes', '[]')) storage_codes = json.loads(request_dict.get('storageCodes', '[]')) event_codes = json.loads(request_dict.get('eventCodes', '[]')) remarks = json.loads(request_dict.get('remarks', '[]')) # 基础数据校验 list_lengths = { "dateTimes": len(date_times), "serialNumbers": len(deivce_ids), "deviceTypes": len(device_types), "storageCodes": len(storage_codes), "eventCodes": len(event_codes), "remarks": len(remarks) } # 检查各字段数据长度一致性 if len(set(list_lengths.values())) != 1: return response.json(400, f'参数长度不一致: {", ".join([f"{k}({v})" for k, v in list_lengths.items()])}') current_time = int(time.time()) products = [] # 数据预处理 for idx in range(len(date_times)): # 字段值提取 date_time = date_times[idx] deivce_id = deivce_ids[idx].strip() # 去除前后空格 device_type = device_types[idx] storage_code = storage_codes[idx] event_code = event_codes[idx].strip() remark = remarks[idx].strip() # 构建对象 products.append( ProductTroubleshoot( date_time=date_time, device_id=deivce_id, device_type=device_type, event_code=event_code, storage_code=storage_code, status=0, remark=remark, created_time=current_time, updated_time=current_time ) ) # 批量写入数据库 with transaction.atomic(): ProductTroubleshoot.objects.bulk_create(products) return response.json(0) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def query_event_subcategory(request_dict, response): event_type_code = request_dict.get('eventTypeCode', None) abnormal_event_code_qs = AbnormalEventCode.objects.all() if event_type_code: abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code) data = [] for abnormal_event_code in abnormal_event_code_qs: # 第0个是大类 第一个是细分 event_list = abnormal_event_code.event.split("-") data.append( { "eventTypeCode": abnormal_event_code.event_code[0:2], "eventType": abnormal_event_code.event_type, "eventSegCode": abnormal_event_code.event_code[2:4], "eventSeg": event_list[1] if len(event_list) > 1 else "", "eventDescriptionCode": abnormal_event_code.event_code[4:], "eventDescription": event_list[2] if len(event_list) > 2 else "", } ) return response.json(0, data) @staticmethod def query_device_scheme(request_dict, response): device_id = request_dict.get('deviceId', None) if not device_id: return response.json(0, msg="设备ID不能为空") try: # 获取设备信息 device_scheme = DeviceScheme.objects.filter(serial_number=device_id).values("device_type", "storage_code").first() if not device_scheme: return response.json(0) # 获取产品方案信息 product_scheme = ProductsScheme.objects.filter( storage_code=device_scheme["storage_code"] ).values( "order_quantity", "created_time", "storage_code", "flash", "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy" ).first() if not product_scheme: return response.json(0, {"deviceType": device_scheme["device_type"]}) # 拼接方案字符串,过滤空值 scheme_parts = [ product_scheme["flash"], product_scheme["ddr"], product_scheme["main_controller"], product_scheme["wifi"], product_scheme["four_g"], product_scheme["ad"], product_scheme["sensor"], product_scheme["phy"] ] scheme = " + ".join(part for part in scheme_parts if part) return response.json(0, { "deviceType": device_scheme["device_type"], "quantity": product_scheme["order_quantity"], "createdTime": product_scheme["created_time"], "storageCode": product_scheme["storage_code"], "scheme": scheme, }) except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def product_problem_list(request_dict, response): device_id = request_dict.get("deviceId", None) storage_code = request_dict.get("storageCode", None) event_code = request_dict.get("eventCode", None) status = request_dict.get("status", None) page = request_dict.get("page", 1) pageSize = request_dict.get("pageSize", 10) # 1. 先查询ProductTroubleshoot表 product_troubleshoot_qs = ProductTroubleshoot.objects.all() if device_id: product_troubleshoot_qs = product_troubleshoot_qs.filter(device_id=device_id) if storage_code: product_troubleshoot_qs = product_troubleshoot_qs.filter(storage_code__contains=storage_code) if event_code: product_troubleshoot_qs = product_troubleshoot_qs.filter(event_code__contains=event_code) if status: product_troubleshoot_qs = product_troubleshoot_qs.filter(status=status) paginator = Paginator(product_troubleshoot_qs.order_by('status', '-id'), pageSize) product_trouble_page = paginator.page(page) # 获取当前页的数据列表 product_trouble_list = list(product_trouble_page.object_list.values( 'id', 'date_time', 'device_type', 'device_id', 'storage_code', 'event_code', 'status', 'remark', )) # 2. 收集所有storage_code用于批量查询ProductsScheme表 storage_codes = [item['storage_code'] for item in product_trouble_list if item['storage_code']] product_schemes = ProductsScheme.objects.filter(storage_code__in=storage_codes) # 3. 创建映射字典并拼接字段 scheme_info_dict = {} for scheme in product_schemes: # 拼接需要的字段,过滤掉空值 fields_to_join = [ scheme.flash, scheme.ddr, scheme.main_controller, scheme.wifi, scheme.four_g, scheme.ad, scheme.sensor, scheme.customer_code, scheme.phy, ] # 过滤掉None和空字符串,然后用逗号连接 joined_info = '+'.join(filter(None, fields_to_join)) scheme_info_dict[scheme.storage_code] = joined_info # 4. 将拼接后的信息添加到原数据中 for item in product_trouble_list: storage_code = item['storage_code'] item['product_scheme_info'] = scheme_info_dict.get(storage_code, '') data = { "list": product_trouble_list, "total": paginator.count, } return response.json(0, data) @staticmethod def edit_product_problem_status(request_dict, response): try: record_id = request_dict.get("id", None) new_status = request_dict.get("status", None) if not all([record_id, new_status]): return response.json(444) new_status = int(new_status) try: troubleshoot_record = ProductTroubleshoot.objects.get(id=record_id) except ProductTroubleshoot.DoesNotExist: return response.json(173) # 更新状态 troubleshoot_record.status = new_status updated_time = int(time.time()) troubleshoot_record.save() # 如果是审批通过状态(1),创建异常事件 if new_status == 1: device_id = troubleshoot_record.device_id device_type = troubleshoot_record.device_type # 获取UID uid = device_id if device_type == 2 else CommonService.get_uid_by_serial_number(device_id) # 获取设备型号 device_model = 0 device_scheme = DeviceScheme.objects.filter(serial_number=device_id).first() if device_scheme: device_model = device_scheme.device_type # 创建异常事件 AbnormalEvent.objects.create( uid=uid, event_code=troubleshoot_record.event_code, device_type=device_model, content=troubleshoot_record.remark, report_type=1, created_time=int(time.time()), event_time=int(time.time()) ) return response.json(0) # 成功 except Exception as e: print(e) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))