123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- import json
- import time
- from django.core.paginator import Paginator
- from django.db import transaction
- from django.views import View
- from Ansjer.test.util.email_log import response
- from Model.models import DeviceScheme, ProductTroubleshoot, AbnormalEventCode, ProductsScheme
- from Object.ResponseObject import ResponseObject
- class ProblemEntryView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- request_dict = request.GET
- return self.validation(request, request_dict, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- request_dict = request.POST
- return self.validation(request, request_dict, operation)
- def validation(self, request, request_dict, operation):
- language = request_dict.get('language', 'en')
- response = ResponseObject(language, 'pc')
- if operation == 'addProductProblem':
- return self.add_product_problem(request_dict, response)
- elif operation == 'queryEventSubcategory':
- return self.query_event_subcategory(request_dict, response)
- elif operation == 'queryDeviceScheme':
- return self.query_device_scheme(request_dict, response)
- elif operation == 'ProductProblemList':
- return self.product_problem_list(request_dict, response)
- elif operation == 'editProductProblemStatus':
- return self.edit_product_problem_status(request_dict, response)
- else:
- return response.json(414)
- @staticmethod
- def add_product_problem(request_dict, response):
- try:
- # 解析请求数据
- date_times = json.loads(request_dict.get('dateTimes', '[]'))
- deivce_ids = json.loads(request_dict.get('deviceIds', '[]'))
- device_types = json.loads(request_dict.get('deviceTypes', '[]'))
- storage_codes = json.loads(request_dict.get('storageCodes', '[]'))
- event_codes = json.loads(request_dict.get('eventCodes', '[]'))
- remarks = json.loads(request_dict.get('remarks', '[]'))
- # 基础数据校验
- list_lengths = {
- "dateTimes": len(date_times),
- "serialNumbers": len(deivce_ids),
- "deviceTypes": len(device_types),
- "storageCodes": len(storage_codes),
- "eventCodes": len(event_codes),
- "remarks": len(remarks)
- }
- # 检查各字段数据长度一致性
- if len(set(list_lengths.values())) != 1:
- return response.json(400, f'参数长度不一致: {", ".join([f"{k}({v})" for k, v in list_lengths.items()])}')
- current_time = int(time.time())
- products = []
- # 数据预处理
- for idx in range(len(date_times)):
- # 字段值提取
- date_time = date_times[idx]
- deivce_id = deivce_ids[idx].strip() # 去除前后空格
- device_type = device_types[idx]
- storage_code = storage_codes[idx]
- event_code = event_codes[idx].strip()
- remark = remarks[idx].strip()
- # 构建对象
- products.append(
- ProductTroubleshoot(
- date_time=date_time,
- device_id=deivce_id,
- device_type=device_type,
- event_code=event_code,
- storage_code=storage_code,
- status=0,
- remark=remark,
- created_time=current_time,
- updated_time=current_time
- )
- )
- # 批量写入数据库
- with transaction.atomic():
- ProductTroubleshoot.objects.bulk_create(products)
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def query_event_subcategory(request_dict, response):
- event_type_code = request_dict.get('eventTypeCode', None)
- abnormal_event_code_qs = AbnormalEventCode.objects.all()
- if event_type_code:
- abnormal_event_code_qs = abnormal_event_code_qs.filter(event_code__istartswith=event_type_code)
- data = []
- for abnormal_event_code in abnormal_event_code_qs:
- # 第0个是大类 第一个是细分
- event_list = abnormal_event_code.event.split("-")
- data.append(
- {
- "eventTypeCode": abnormal_event_code.event_code[0:2],
- "eventType": abnormal_event_code.event_type,
- "eventSegCode": abnormal_event_code.event_code[2:4],
- "eventSeg": event_list[1] if len(event_list) > 1 else "",
- "eventDescriptionCode": abnormal_event_code.event_code[4:],
- "eventDescription": event_list[2] if len(event_list) > 2 else "",
- }
- )
- return response.json(0, data)
- @staticmethod
- def query_device_scheme(request_dict, response):
- device_id = request_dict.get('deviceId', None)
- if not device_id:
- return response.json(0, msg="设备ID不能为空")
- try:
- # 获取设备信息
- device_scheme = DeviceScheme.objects.filter(serial_number=device_id).values("device_type",
- "storage_code").first()
- if not device_scheme:
- return response.json(0)
- # 获取产品方案信息
- product_scheme = ProductsScheme.objects.filter(
- storage_code=device_scheme["storage_code"]
- ).values(
- "order_quantity", "created_time", "storage_code", "flash",
- "ddr", "main_controller", "wifi", "four_g", "ad", "sensor", "phy"
- ).first()
- if not product_scheme:
- return response.json(0, {"deviceType": device_scheme["device_type"]})
- # 拼接方案字符串,过滤空值
- scheme_parts = [
- product_scheme["flash"],
- product_scheme["ddr"],
- product_scheme["main_controller"],
- product_scheme["wifi"],
- product_scheme["four_g"],
- product_scheme["ad"],
- product_scheme["sensor"],
- product_scheme["phy"]
- ]
- scheme = " + ".join(part for part in scheme_parts if part)
- return response.json(0, {
- "deviceType": device_scheme["device_type"],
- "quantity": product_scheme["order_quantity"],
- "createdTime": product_scheme["created_time"],
- "storageCode": product_scheme["storage_code"],
- "scheme": scheme,
- })
- except Exception as e:
- print(e)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def product_problem_list(request_dict, response):
- device_id = request_dict.get("deviceId", None)
- storage_code = request_dict.get("storageCode", None)
- event_code = request_dict.get("eventCode", None)
- status = request_dict.get("status", None)
- page = request_dict.get("page", 1)
- pageSize = request_dict.get("pageSize", 10)
- # 1. 先查询ProductTroubleshoot表
- product_troubleshoot_qs = ProductTroubleshoot.objects.all()
- if device_id:
- product_troubleshoot_qs = product_troubleshoot_qs.filter(device_id=device_id)
- if storage_code:
- product_troubleshoot_qs = product_troubleshoot_qs.filter(storage_code__contains=storage_code)
- if event_code:
- product_troubleshoot_qs = product_troubleshoot_qs.filter(event_code__contains=event_code)
- if status:
- product_troubleshoot_qs = product_troubleshoot_qs.filter(status=status)
- paginator = Paginator(product_troubleshoot_qs.order_by('id'), pageSize)
- product_trouble_page = paginator.page(page)
- # 获取当前页的数据列表
- product_trouble_list = list(product_trouble_page.object_list.values(
- 'id',
- 'date_time',
- 'device_type',
- 'device_id',
- 'storage_code',
- 'event_code',
- 'status',
- 'remark',
- ))
- # 2. 收集所有storage_code用于批量查询ProductsScheme表
- storage_codes = [item['storage_code'] for item in product_trouble_list if item['storage_code']]
- product_schemes = ProductsScheme.objects.filter(storage_code__in=storage_codes)
- # 3. 创建映射字典并拼接字段
- scheme_info_dict = {}
- for scheme in product_schemes:
- # 拼接需要的字段,过滤掉空值
- fields_to_join = [
- scheme.flash,
- scheme.ddr,
- scheme.main_controller,
- scheme.wifi,
- scheme.four_g,
- scheme.ad,
- scheme.sensor,
- scheme.customer_code,
- scheme.phy,
- ]
- # 过滤掉None和空字符串,然后用逗号连接
- joined_info = '+'.join(filter(None, fields_to_join))
- scheme_info_dict[scheme.storage_code] = joined_info
- # 4. 将拼接后的信息添加到原数据中
- for item in product_trouble_list:
- storage_code = item['storage_code']
- item['product_scheme_info'] = scheme_info_dict.get(storage_code, '')
- data = {
- "list": product_trouble_list,
- "total": paginator.count,
- }
- return response.json(0, data)
- @staticmethod
- def edit_product_problem_status(request_dict, response):
- try:
- record_id = request_dict.get("id", None)
- new_status = request_dict.get("status", None)
- if not all([record_id, new_status]):
- return response.json(444)
- try:
- troubleshoot_record = ProductTroubleshoot.objects.get(id=record_id)
- except ProductTroubleshoot.DoesNotExist:
- return response.json(173)
- troubleshoot_record.status = new_status
- if "remark" in request_dict:
- troubleshoot_record.remark = request_dict["remark"]
- troubleshoot_record.save()
- return response.json(0)
- except Exception as e:
- print(e)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
|