import json
import time

from django.core.paginator import EmptyPage, Paginator
from django.db import transaction
from django.views import View

from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme, AbnormalEvent
from Object.ResponseObject import ResponseObject
from Service.CommonService import CommonService


class ProblemEntryView(View):
    """Entry/query endpoints for product troubleshooting records.

    Both GET and POST are routed by the ``operation`` URL keyword argument to
    one of the static handlers below. Every handler receives the request
    parameters and a ``ResponseObject`` and returns ``response.json(...)``.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        # The encoding is set before request.GET is read, as in the original.
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request, request.GET, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        # The encoding is set before request.POST is read, as in the original.
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request, request.POST, operation)

    def validation(self, request, request_dict, operation):
        """Route ``operation`` to its handler; unknown operations get code 414.

        :param request: the Django request (not used by the routing itself).
        :param request_dict: request parameters (``request.GET`` or ``request.POST``).
        :param operation: operation name taken from the URL.
        :return: whatever the selected handler returns, or ``response.json(414)``.
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')

        handlers = {
            'addProductProblem': self.add_product_problem,
            'queryEventSubcategory': self.query_event_subcategory,
            'queryDeviceScheme': self.query_device_scheme,
            'ProductProblemList': self.product_problem_list,
            'editProductProblemStatus': self.edit_product_problem_status,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    @staticmethod
    def add_product_problem(request_dict, response):
        """Bulk-create ``ProductTroubleshoot`` rows from parallel JSON lists.

        Reads ``dateTimes``, ``deviceIds``, ``deviceTypes``, ``storageCodes``,
        ``eventCodes`` and ``remarks``; each is a JSON-encoded list and entry
        ``i`` of every list describes the same record. All lists must have the
        same length.

        :return: code 0 on success, 400 when the list lengths differ, 500 with
            the error line/message on any other failure.
        """
        try:
            # Parse the request data.
            date_times = json.loads(request_dict.get('dateTimes', '[]'))
            device_ids = json.loads(request_dict.get('deviceIds', '[]'))
            device_types = json.loads(request_dict.get('deviceTypes', '[]'))
            storage_codes = json.loads(request_dict.get('storageCodes', '[]'))
            event_codes = json.loads(request_dict.get('eventCodes', '[]'))
            remarks = json.loads(request_dict.get('remarks', '[]'))

            # Keys are the request parameter names so that the error message
            # points the caller at the field that is actually wrong.
            list_lengths = {
                "dateTimes": len(date_times),
                "deviceIds": len(device_ids),
                "deviceTypes": len(device_types),
                "storageCodes": len(storage_codes),
                "eventCodes": len(event_codes),
                "remarks": len(remarks),
            }

            # All lists must describe the same number of records.
            if len(set(list_lengths.values())) != 1:
                detail = ", ".join(f"{name}({size})" for name, size in list_lengths.items())
                return response.json(400, f'参数长度不一致: {detail}')

            current_time = int(time.time())
            rows = zip(date_times, device_ids, device_types, storage_codes, event_codes, remarks)
            products = [
                ProductTroubleshoot(
                    date_time=date_time,
                    device_id=device_id.strip(),  # drop surrounding whitespace
                    device_type=device_type,
                    event_code=event_code.strip(),
                    storage_code=storage_code,
                    status=0,
                    remark=remark.strip(),
                    created_time=current_time,
                    updated_time=current_time,
                )
                for date_time, device_id, device_type, storage_code, event_code, remark in rows
            ]

            # Write all rows in one transaction.
            with transaction.atomic():
                ProductTroubleshoot.objects.bulk_create(products)

            return response.json(0)
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def query_event_subcategory(request_dict, response):
        """List abnormal event codes, optionally filtered by a code prefix.

        When ``eventTypeCode`` is given, only rows whose ``event_code`` starts
        with it (case-insensitive) are returned. Each ``event_code`` is split
        positionally: characters 0-1 are the type code, 2-3 the segment code
        and the remainder the description code. The ``event`` text is split on
        ``-``; its second and third parts supply the segment and description
        labels (empty string when absent).
        """
        event_type_code = request_dict.get('eventTypeCode', None)

        abnormal_event_code_qs = AbnormalEventCode.objects.all()
        if event_type_code:
            abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code)

        data = []
        for record in abnormal_event_code_qs:
            # Part 0 is the main category, part 1 the sub-category, part 2 the
            # description. A missing event text is treated as empty instead of
            # raising AttributeError.
            event_parts = (record.event or "").split("-")
            code = record.event_code
            data.append(
                {
                    "eventTypeCode": code[0:2],
                    "eventType": record.event_type,
                    "eventSegCode": code[2:4],
                    "eventSeg": event_parts[1] if len(event_parts) > 1 else "",
                    "eventDescriptionCode": code[4:],
                    "eventDescription": event_parts[2] if len(event_parts) > 2 else "",
                }
            )

        return response.json(0, data)

    @staticmethod
    def query_device_scheme(request_dict, response):
        """Look up the device type and product scheme for ``deviceId``.

        :return: 444 when ``deviceId`` is missing; code 0 with no data when no
            ``DeviceScheme`` matches; code 0 with only ``deviceType`` when the
            device has no ``ProductsScheme``; otherwise code 0 with the device
            type, order quantity, creation time, storage code and the joined
            scheme string. 500 with the error line/message on failure.
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            # A missing required parameter is reported with the same code that
            # edit_product_problem_status uses, rather than success code 0.
            return response.json(444)

        try:
            # Fetch the device information.
            device_scheme = DeviceScheme.objects.filter(
                serial_number=device_id
            ).values("device_type", "storage_code").first()
            if not device_scheme:
                return response.json(0)

            # Fetch the product scheme that shares the device's storage code.
            product_scheme = ProductsScheme.objects.filter(
                storage_code=device_scheme["storage_code"]
            ).values(
                "order_quantity", "created_time", "storage_code", "flash", "ddr",
                "main_controller", "wifi", "four_g", "ad", "sensor", "phy"
            ).first()
            if not product_scheme:
                return response.json(0, {"deviceType": device_scheme["device_type"]})

            # Build the scheme string from the component fields, skipping empty ones.
            component_keys = ("flash", "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy")
            scheme = " + ".join(
                product_scheme[key] for key in component_keys if product_scheme[key]
            )

            return response.json(0, {
                "deviceType": device_scheme["device_type"],
                "quantity": product_scheme["order_quantity"],
                "createdTime": product_scheme["created_time"],
                "storageCode": product_scheme["storage_code"],
                "scheme": scheme,
            })
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def product_problem_list(request_dict, response):
        """Return one page of troubleshooting records with scheme info attached.

        Optional filters: ``deviceId`` (exact), ``storageCode`` and
        ``eventCode`` (substring), ``status`` (exact). Results are ordered by
        ``status`` then newest ``id`` first and paginated with ``page``
        (default 1) and ``pageSize`` (default 10).

        :return: code 0 with ``{"list": [...], "total": <count>}``; the list is
            empty when ``page`` is out of range. 444 when ``page`` or
            ``pageSize`` is not a positive integer. 500 with the error
            line/message on any other failure.
        """
        device_id = request_dict.get("deviceId", None)
        storage_code = request_dict.get("storageCode", None)
        event_code = request_dict.get("eventCode", None)
        status = request_dict.get("status", None)

        # Request values arrive as strings; validate them before paginating so
        # that malformed input is reported as a parameter error.
        try:
            page = int(request_dict.get("page", 1))
            page_size = int(request_dict.get("pageSize", 10))
        except (TypeError, ValueError):
            return response.json(444)
        if page_size < 1:
            return response.json(444)

        try:
            # 1. Query the ProductTroubleshoot table.
            product_troubleshoot_qs = ProductTroubleshoot.objects.all()
            if device_id:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(device_id=device_id)
            if storage_code:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(storage_code__contains=storage_code)
            if event_code:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(event_code__contains=event_code)
            if status:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(status=status)

            paginator = Paginator(product_troubleshoot_qs.order_by('status', '-id'), page_size)
            try:
                product_trouble_page = paginator.page(page)
            except EmptyPage:
                # Out-of-range page: report an empty page, keep the real total.
                return response.json(0, {"list": [], "total": paginator.count})

            # Rows of the current page.
            product_trouble_list = list(product_trouble_page.object_list.values(
                'id', 'date_time', 'device_type', 'device_id',
                'storage_code', 'event_code', 'status', 'remark',
            ))

            # 2. Collect the storage codes to fetch ProductsScheme rows in one query.
            storage_codes = [item['storage_code'] for item in product_trouble_list if item['storage_code']]
            product_schemes = ProductsScheme.objects.filter(storage_code__in=storage_codes)

            # 3. Map storage_code -> joined scheme description.
            scheme_info_dict = {}
            for scheme in product_schemes:
                fields_to_join = [
                    scheme.flash,
                    scheme.ddr,
                    scheme.main_controller,
                    scheme.wifi,
                    scheme.four_g,
                    scheme.ad,
                    scheme.sensor,
                    scheme.customer_code,
                    scheme.phy,
                ]
                # Drop None/empty values, then join the rest with '+'.
                scheme_info_dict[scheme.storage_code] = '+'.join(filter(None, fields_to_join))

            # 4. Attach the joined info to each row ('' when there is no scheme).
            for item in product_trouble_list:
                item['product_scheme_info'] = scheme_info_dict.get(item['storage_code'], '')

            data = {
                "list": product_trouble_list,
                "total": paginator.count,
            }
            return response.json(0, data)
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def edit_product_problem_status(request_dict, response):
        """Update a troubleshooting record's status; status 1 also files an event.

        Reads ``id`` and ``status``. When the new status is 1, an
        ``AbnormalEvent`` is created from the record's event code and remark.

        :return: code 0 on success, 444 when ``id`` or ``status`` is missing,
            173 when the record does not exist, 500 with the error
            line/message on any other failure (including a non-integer
            ``status``).
        """
        try:
            record_id = request_dict.get("id", None)
            new_status = request_dict.get("status", None)

            if not all([record_id, new_status]):
                return response.json(444)

            new_status = int(new_status)

            try:
                troubleshoot_record = ProductTroubleshoot.objects.get(id=record_id)
            except ProductTroubleshoot.DoesNotExist:
                return response.json(173)

            now = int(time.time())

            # Update the status and actually persist the modification time
            # (previously the timestamp was computed but never stored).
            troubleshoot_record.status = new_status
            troubleshoot_record.updated_time = now
            troubleshoot_record.save()

            # Status 1 (approved, per the original comment): create the abnormal event.
            if new_status == 1:
                device_id = troubleshoot_record.device_id
                device_type = troubleshoot_record.device_type

                # For device_type 2 the stored device_id is used as the UID
                # directly; otherwise it is resolved from the serial number.
                if device_type == 2:
                    uid = device_id
                else:
                    uid = CommonService.get_uid_by_serial_number(device_id)

                # Device model comes from DeviceScheme; 0 when none is found.
                device_scheme = DeviceScheme.objects.filter(serial_number=device_id).first()
                device_model = device_scheme.device_type if device_scheme else 0

                AbnormalEvent.objects.create(
                    uid=uid,
                    event_code=troubleshoot_record.event_code,
                    device_type=device_model,
                    content=troubleshoot_record.remark,
                    report_type=1,
                    created_time=now,
                    event_time=now,
                )

            return response.json(0)

        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))