import json
import time

from django.core.paginator import Paginator
from django.db import transaction
from django.views import View

# NOTE(review): `response` is never referenced at module level in this file and
# every handler below shadows it with its own `response` argument -- confirm
# whether this import (from a test package) is still required.
from Ansjer.test.util.email_log import response
from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme, AbnormalEvent
from Object.ResponseObject import ResponseObject
from Service.CommonService import CommonService


class ProblemEntryView(View):
    """Endpoints for entering, listing and reviewing product troubleshooting records.

    The URL supplies an ``operation`` keyword argument; ``get``/``post`` forward
    the request parameters to ``validation``, which dispatches to the matching
    static handler. Every handler returns whatever ``ResponseObject.json``
    produces.
    """

    def get(self, request, *args, **kwargs):
        """Handle GET requests by dispatching on ``kwargs['operation']``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.GET
        return self.validation(request, request_dict, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST requests by dispatching on ``kwargs['operation']``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.POST
        return self.validation(request, request_dict, operation)

    def validation(self, request, request_dict, operation):
        """Route ``operation`` to its handler; unknown operations yield code 414.

        :param request: the Django request (unused by the routing itself).
        :param request_dict: ``request.GET`` or ``request.POST``.
        :param operation: operation name taken from the URL.
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')

        handlers = {
            'addProductProblem': self.add_product_problem,
            'queryEventSubcategory': self.query_event_subcategory,
            'queryDeviceScheme': self.query_device_scheme,
            'ProductProblemList': self.product_problem_list,
            'editProductProblemStatus': self.edit_product_problem_status,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    @staticmethod
    def add_product_problem(request_dict, response):
        """Bulk-create ``ProductTroubleshoot`` rows from parallel JSON arrays.

        Reads ``dateTimes``, ``deviceIds``, ``deviceTypes``, ``storageCodes``,
        ``eventCodes`` and ``remarks`` (each a JSON-encoded list, defaulting to
        ``[]``). Entry *i* of every list describes record *i*.

        Returns code 444 when a payload is not a JSON list, 400 when the lists
        differ in length, 0 on success, and 500 with the error location on any
        exception.
        """
        try:
            # Parse the request payloads.
            date_times = json.loads(request_dict.get('dateTimes', '[]'))
            device_ids = json.loads(request_dict.get('deviceIds', '[]'))
            device_types = json.loads(request_dict.get('deviceTypes', '[]'))
            storage_codes = json.loads(request_dict.get('storageCodes', '[]'))
            event_codes = json.loads(request_dict.get('eventCodes', '[]'))
            remarks = json.loads(request_dict.get('remarks', '[]'))

            columns = {
                "dateTimes": date_times,
                "deviceIds": device_ids,
                "deviceTypes": device_types,
                "storageCodes": storage_codes,
                "eventCodes": event_codes,
                "remarks": remarks,
            }

            # A JSON string or object would otherwise be iterated element by
            # element and produce garbage rows; reject it with the same code
            # this view returns for missing parameters.
            if not all(isinstance(values, list) for values in columns.values()):
                return response.json(444)

            # All lists must have the same length so rows line up.
            list_lengths = {name: len(values) for name, values in columns.items()}
            if len(set(list_lengths.values())) != 1:
                detail = ", ".join(f"{k}({v})" for k, v in list_lengths.items())
                return response.json(400, f'参数长度不一致: {detail}')

            current_time = int(time.time())

            # Build the model instances; string fields are stripped of
            # surrounding whitespace. New records start with status 0.
            products = [
                ProductTroubleshoot(
                    date_time=date_time,
                    device_id=device_id.strip(),
                    device_type=device_type,
                    event_code=event_code.strip(),
                    storage_code=storage_code,
                    status=0,
                    remark=remark.strip(),
                    created_time=current_time,
                    updated_time=current_time,
                )
                for date_time, device_id, device_type, storage_code, event_code, remark
                in zip(date_times, device_ids, device_types, storage_codes, event_codes, remarks)
            ]

            # Write everything in one transaction.
            with transaction.atomic():
                ProductTroubleshoot.objects.bulk_create(products)

            return response.json(0)

        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def query_event_subcategory(request_dict, response):
        """List abnormal-event codes, optionally filtered by ``eventTypeCode`` prefix.

        Each ``event_code`` is split positionally: characters 0-1 are the type
        code, 2-3 the segment code and the remainder the description code. The
        ``event`` text is split on ``"-"``; per the original author's note,
        part 0 is the major category and part 1 the subdivision. Part 2 is
        returned as the description. Missing parts become ``""``.
        """
        event_type_code = request_dict.get('eventTypeCode', None)

        abnormal_event_code_qs = AbnormalEventCode.objects.all()
        if event_type_code:
            abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code)

        data = []
        for abnormal_event_code in abnormal_event_code_qs:
            code = abnormal_event_code.event_code
            event_parts = abnormal_event_code.event.split("-")
            data.append(
                {
                    "eventTypeCode": code[0:2],
                    "eventType": abnormal_event_code.event_type,
                    "eventSegCode": code[2:4],
                    "eventSeg": event_parts[1] if len(event_parts) > 1 else "",
                    "eventDescriptionCode": code[4:],
                    "eventDescription": event_parts[2] if len(event_parts) > 2 else "",
                }
            )

        return response.json(0, data)

    @staticmethod
    def query_device_scheme(request_dict, response):
        """Look up a device's type and the hardware scheme of its storage code.

        Reads ``deviceId`` and matches it against ``DeviceScheme.serial_number``.
        Returns code 0 with no data when the device is unknown, only
        ``deviceType`` when no ``ProductsScheme`` matches its storage code, and
        the full scheme summary otherwise. Exceptions yield code 500.
        """
        device_id = request_dict.get('deviceId', None)

        if not device_id:
            # NOTE(review): this reports a missing parameter with success code 0
            # and a `msg=` keyword that no other call in this file uses --
            # confirm against ResponseObject.json (elsewhere missing parameters
            # return 444).
            return response.json(0, msg="设备ID不能为空")

        try:
            # Fetch the device record.
            device_scheme = DeviceScheme.objects.filter(
                serial_number=device_id
            ).values("device_type", "storage_code").first()

            if not device_scheme:
                return response.json(0)

            # Fetch the product scheme for the device's storage code.
            product_scheme = ProductsScheme.objects.filter(
                storage_code=device_scheme["storage_code"]
            ).values(
                "order_quantity", "created_time", "storage_code", "flash",
                "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy"
            ).first()

            if not product_scheme:
                return response.json(0, {"deviceType": device_scheme["device_type"]})

            # Join the component names into one string, skipping empty values.
            component_keys = ("flash", "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy")
            scheme = " + ".join(
                product_scheme[key] for key in component_keys if product_scheme[key]
            )

            return response.json(0, {
                "deviceType": device_scheme["device_type"],
                "quantity": product_scheme["order_quantity"],
                "createdTime": product_scheme["created_time"],
                "storageCode": product_scheme["storage_code"],
                "scheme": scheme,
            })

        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def product_problem_list(request_dict, response):
        """Return one page of troubleshooting records with their scheme summary.

        Optional filters: ``deviceId`` (exact), ``storageCode`` and
        ``eventCode`` (substring), ``status`` (exact). Paging uses ``page``
        (default 1) and ``pageSize`` (default 10). Results are ordered by
        ``status`` then newest ``id`` first.

        Returns code 0 with ``{"list": [...], "total": <count>}``. Any
        exception, including an invalid or out-of-range page, yields code 500,
        matching the other handlers in this view.
        """
        try:
            device_id = request_dict.get("deviceId", None)
            storage_code = request_dict.get("storageCode", None)
            event_code = request_dict.get("eventCode", None)
            status = request_dict.get("status", None)
            page = request_dict.get("page", 1)
            page_size = request_dict.get("pageSize", 10)

            # 1. Query the ProductTroubleshoot table with the requested filters.
            product_troubleshoot_qs = ProductTroubleshoot.objects.all()
            if device_id:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(device_id=device_id)
            if storage_code:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(storage_code__contains=storage_code)
            if event_code:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(event_code__contains=event_code)
            if status:
                product_troubleshoot_qs = product_troubleshoot_qs.filter(status=status)

            paginator = Paginator(product_troubleshoot_qs.order_by('status', '-id'), page_size)
            product_trouble_page = paginator.page(page)

            # Materialise the rows of the current page.
            product_trouble_list = list(product_trouble_page.object_list.values(
                'id',
                'date_time',
                'device_type',
                'device_id',
                'storage_code',
                'event_code',
                'status',
                'remark',
            ))

            # 2. Collect the distinct storage codes so ProductsScheme is
            #    queried once for the whole page.
            storage_codes = {item['storage_code'] for item in product_trouble_list if item['storage_code']}
            product_schemes = ProductsScheme.objects.filter(storage_code__in=storage_codes)

            # 3. Map each storage code to its joined component string.
            scheme_info_dict = {}
            for scheme in product_schemes:
                fields_to_join = [
                    scheme.flash,
                    scheme.ddr,
                    scheme.main_controller,
                    scheme.wifi,
                    scheme.four_g,
                    scheme.ad,
                    scheme.sensor,
                    scheme.customer_code,
                    scheme.phy,
                ]
                # Drop None/empty values, then join with '+'.
                scheme_info_dict[scheme.storage_code] = '+'.join(filter(None, fields_to_join))

            # 4. Attach the scheme summary to every row ('' when unknown).
            for item in product_trouble_list:
                item['product_scheme_info'] = scheme_info_dict.get(item['storage_code'], '')

            data = {
                "list": product_trouble_list,
                "total": paginator.count,
            }
            return response.json(0, data)

        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def edit_product_problem_status(request_dict, response):
        """Update a troubleshooting record's status; status 1 also logs an event.

        Reads ``id`` and ``status``. Returns code 444 when either is missing,
        173 when the record does not exist, 0 on success and 500 on any
        exception (including a non-integer ``status``).

        When the new status is 1 (approved, per the original author's note) an
        ``AbnormalEvent`` is created from the record. The status change and the
        event creation are committed together so a failure cannot leave an
        approved record without its event.
        """
        try:
            record_id = request_dict.get("id", None)
            new_status = request_dict.get("status", None)

            if not all([record_id, new_status]):
                return response.json(444)

            new_status = int(new_status)

            try:
                troubleshoot_record = ProductTroubleshoot.objects.get(id=record_id)
            except ProductTroubleshoot.DoesNotExist:
                return response.json(173)

            now = int(time.time())

            # Resolve everything the event needs before opening the
            # transaction, so only the two writes run inside it.
            event_kwargs = None
            if new_status == 1:
                device_id = troubleshoot_record.device_id
                device_type = troubleshoot_record.device_type

                # For device_type 2 the stored device_id is used as the UID
                # directly; otherwise it is treated as a serial number.
                uid = device_id if device_type == 2 else CommonService.get_uid_by_serial_number(device_id)

                # Device model defaults to 0 when no DeviceScheme matches.
                device_model = 0
                device_scheme = DeviceScheme.objects.filter(serial_number=device_id).first()
                if device_scheme:
                    device_model = device_scheme.device_type

                event_kwargs = dict(
                    uid=uid,
                    event_code=troubleshoot_record.event_code,
                    device_type=device_model,
                    content=troubleshoot_record.remark,
                    report_type=1,
                    created_time=now,
                    event_time=now,
                )

            with transaction.atomic():
                # Persist the new status together with a refreshed timestamp
                # (the original computed the timestamp but never stored it).
                troubleshoot_record.status = new_status
                troubleshoot_record.updated_time = now
                troubleshoot_record.save()

                if event_kwargs is not None:
                    AbnormalEvent.objects.create(**event_kwargs)

            return response.json(0)  # success

        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))