#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import boto3
import botocore
import requests
from django.db.models import Q
from django.views.generic.base import View

from Object.IPWeatherObject import IPQuery
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from Model.models import Device_Info, RequestRecordModel, iotdeviceInfoModel, Access_Log, DeviceLogModel, LogModel, \
    AppLogModel, AppScannedSerial, StsFrequency, DeviceDomainRegionModel, IPAddr, CountryModel, RegionModel, \
    AbnormalEvent, DeviceTypeModel, AbnormalEventCode, DeviceScheme, ProductsScheme
from Ansjer.config import REGION_NAME, ACCESS_KEY_ID, SECRET_ACCESS_KEY, LOG_BUCKET

# Timestamp layout used for every datetime this module formats itself.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def _error_detail(exc):
    """Build the 'error_line/error_msg' text returned with code 500."""
    return 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc))


def _page_bounds(page_no, page_size):
    """Convert 1-based page number and page size into (start, end) slice bounds.

    Raises ValueError when either value is not an integer string.
    """
    page = int(page_no)
    line = int(page_size)
    return (page - 1) * line, page * line


def _new_s3_client():
    """Create the S3 client used to presign log download URLs."""
    return boto3.client(
        's3',
        region_name=REGION_NAME,
        aws_access_key_id=ACCESS_KEY_ID,
        aws_secret_access_key=SECRET_ACCESS_KEY,
        config=botocore.client.Config(signature_version='s3v4'),
    )


def _presigned_log_url(s3_client, key):
    """Return a one-hour presigned GET URL for `key` in LOG_BUCKET."""
    return s3_client.generate_presigned_url(
        ClientMethod='get_object',
        Params={
            'Bucket': LOG_BUCKET,
            'Key': key
        },
        ExpiresIn=3600
    )


def _resolve_domain(ip, region_id, country_code):
    """Work out the country code, API url and API region label for a device.

    Returns a tuple (country_code, api, api_region).
    """
    ip_addr_qs = IPAddr.objects.filter(ip=ip, is_geoip2=False).values('region')
    if ip_addr_qs.exists():
        region = ip_addr_qs[0]['region']
    else:
        # No stored (non-geoip2) record for this IP: fall back to IPQuery.
        region = IPQuery(ip).region

    # For a 'CN' device whose IP resolves to Hong Kong, Macau or Taiwan the
    # region name is reported in place of the country code (original note:
    # these regions are given the Americas domain).
    if country_code == 'CN' and region in ['香港', '澳门', '台湾']:
        country_code = region

    # Look up the domain data; these defaults apply when the region id is unknown.
    api = 'https://www.dvema.com/'
    region_name = '美洲'
    region_qs = RegionModel.objects.filter(id=region_id).values('api', 'name')
    if region_qs.exists():
        api = region_qs[0]['api']
        region_name = region_qs[0]['name']
    return country_code, api, region_name + '域名'


def _format_scan_time(value):
    """Format a scan-log timestamp when it is a datetime, else return it as is.

    NOTE(review): the AppScannedSerial field types are not visible here and
    getScanLog returns these fields unformatted, so this stays defensive.
    """
    if hasattr(value, 'strftime'):
        return value.strftime(_TIME_FORMAT)
    return value


