# -*- encoding: utf-8 -*-
"""
@File : SerialNumberCheckController.py
@Time : 2024/6/20 13:33
@Author : stephen
@Email : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import time

import requests
from django.http import QueryDict
from django.views import View

from Ansjer.config import LOGGER
from Model.models import SerialNumberCheckLog, SerialNumberRenovateLog, Device_Info
from Object.ResponseObject import ResponseObject
from Service.CommonService import CommonService


class SerialNumberView(View):
    """HTTP entry point for serial-number scan checks and renovation records.

    Every HTTP verb extracts its parameters and forwards them to
    ``validation``, which dispatches on the ``operation`` URL keyword argument.
    """

    # Station type -> display name. Keys are the string form of the numeric
    # ``type`` stored on SerialNumberCheckLog rows.
    _STATION_NAMES = {
        '1': '三乡总装',
        '2': '三乡包装',
        '3': '三乡返工',
        '4': '三乡管理',
        '5': '珠海翻新',
        '6': '珠海管理',
    }

    def get(self, request, *args, **kwargs):
        """Handle GET: parameters are read from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: parameters are read from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def delete(self, request, *args, **kwargs):
        """Handle DELETE: parameters come from the body, else the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        delete = QueryDict(request.body)
        if not delete:
            # An empty body falls back to URL parameters.
            delete = request.GET
        return self.validation(delete, request, operation)

    def put(self, request, *args, **kwargs):
        """Handle PUT: parameters are parsed from the raw request body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        put = QueryDict(request.body)
        return self.validation(put, request, operation)

    def validation(self, request_dict, request, operation):
        """Dispatch the request to the handler matching ``operation``.

        @param request_dict: request parameters (QueryDict)
        @param request: the Django request object
        @param operation: operation name taken from the URL
        @return: the handler's response; an error response for an unknown
                 operation
        """
        response = ResponseObject('cn')
        if operation == 'serialNumberCheck':
            # Scan-tool duplicate check for a serial number.
            return self.save_serial_number_log(request_dict, response)
        if operation == 'addSerialNumberRenovate':
            # Renovation front-end check (adds/updates a renovation record).
            return self.serial_number_renovate_check(request, request_dict, response)
        if operation == 'getSerialNumberRenovation':
            # Renovation warehousing check (queries the renovation record).
            return self.get_serial_number_renovation(request, request_dict, response)
        # A Django view must return an HttpResponse; previously an unknown
        # operation fell through and returned None.
        # NOTE(review): 444 is reused from the invalid-parameter paths in this
        # file; swap in a dedicated "unknown operation" code if one exists.
        return response.json(444)

    @classmethod
    def _build_log_entry(cls, log):
        """Serialize one SerialNumberCheckLog row into the response structure."""
        return {
            'fullSerialNumber': log.full_serial_number,
            'count': log.count,
            'type': log.type,
            'createdTime': log.created_time,
            # Rows may carry a type outside 1-6 (``type`` defaults to 0 when the
            # client omits it), so fall back to an empty name instead of raising.
            'stationName': cls._STATION_NAMES.get(str(log.type), ''),
            'phoneModel': log.phone_model,
        }

    @classmethod
    def save_serial_number_log(cls, request_dict, response):
        """
        Record an APP scan and report whether the serial number was seen before.

        Station types: 1 = Sanxiang assembly, 2 = Sanxiang packaging,
        3 = Sanxiang rework, 4 = Sanxiang management, 5 = Zhuhai renovation,
        6 = Zhuhai management.

        Management stations (4 and 6) only query existing scan logs; every
        other station type stores a new scan log.

        @param request_dict: serialNo, phoneModel, type
        @param response: response object
        @return: 444 when serialNo is missing or type is not an integer;
                 173 when a management query finds no logs; 174 with the
                 earlier log when a duplicate is detected; 0 otherwise;
                 5 on an unexpected error
        """
        serial_no = request_dict.get("serialNo", None)
        phone_model = request_dict.get("phoneModel", None)
        if not serial_no:
            return response.json(444)
        try:
            p_type = int(request_dict.get('type', 0))
        except (TypeError, ValueError):
            # A non-numeric ``type`` is a client error, not a server crash.
            return response.json(444)
        n_time = int(time.time())
        try:
            # Only the first six characters identify the device.
            first_serial = serial_no[:6]
            first_serial_qs = SerialNumberCheckLog.objects.filter(serial_number=first_serial)

            if p_type in (4, 6):
                # Management stations list existing scan logs for this serial.
                if p_type == 6:
                    first_serial_qs = first_serial_qs.filter(type__in=[2, 5])
                else:
                    first_serial_qs = first_serial_qs.exclude(type=5)
                first_serial_qs = first_serial_qs.order_by('type', '-created_time')
                logs = [cls._build_log_entry(item) for item in first_serial_qs]
                if not logs:
                    return response.json(173)
                return response.json(0, {'logs': logs})

            # Look up an earlier scan at this station before storing the new
            # one, so the lookup can never return the row created below.
            previous_scan = first_serial_qs.filter(type=p_type).first()
            SerialNumberCheckLog.objects.create(
                serial_number=first_serial,
                created_time=n_time,
                full_serial_number=serial_no,
                type=p_type,
                phone_model=phone_model,
            )
            if previous_scan is not None:
                # Already scanned at this station: report the earlier record.
                return response.json(174, {'logs': [cls._build_log_entry(previous_scan)]})

            # Zhuhai renovation: flag the serial when Sanxiang packaging has
            # scanned it two or more times.
            if p_type == 5:
                s_qs = SerialNumberCheckLog.objects.filter(serial_number=first_serial, type=2)
                if s_qs.count() >= 2:
                    return response.json(174, {'logs': [cls._build_log_entry(s_qs.first())]})
            return response.json(0)
        except Exception as e:
            LOGGER.error('APP扫码工具保存日志异常:errLine:{}, errMsg:{}'
                         .format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(5)

    @classmethod
    def serial_number_renovate_check(cls, request, request_dict, response):
        """
        Renovation front-end check for a serial number.

        Asks the external serial service about the six-character prefix, looks
        up whether a user has the device added and whether scan logs exist,
        stores/updates the type-1 renovation record, and returns the findings.

        @param request: the Django request object
        @param request_dict: request parameters (serialNo, phoneModel)
        @param response: response object
        @return: 444 when serialNo is missing; 0 with distributionType,
                 userAdded, userName, scanLog and color; 503 on any error
        """
        full_serial_number = request_dict.get("serialNo")
        if not full_serial_number:
            return response.json(444)
        try:
            ip_address = CommonService.get_ip_address(request)
            phone_model = request_dict.get("phoneModel")
            # Endpoint that reports how the serial number was distributed.
            check_url = 'http://serial.zositechc.cn:7880/serialNumber/checkSerialLog'
            LOGGER.info('翻新前端检查: serial: {}, ip: {}'.format(full_serial_number, ip_address))

            # Only the first six characters identify the device.
            serial_number_prefix = full_serial_number[:6]
            response_from_service = requests.get(
                check_url, params={'serial': serial_number_prefix}, timeout=10)
            result = response_from_service.json()
            # Distribution type is 1 when the service answers code 0, else 0.
            distribution_type = 1 if result['code'] == 0 else 0

            renovation_logs = SerialNumberRenovateLog.objects.filter(serial_number=serial_number_prefix)
            current_time = int(time.time())
            insert_or_update_params = {
                'serial_number': serial_number_prefix,
                'full_serial_number': full_serial_number,
                'count': 1,
                'operation_count': 1,
                'phone_model': phone_model,
                'created_time': current_time,
                'updated_time': current_time,
                'type': 1
            }

            # Resolve the UID and check whether any user has added the device.
            uid = CommonService.get_uid_by_serial_number(serial_number=serial_number_prefix)
            device_info = Device_Info.objects.filter(UID=uid).values('userID__username').first()
            user_name = device_info['userID__username'] if device_info is not None else ''
            user_added = 1 if device_info is not None else 0
            scan_log = 1 if SerialNumberCheckLog.objects.filter(
                serial_number=serial_number_prefix).exists() else 0

            cls.create_or_update_renovation_log(renovation_logs, insert_or_update_params, current_time)

            # Red when anything looks wrong, green otherwise.
            needs_attention = distribution_type == 0 or user_added == 1 or scan_log == 0
            response_data = {
                'distributionType': distribution_type,
                'userAdded': user_added,
                'userName': user_name,
                'scanLog': scan_log,
                'color': 'E24A28' if needs_attention else '4EDF4E'
            }
            return response.json(0, response_data)
        except Exception as e:
            LOGGER.error('翻新前端检查异常 serial: {}: errLine: {}, errMsg: {}'.format(
                full_serial_number, e.__traceback__.tb_lineno, repr(e)))
            return response.json(503)

    @classmethod
    def create_or_update_renovation_log(cls, renovation_logs, params, current_time):
        """Create or update the front-end (type 1) renovation record.

        @param renovation_logs: queryset of all renovation logs for the serial
        @param params: field values used when a new type-1 record is created
        @param current_time: timestamp written to ``updated_time`` on update
        """
        should_reset = False  # a type-2 (warehousing) record with count == 1 exists
        count = 1  # renovation count of the type-1 record
        operation_count = 1  # scan-check count of the type-1 record
        frontend = False  # whether a type-1 (front-end) record exists

        for log in renovation_logs:
            # type 1 = renovation front-end, type 2 = renovation warehousing
            if log.type == 1:
                count = log.count
                frontend = True
                operation_count = log.operation_count
            elif log.type == 2 and log.count == 1:
                should_reset = True

        if not frontend:
            # No front-end record yet (this also covers "no records at all").
            SerialNumberRenovateLog.objects.create(**params)
            return

        serial_logs = SerialNumberRenovateLog.objects.filter(serial_number=params['serial_number'])
        if should_reset:
            # Start a new renovation round: bump the renovation count, restart
            # the scan-check count and clear the warehousing record's count.
            serial_logs.filter(type=1).update(
                operation_count=1, count=count + 1, updated_time=current_time)
            serial_logs.filter(type=2).update(
                operation_count=1, count=0, updated_time=current_time)
        else:
            # Same round: only the scan-check count increases.
            serial_logs.filter(type=1).update(
                operation_count=operation_count + 1, count=count, updated_time=current_time)

    @classmethod
    def get_serial_number_renovation(cls, request, request_dict, response):
        """
        Renovation warehousing check for a serial number.

        Stores/updates the type-2 renovation record and reports how many times
        the front-end (type-1) check was performed.

        @param request: the Django request object
        @param request_dict: request parameters (serialNo, phoneModel)
        @param response: response object
        @return: 444 when serialNo is missing; 0 with checkCount and color;
                 503 on any error
        """
        full_serial_number = request_dict.get("serialNo")
        if not full_serial_number:
            return response.json(444)
        try:
            ip_address = CommonService.get_ip_address(request)
            phone_model = request_dict.get("phoneModel")
            # Only the first six characters identify the device.
            serial_number_prefix = full_serial_number[:6]
            LOGGER.info('翻新入库检查: serial: {}, ip: {}'.format(full_serial_number, ip_address))

            renovation_logs = SerialNumberRenovateLog.objects.filter(
                serial_number=serial_number_prefix, type=2)
            renovation_before_logs = SerialNumberRenovateLog.objects.filter(
                serial_number=serial_number_prefix, type=1)
            now_time = int(time.time())

            warehousing_log = renovation_logs.first()
            if warehousing_log is None:
                # First warehousing scan: create the type-2 record.
                SerialNumberRenovateLog.objects.create(
                    serial_number=serial_number_prefix,
                    full_serial_number=full_serial_number,
                    count=1,
                    operation_count=1,
                    phone_model=phone_model,
                    created_time=now_time,
                    updated_time=now_time,
                    type=2,
                )
            else:
                # Repeat scan: bump the operation count and refresh the time.
                renovation_logs.update(
                    operation_count=warehousing_log.operation_count + 1,
                    count=1,
                    updated_time=now_time)

            # Number of front-end checks recorded for this serial (0 if none).
            frontend_log = renovation_before_logs.first()
            count = frontend_log.operation_count if frontend_log is not None else 0

            if count == 0:
                color = 'E3C310'  # yellow: no front-end check recorded
            elif count == 1:
                color = '4EDF4E'  # green: exactly one front-end check
            else:
                color = 'E24A28'  # red: any other count
            return response.json(0, {'checkCount': count, 'color': color})
        except Exception as e:
            LOGGER.error('翻新入库检查异常 serial: {}: errLine: {}, errMsg: {}'.format(
                full_serial_number, e.__traceback__.tb_lineno, repr(e)))
            return response.json(503)