123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
import json
import logging
import time

from django.db import transaction
from django.views import View

from Ansjer.test.util.email_log import response
from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme
from Object.ResponseObject import ResponseObject
- from Object.ResponseObject import ResponseObject
class ProblemEntryView(View):
    """Endpoints for recording product problems and looking up related data.

    The ``operation`` keyword argument passed to ``get``/``post`` selects the
    handler: ``addProductProblem``, ``queryEventSubcategory`` or
    ``queryDeviceScheme``.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        # Set the encoding before touching request.GET, as the original did.
        request.encoding = 'utf-8'
        return self.validation(request, request.GET, kwargs.get('operation'))

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        # Set the encoding before touching request.POST, as the original did.
        request.encoding = 'utf-8'
        return self.validation(request, request.POST, kwargs.get('operation'))

    def validation(self, request, request_dict, operation):
        """Build the response object and route to the handler for *operation*.

        Args:
            request: The incoming request (not read here; kept for callers).
            request_dict: Mapping of request parameters (``request.GET`` or
                ``request.POST``); ``language`` defaults to ``'en'``.
            operation: Name of the requested operation.

        Returns:
            The handler's response, or a 400 response when *operation* is not
            one of the supported names.
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')
        handlers = {
            'addProductProblem': self.add_product_problem,
            'queryEventSubcategory': self.query_event_subcategory,
            'queryDeviceScheme': self.query_device_scheme,
        }
        handler = handlers.get(operation)
        if handler is None:
            # Falling through would return None, which Django rejects because a
            # view must return a response object.
            # NOTE(review): 400 is reused from the length-mismatch path below;
            # switch to a dedicated "unknown operation" code if the project has one.
            return response.json(400, f'unsupported operation: {operation}')
        return handler(request_dict, response)

    @staticmethod
    def _server_error(response, exc, context):
        """Log *exc* with its traceback and return the 500 error response."""
        logging.getLogger(__name__).error('%s failed', context, exc_info=exc)
        return response.json(
            500,
            'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)),
        )

    @staticmethod
    def _load_json_list(request_dict, key):
        """Decode the JSON array stored under *key* (missing key -> empty list).

        Raises:
            ValueError: If the value is not valid JSON or is not a JSON array.
        """
        try:
            value = json.loads(request_dict.get(key, '[]'))
        except (TypeError, ValueError) as exc:
            raise ValueError(f'{key} is not valid JSON: {exc}') from exc
        if not isinstance(value, list):
            raise ValueError(f'{key} must be a JSON array')
        return value

    @staticmethod
    def add_product_problem(request_dict, response):
        """Create one ``ProductTroubleshoot`` row per entry of the input arrays.

        Reads six JSON-array parameters (``dateTimes``, ``deviceIds``,
        ``deviceTypes``, ``storageCodes``, ``eventCodes``, ``remarks``); entry
        *i* of each array describes record *i*. Device ids, event codes and
        remarks are stripped of surrounding whitespace before being stored.

        Returns:
            ``response.json(0)`` after the rows are written; a 400 response
            when a parameter is malformed or the arrays differ in length; a
            500 response when building or writing the rows fails.
        """
        # Order matters: it is the unpacking order used when building rows.
        field_names = (
            'dateTimes', 'deviceIds', 'deviceTypes',
            'storageCodes', 'eventCodes', 'remarks',
        )
        try:
            columns = {
                name: ProblemEntryView._load_json_list(request_dict, name)
                for name in field_names
            }
        except ValueError as exc:
            # Malformed input is the caller's mistake, not a server failure.
            return response.json(400, str(exc))

        # Every array must describe the same number of records.
        lengths = {name: len(values) for name, values in columns.items()}
        if len(set(lengths.values())) != 1:
            detail = ', '.join(f'{name}({count})' for name, count in lengths.items())
            return response.json(400, f'参数长度不一致: {detail}')

        # These fields are stripped below, so each item has to be a string.
        for name in ('deviceIds', 'eventCodes', 'remarks'):
            for idx, value in enumerate(columns[name]):
                if not isinstance(value, str):
                    return response.json(400, f'{name}[{idx}] must be a string')

        try:
            current_time = int(time.time())
            rows = zip(*(columns[name] for name in field_names))
            products = [
                ProductTroubleshoot(
                    date_time=date_time,
                    device_id=device_id.strip(),
                    device_type=device_type,
                    event_code=event_code.strip(),
                    storage_code=storage_code,
                    status=0,
                    remark=remark.strip(),
                    created_time=current_time,
                    updated_time=current_time,
                )
                for date_time, device_id, device_type, storage_code, event_code, remark in rows
            ]
            # Write all rows in a single transaction.
            with transaction.atomic():
                ProductTroubleshoot.objects.bulk_create(products)
            return response.json(0)
        except Exception as exc:
            return ProblemEntryView._server_error(response, exc, 'add_product_problem')

    @staticmethod
    def query_event_subcategory(request_dict, response):
        """List abnormal event codes, optionally filtered by a code prefix.

        When ``eventTypeCode`` is supplied, only records whose ``event_code``
        starts with it (case-insensitively) are returned. Each record's code
        is split into characters 0-1, 2-3 and the remainder, and its ``event``
        text is split on ``-`` to obtain the segment and description names.

        Returns:
            ``response.json(0, data)`` with one dict per record, or a 500
            response when the lookup fails.
        """
        event_type_code = request_dict.get('eventTypeCode', None)
        try:
            queryset = AbnormalEventCode.objects.all()
            if event_type_code:
                queryset = queryset.filter(event_code__istartswith=event_type_code)

            data = []
            for record in queryset:
                # Treat a missing value as empty text rather than crashing.
                code = record.event_code or ''
                # Index 0 is the major category, index 1 the subdivision.
                names = (record.event or '').split('-')
                data.append({
                    'eventTypeCode': code[0:2],
                    'eventType': record.event_type,
                    'eventSegCode': code[2:4],
                    'eventSeg': names[1] if len(names) > 1 else '',
                    'eventDescriptionCode': code[4:],
                    'eventDescription': names[2] if len(names) > 2 else '',
                })
            return response.json(0, data)
        except Exception as exc:
            return ProblemEntryView._server_error(response, exc, 'query_event_subcategory')

    @staticmethod
    def query_device_scheme(request_dict, response):
        """Look up a device's type and the component scheme of its product.

        The device is found by ``serial_number == deviceId``; the product
        scheme is then found through the device's ``storage_code``.

        Returns:
            ``response.json(0, {...})`` with ``deviceType`` and, when a product
            scheme exists, ``quantity``, ``createdTime``, ``storageCode`` and
            ``scheme``; ``response.json(0)`` when the device is unknown; a 500
            response when the lookup fails.
        """
        device_id = request_dict.get('deviceId', None)
        if not device_id:
            # NOTE(review): this answers with code 0 even though the message
            # says the device id is required; kept as-is for callers, confirm
            # whether an error code was intended.
            return response.json(0, msg="设备ID不能为空")

        try:
            # Device information.
            device_scheme = (
                DeviceScheme.objects
                .filter(serial_number=device_id)
                .values('device_type', 'storage_code')
                .first()
            )
            if not device_scheme:
                return response.json(0)

            # Product scheme information.
            product_scheme = (
                ProductsScheme.objects
                .filter(storage_code=device_scheme['storage_code'])
                .values(
                    'order_quantity', 'created_time', 'storage_code', 'flash',
                    'ddr', 'main_controller', 'wifi', 'four_g', 'ad', 'sensor', 'phy',
                )
                .first()
            )
            if not product_scheme:
                return response.json(0, {'deviceType': device_scheme['device_type']})

            # Join the non-empty components, in this fixed order, into one string.
            component_fields = (
                'flash', 'ddr', 'main_controller', 'wifi',
                'four_g', 'ad', 'sensor', 'phy',
            )
            scheme = ' + '.join(
                product_scheme[field] for field in component_fields if product_scheme[field]
            )
            return response.json(0, {
                'deviceType': device_scheme['device_type'],
                'quantity': product_scheme['order_quantity'],
                'createdTime': product_scheme['created_time'],
                'storageCode': product_scheme['storage_code'],
                'scheme': scheme,
            })
        except Exception as exc:
            return ProblemEntryView._server_error(response, exc, 'query_device_scheme')
|