def _abnormal_event_filters(request_dict):
    """Build the Q filter shared by the abnormal-event endpoints.

    Reads uid, storageCode, deviceType, version, reportType and
    eventTimeRange[] from the request. The eventType filter differs between
    endpoints and is applied by the callers.
    """
    uid = request_dict.get('uid', None)
    storage_code = request_dict.get('storageCode', None)
    device_type = request_dict.get('deviceType', None)
    version = request_dict.get('version', None)
    report_type = request_dict.get('reportType', None)
    event_time_range = request_dict.getlist('eventTimeRange[]', None)

    query = Q()
    if uid:
        query &= Q(uid=uid)
    if storage_code:
        # Original note: every entry under one order number is either a UID
        # or a serial number, so only the first entry is inspected.
        uid_list = list(
            DeviceScheme.objects.filter(storage_code=storage_code).values_list('serial_number', flat=True))
        # 9-character entries are treated as serial numbers and mapped to UIDs.
        if uid_list and len(uid_list[0]) == 9:
            uid_list = CommonService.get_uids_by_serial_numbers(uid_list)
        query &= Q(uid__in=uid_list)
    if device_type:
        # deviceType may carry several comma-separated values; non-numeric ones are dropped.
        device_types = [int(t.strip()) for t in device_type.split(',') if t.strip().isdigit()]
        query &= Q(device_type__in=device_types)
    if version:
        query &= Q(version=version)
    if report_type:
        query &= Q(report_type=int(report_type))
    if event_time_range:
        # The last three characters are dropped before int();
        # presumably millisecond timestamps -> seconds (TODO confirm).
        start_time, end_time = int(event_time_range[0][:-3]), int(event_time_range[1][:-3])
        query &= Q(event_time__gte=start_time, event_time__lte=end_time)
    return query


def _event_types_filter(event_type):
    """Return a Q restricting events to the codes of the given event types.

    `event_type` is a comma-separated string. An empty Q is returned when it
    is empty or when no AbnormalEventCode row matches (so no restriction is
    added in that case, as in the original code).
    """
    if not event_type:
        return Q()
    event_codes = AbnormalEventCode.objects.filter(
        event_type__in=event_type.split(',')
    ).values_list('event_code', flat=True)
    if event_codes.exists():
        return Q(event_code__in=event_codes)
    return Q()


def _count_labels(event_codes, code_to_label):
    """Count events per label; codes missing from the mapping count as '未知类型'."""
    counts = {}
    for code in event_codes:
        label = code_to_label.get(code, '未知类型')
        counts[label] = counts.get(label, 0) + 1
    return counts


def _percentage_statistics(counts, total_count):
    """Turn label counts into name/value/percentage rows, largest count first."""
    statistics = []
    for name, value in counts.items():
        # Percentages are rounded to two decimals.
        percentage = round((value / total_count) * 100, 2) if total_count > 0 else 0.00
        statistics.append({
            'name': name,
            'value': value,
            'percentage': percentage
        })
    statistics.sort(key=lambda x: x['value'], reverse=True)
    return statistics


class LogManagementView(View):
    """Admin view that serves the various log / abnormal-event listings.

    The `operation` URL kwarg selects the handler; GET and POST are both
    accepted and differ only in where the parameters are read from.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request using the query-string parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request using the form parameters."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route `operation` to its handler; unknown operations get code 414."""
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')
        if operation == '??':
            # NOTE(review): this returns a bare int rather than a response
            # object -- looks like a placeholder; confirm it is intended.
            return 0

        tko = TokenObject(
            request.META.get('HTTP_AUTHORIZATION'),
            returntpye='pc')
        # NOTE(review): the token check below is disabled, so every operation
        # is served without validating the token -- confirm this is intended.
        # if tko.code != 0:
        #     return response.json(tko.code)
        response.lang = tko.lang

        if operation == 'getAbnormalEventValue':  # abnormal event codes / contents
            return self.getAbnormalEventValue(response)

        handlers = {
            'getRequestList': self.getRequestList,
            'getDeviceIotInfoList': self.getDeviceIotInfoList,
            'requestPublishMqtt': self.requestPublishMqtt,
            'getAccessLogList': self.getAccessLogList,
            'getDeviceLogList': self.getDeviceLogList,
            'getOperationLogList': self.getOperationLogList,  # operation log
            'getAppLogList': self.getAppLogList,  # app log
            'getScanLog': self.getScanLog,  # scan log
            'getAlarmLog': self.getAlarmLog,  # alarm log
            'getDomainLog': self.getDomainLog,  # domain log
            'getDomainScanLog': self.getDomainScanLog,  # domain scan log
            'getDeviceAbnormalEvent': self.getDeviceAbnormalEvent,  # device abnormal events
            'getAbnormalPercentage': self.getAbnormalPercentage,  # abnormal percentage
            'getAbnormalDetailsPercentage': self.getAbnormalDetailsPercentage,  # abnormal detail percentage
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(414)
        return handler(request_dict, response)

    def getRequestList(self, request_dict, response):
        """Return one page of request records, optionally filtered.

        Requires pageNo and pageSize (code 444 otherwise). Optional filters:
        method, url, parameter, status_code, reason_phrase.
        """
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)
        method = request_dict.get('method', None)
        url = request_dict.get('url', None)
        parameter = request_dict.get('parameter', None)
        status_code = request_dict.get('status_code', None)
        reason_phrase = request_dict.get('reason_phrase', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            request_qs = RequestRecordModel.objects.all()
            if method:
                request_qs = request_qs.filter(method=method)
            if url:
                request_qs = request_qs.filter(url__contains=url)
            if parameter:
                request_qs = request_qs.filter(parameter__contains=parameter)
            if status_code:
                request_qs = request_qs.filter(status_code=status_code)
            if reason_phrase:
                request_qs = request_qs.filter(reason_phrase=reason_phrase)

            count = request_qs.count()
            qs_list = [
                {
                    'id': record.id,
                    'method': record.method,
                    'url': record.url,
                    'parameter': record.parameter,
                    # execution time is rounded to two decimals
                    'execute_time': round(record.execute_time, 2),
                    'status_code': record.status_code,
                    'reason_phrase': record.reason_phrase,
                    'add_time': record.add_time.strftime(_TIME_FORMAT),
                }
                for record in request_qs[start:end]
            ]
            return response.json(0, {'list': qs_list, 'total': count})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getDeviceIotInfoList(self, request_dict, response):
        """Return one page of IoT device info, optionally filtered.

        Requires pageNo and pageSize (code 444 otherwise). serial_number and
        uid are substring filters and are combined when both are given. When
        a filter is given and nothing matches, code 0 is returned without data.
        """
        serial_number = request_dict.get('serial_number', None)
        uid = request_dict.get('uid', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            iot_device_info_qs = iotdeviceInfoModel.objects.all()
            # Filters are chained so that supplying both narrows the result
            # instead of the second one replacing the first.
            if serial_number:
                iot_device_info_qs = iot_device_info_qs.filter(serial_number__contains=serial_number)
            if uid:
                iot_device_info_qs = iot_device_info_qs.filter(uid__contains=uid)
            if (serial_number or uid) and not iot_device_info_qs.exists():
                return response.json(0)

            total = iot_device_info_qs.count()
            iot_device_info_qs = iot_device_info_qs.values(
                'serial_number', 'uid', 'thing_name', 'thing_groups', 'add_time', 'update_time')[start:end]
            iot_device_info_list = CommonService.qs_to_list(iot_device_info_qs)

            # Rows without a uid: look one up from Device_Info by serial number.
            for iot_device_info in iot_device_info_list:
                if iot_device_info['uid']:
                    continue
                device_info_qs = Device_Info.objects.filter(
                    serial_number__contains=iot_device_info['serial_number']).values('UID')
                if device_info_qs.exists():
                    iot_device_info['uid'] = device_info_qs[0]['UID']
            return response.json(0, {'list': iot_device_info_list, 'total': total})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    # Generic MQTT publish
    @staticmethod
    def requestPublishMqtt(request_dict, response):
        """Publish `msg` to `topic_name` for the device named by `thing_name`.

        All three parameters are required (code 444 otherwise). The part of
        thing_name after its last '_' is passed to the publish helper as the
        identification code. Code 10044 is returned when publishing fails.
        """
        msg = request_dict.get('msg', None)
        thing_name = request_dict.get('thing_name', None)
        topic_name = request_dict.get('topic_name', None)
        if not all([msg, thing_name, topic_name]):
            return response.json(444)

        try:
            # SECURITY NOTE(review): eval() runs on a request parameter, which
            # allows arbitrary code execution by whoever can call this
            # endpoint. Left unchanged to keep accepted payloads identical;
            # consider ast.literal_eval or json.loads once the payload format
            # is confirmed.
            msg = eval(msg)
            identification_code = thing_name[thing_name.rindex('_') + 1:]
            if not CommonService.req_publish_mqtt_msg(identification_code, topic_name, msg):
                return response.json(10044)
            return response.json(0)
        except Exception as e:
            return response.json(500, _error_detail(e))

    def getAccessLogList(self, request_dict, response):
        """Return one page of access logs, newest first, optionally filtered.

        Requires pageNo and pageSize (code 444 otherwise). user and operation
        are substring filters, status is an exact filter; they are combined
        when several are given. When a filter is given and nothing matches,
        code 0 is returned without data.
        """
        user = request_dict.get('user', None)
        operation = request_dict.get('operation', None)
        status = request_dict.get('status', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            access_log_qs = Access_Log.objects.all()
            # Filters are chained so that supplying several narrows the result
            # instead of the last one replacing the others.
            if user:
                access_log_qs = access_log_qs.filter(user__contains=user)
            if operation:
                access_log_qs = access_log_qs.filter(operation__contains=operation)
            if status:
                access_log_qs = access_log_qs.filter(status=status)
            if (user or operation or status) and not access_log_qs.exists():
                return response.json(0)

            total = access_log_qs.count()
            access_log_qs = access_log_qs.order_by('-time').values(
                'user', 'operation', 'ip', 'status', 'content', 'time')[start:end]
            access_log_list = CommonService.qs_to_list(access_log_qs)
            return response.json(0, {'list': access_log_list, 'total': total})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getDeviceLogList(self, request_dict, response):
        """Return one page of device logs, newest first, with download URLs.

        Requires pageNo and pageSize (code 444 otherwise). uid and
        serial_number are exact filters and are combined when both are given.
        When a filter is given and nothing matches, code 0 is returned
        without data. Each row gets a presigned 'download_url'.
        """
        uid = request_dict.get('uid', None)
        serial_number = request_dict.get('serial_number', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            device_log_qs = DeviceLogModel.objects.all()
            # Filters are chained so that supplying both narrows the result
            # instead of the second one replacing the first.
            if uid:
                device_log_qs = device_log_qs.filter(uid=uid)
            if serial_number:
                device_log_qs = device_log_qs.filter(serial_number=serial_number)
            if (uid or serial_number) and not device_log_qs.exists():
                return response.json(0)

            total = device_log_qs.count()
            device_log_qs = device_log_qs.order_by('-add_time').values()[start:end]
            device_log_list = CommonService.qs_to_list(device_log_qs)

            # Attach a download link to every row.
            aws_s3_client = _new_s3_client()
            for device_log in device_log_list:
                # The object key uses the serial number when present, else the uid.
                owner = device_log['serial_number'] or device_log['uid']
                key = 'device_log/' + owner + '/{}'.format(device_log['filename'])
                device_log['download_url'] = _presigned_log_url(aws_s3_client, key)
            return response.json(0, {'list': device_log_list, 'total': total})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getOperationLogList(self, request_dict, response):
        """Return one page of operation logs, optionally filtered.

        Requires pageNo and pageSize (code 444 otherwise). operation and url
        are substring filters.
        """
        operation = request_dict.get('operation', None)
        url = request_dict.get('url', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            log_qs = LogModel.objects.all()
            if operation:
                log_qs = log_qs.filter(operation__contains=operation)
            if url:
                log_qs = log_qs.filter(url__contains=url)

            count = log_qs.count()
            log_list = list(log_qs.values('operation', 'url', 'content', 'ip', 'time')[start:end])
            return response.json(0, {'list': log_list, 'total': count})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getAppLogList(self, request_dict, response):
        """
        Return one page of app logs, newest first, with download URLs.
        @param request_dict: request parameters
        @param response: response object
        @request_dict userName: user name (exact filter; takes precedence over uid)
        @request_dict uid: uid (exact filter; only used when userName is empty)
        @request_dict pageNo, pageSize: required paging parameters (code 444 otherwise)
        @return: response with 'list' and 'total'; each row gets a presigned 'appLog_url'
        """
        userName = request_dict.get('userName', None)
        uid = request_dict.get('uid', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            app_log_qs = AppLogModel.objects.all()
            if userName:
                app_log_qs = app_log_qs.filter(user__username=userName)
            elif uid:
                app_log_qs = app_log_qs.filter(uid=uid)

            count = app_log_qs.count()
            log_qs = app_log_qs.values(
                'user__username', 'uid', 'average_delay', 'status', 'filename', 'add_time', 'user_id'
            ).order_by('-add_time')[start:end]
            app_log_list = CommonService.qs_to_list(log_qs)

            # Attach a download link to every row.
            aws_s3_client = _new_s3_client()
            for app_log in app_log_list:
                filename = app_log['filename']
                if not filename.endswith('.txt'):
                    filename += ".txt"
                key = 'app_log/' + app_log['user_id'] + '/{}'.format(filename)
                app_log['appLog_url'] = _presigned_log_url(aws_s3_client, key)
            return response.json(0, {'list': app_log_list, 'total': count})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getScanLog(self, request_dict, response):
        """Return one page of scan logs, newest first, optionally filtered.

        Requires pageNo and pageSize (code 444 otherwise). serial and ip are
        substring filters.
        """
        serial = request_dict.get('serial', None)
        ip = request_dict.get('ip', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)

        start, end = _page_bounds(pageNo, pageSize)
        try:
            scan_log_qs = AppScannedSerial.objects.all()
            if serial:
                scan_log_qs = scan_log_qs.filter(serial__contains=serial)
            if ip:
                scan_log_qs = scan_log_qs.filter(ip__contains=ip)

            count = scan_log_qs.count()
            scan_log_list = list(scan_log_qs.order_by('-add_time').values(
                'serial', 'ip', 'add_time', 'update_time')[start:end])
            return response.json(0, {'list': scan_log_list, 'total': count})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    def getAlarmLog(self, request_dict, response):
        """Return one page of alarm (StsFrequency) rows, most recently updated first.

        Requires pageNo and pageSize (code 444 otherwise). uid is an optional
        exact filter.
        """
        page_no = request_dict.get('pageNo', None)
        page_size = request_dict.get('pageSize', None)
        uid = request_dict.get('uid', None)

        if not all([page_no, page_size]):
            return response.json(444)

        start, end = _page_bounds(page_no, page_size)
        try:
            alarm_log_qs = StsFrequency.objects.all()
            if uid:
                alarm_log_qs = alarm_log_qs.filter(uid=uid)

            count = alarm_log_qs.count()
            alarm_log_qs = alarm_log_qs.order_by('-updateTime').values(
                'id', 'uid', 'frequency', 'updateTime', 'addTime')[start:end]
            return response.json(0, {'list': list(alarm_log_qs), 'total': count})
        except Exception as e:
            print(e)
            return response.json(500, _error_detail(e))

    @staticmethod
    def getDomainLog(request_dict, response):
        """Return one page of device domain records, most recently updated first.

        Requires pageNo and pageSize (code 444 otherwise). Records with an
        empty country_code are excluded. serialNumber is an optional
        substring filter.
        """
        page_no = request_dict.get('pageNo', None)
        page_size = request_dict.get('pageSize', None)
        serial_number = request_dict.get('serialNumber', None)

        if not all([page_no, page_size]):
            return response.json(444)

        start, end = _page_bounds(page_no, page_size)
        try:
            device_domain_qs = DeviceDomainRegionModel.objects.filter(~Q(country_code=''))
            if serial_number:
                device_domain_qs = device_domain_qs.filter(serial_number__contains=serial_number)

            count = device_domain_qs.count()
            device_domain_qs = device_domain_qs.order_by('-update_time').values()[start:end]

            device_domain_list = []
            for device_domain in device_domain_qs:
                ip = device_domain['ip']
                region_id = device_domain['region_id']
                country_code, api, api_region = _resolve_domain(
                    ip, region_id, device_domain['country_code'])
                device_domain_list.append({
                    'id': device_domain['id'],
                    'serial_number': device_domain['serial_number'],
                    'ip': ip,
                    'region_id': region_id,
                    'country_code': country_code,
                    'api': api,
                    'api_region': api_region,
                    'add_time': device_domain['add_time'].strftime(_TIME_FORMAT),
                    'update_time': device_domain['update_time'].strftime(_TIME_FORMAT),
                })
            return response.json(0, {'list': device_domain_list, 'total': count})
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def getDomainScanLog(request_dict, response):
        """Return the domain record and scan record for one serial number.

        Requires serialNumber (code 444 otherwise); it is matched as a
        substring and the first matching row of each table is used. An empty
        object is returned unless both a scan record and a domain record exist.
        """
        serial_number = request_dict.get('serialNumber', None)
        if not all([serial_number]):
            return response.json(444)

        try:
            scan_log_qs = AppScannedSerial.objects.filter(
                serial__contains=serial_number).values('ip', 'add_time', 'update_time')
            device_domain_qs = DeviceDomainRegionModel.objects.filter(
                serial_number__contains=serial_number).values()

            res = {}
            if scan_log_qs.exists() and device_domain_qs.exists():
                device_domain = device_domain_qs[0]
                # The scan_* fields come from the scan-log row; previously they
                # were read from the domain row and just duplicated its values.
                scan_log = scan_log_qs[0]
                domain_ip = device_domain['ip']
                region_id = device_domain['region_id']
                country_code, api, api_region = _resolve_domain(
                    domain_ip, region_id, device_domain['country_code'])
                res = {
                    'serial_number': device_domain['serial_number'],
                    'domain_ip': domain_ip,
                    'scan_ip': scan_log['ip'],
                    'region_id': region_id,
                    'country_code': country_code,
                    'api': api,
                    'api_region': api_region,
                    'domain_add_time': device_domain['add_time'].strftime(_TIME_FORMAT),
                    'domain_update_time': device_domain['update_time'].strftime(_TIME_FORMAT),
                    'scan_add_time': _format_scan_time(scan_log['add_time']),
                    'scan_update_time': _format_scan_time(scan_log['update_time']),
                }
            return response.json(0, res)
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def getDeviceAbnormalEvent(request_dict, response):
        """Return device abnormal events, newest first, enriched for display.

        Optional filters: uid, storageCode, deviceType (comma-separated),
        version, eventType (comma-separated), reportType, eventTimeRange[].
        Paging is applied only when both pageNo and pageSize are given.
        Each row gets the device type name, the event text/type and, when
        available, the storage code and product scheme details.
        """
        event_type = request_dict.get('eventType', None)
        page_no = request_dict.get('pageNo', None)
        page_size = request_dict.get('pageSize', None)

        is_page = all([page_no, page_size])
        if is_page:
            start, end = _page_bounds(page_no, page_size)

        try:
            query = _abnormal_event_filters(request_dict)
            query &= _event_types_filter(event_type)

            abnormal_events = AbnormalEvent.objects.filter(query)
            count = abnormal_events.count()
            event_qs = abnormal_events.order_by('-event_time').values(
                'id', 'uid', 'device_type', 'version', 'event_code', 'content', 'report_type', 'event_time',
                'created_time'
            )
            if is_page:
                event_qs = event_qs[start:end]

            # Materialise once so the rows edited below are the rows returned.
            event_list = list(event_qs)

            # Add device model, abnormal event info and production order info.
            for event in event_list:
                device_type_obj = DeviceTypeModel.objects.filter(type=event['device_type']).first()
                event['device_type'] = device_type_obj.name if device_type_obj else ''

                abnormal_event_code = AbnormalEventCode.objects.filter(event_code=event['event_code']).first()
                if abnormal_event_code:
                    event['event'] = abnormal_event_code.event
                    event['event_type'] = abnormal_event_code.event_type
                else:
                    event['event'] = ''

                # Look up the serial number for this uid.
                serial_number = CommonService.get_serial_number_by_uid(event['uid'])
                device_scheme_qs = DeviceScheme.objects.filter(serial_number=serial_number).values('storage_code')
                if not device_scheme_qs.exists():
                    continue
                event_storage_code = device_scheme_qs[0]['storage_code']
                event['storage_code'] = event_storage_code

                products_scheme_qs = ProductsScheme.objects.filter(
                    storage_code=event_storage_code).values(
                    'main_controller', 'wifi', 'sensor', 'order_quantity', 'created_time')
                if products_scheme_qs.exists():
                    scheme = products_scheme_qs[0]
                    event['scheme'] = '{}+{}+{}'.format(
                        scheme['main_controller'], scheme['wifi'], scheme['sensor'])
                    event['order_quantity'] = scheme['order_quantity']
                    event['shipping_date'] = scheme['created_time']

            return response.json(0, {'list': event_list, 'total': count})
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def getAbnormalEventValue(response):
        """Return the distinct abnormal event types, in first-seen order."""
        try:
            event_types = AbnormalEventCode.objects.values_list('event_type', flat=True)
            # dict.fromkeys removes duplicates while keeping the original order.
            unique_event_types = list(dict.fromkeys(event_types))
            return response.json(0, {'list': unique_event_types})
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def getAbnormalPercentage(request_dict, response):
        """Return the share of each abnormal event type among matching events.

        Accepts the same optional filters as getDeviceAbnormalEvent (without
        paging). Events whose code has no AbnormalEventCode row are grouped
        under '未知类型'. Rows are sorted by count, largest first.
        """
        event_type = request_dict.get('eventType', None)

        try:
            query = _abnormal_event_filters(request_dict)
            query &= _event_types_filter(event_type)

            # Events matching the filters.
            abnormal_events = AbnormalEvent.objects.filter(query)
            total_count = abnormal_events.count()

            event_type_stats = {}
            if total_count > 0:
                # event_code of every matching event
                event_codes = abnormal_events.values_list('event_code', flat=True)
                # event_code -> event_type mapping for those codes
                event_code_types = AbnormalEventCode.objects.filter(
                    event_code__in=event_codes
                ).values('event_code', 'event_type')
                event_code_to_type = {item['event_code']: item['event_type'] for item in event_code_types}
                # Only the codes are needed for counting, so full rows are not loaded.
                event_type_stats = _count_labels(event_codes, event_code_to_type)

            statistics = _percentage_statistics(event_type_stats, total_count)
            return response.json(0, {
                'total': total_count,
                'statistics': statistics
            })
        except Exception as e:
            return response.json(500, _error_detail(e))

    @staticmethod
    def getAbnormalDetailsPercentage(request_dict, response):
        """Return the share of each event within one abnormal event type.

        Requires eventType (code 444 otherwise), matched exactly. Accepts the
        other optional filters of getDeviceAbnormalEvent (without paging).
        Events whose code is not listed under that type are grouped under
        '未知类型'. Rows are sorted by count, largest first.
        """
        event_type = request_dict.get('eventType', None)
        if not event_type:
            return response.json(444)

        try:
            query = _abnormal_event_filters(request_dict)

            # Restrict to the event codes of this event type; when the type
            # has no codes, no event_code restriction is added.
            event_codes_from_event = AbnormalEventCode.objects.filter(
                event_type=event_type).values_list('event_code', flat=True)
            if event_codes_from_event.exists():
                query &= Q(event_code__in=event_codes_from_event)

            # Events matching the filters.
            abnormal_events = AbnormalEvent.objects.filter(query)
            total_count = abnormal_events.count()

            event_type_stats = {}
            if total_count > 0:
                # event_code of every matching event
                event_codes = abnormal_events.values_list('event_code', flat=True)
                # event_code -> event text mapping for this event type
                event_code_types = AbnormalEventCode.objects.filter(
                    event_type=event_type
                ).values('event_code', 'event')
                event_code_to_event = {item['event_code']: item['event'] for item in event_code_types}
                # Only the codes are needed for counting, so full rows are not loaded.
                event_type_stats = _count_labels(event_codes, event_code_to_event)

            statistics = _percentage_statistics(event_type_stats, total_count)
            return response.json(0, {
                'category': event_type,  # the abnormal type currently being viewed
                'total': total_count,
                'statistics': statistics
            })
        except Exception as e:
            print('error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, _error_detail(e))