#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Endpoints for managing device firmware, app and PC-client version records."""
import ast
import hashlib
import json
import os
import threading
import time

import boto3
import botocore
import requests
from django.core.paginator import Paginator
from django.db import transaction
from django.views.generic.base import View
from packaging import version as pacVer

from Ansjer.config import BASE_DIR, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
from Ansjer.config import LOGGER, CONFIG_TEST, SERVER_DOMAIN, CONFIG_CN, CONFIG_INFO
from Model.models import Equipment_Version, App_Info, AppSetModel, App_Colophon, Pc_Info, CountryModel, \
    Device_Info, UidSetModel, Device_User, IPAddr, DeviceVersionInfo, iotdeviceInfoModel
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Object.UrlTokenObject import UrlTokenObject
from Service.CommonService import CommonService


def _parse_data_json(raw):
    """Parse the client-supplied ``dataJson`` string without executing it.

    This replaces a former ``eval`` call, which would run arbitrary Python sent
    by the client. JSON is tried first; Python literal syntax (single quotes,
    ``True``/``None``, tuples) is still accepted through ``ast.literal_eval``
    so payloads that used to work with ``eval`` keep working.

    :param raw: raw request value; falsy values are returned unchanged
    :return: the decoded object
    :raises ValueError, SyntaxError: when the text is neither JSON nor a literal
    """
    if not raw:
        return raw
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return ast.literal_eval(raw)


def _app_type_code(app_type):
    """Map the app-type label sent by the client to the stored integer code."""
    if app_type == 'IOS':
        return 1
    if app_type == '安卓':
        return 2
    return 3


def _page_slice(page_no, page_size):
    """Return the slice for a 1-based page, or None if the values are not integers."""
    try:
        page = int(page_no)
        line = int(page_size)
    except (TypeError, ValueError):
        return None
    return slice((page - 1) * line, page * line)


def _version_lte(left, right):
    """Return True when version ``left`` is not newer than version ``right``.

    Versions are compared numerically with ``packaging`` so that, for example,
    ``2.9.0`` sorts before ``2.10.0``. If either value cannot be parsed as a
    version, the plain ``<=`` comparison of the raw values is used instead.
    """
    try:
        return pacVer.Version(left) <= pacVer.Version(right)
    except (pacVer.InvalidVersion, TypeError):
        return left <= right


def _server_error(response, exc, where, with_line=False):
    """Log ``exc`` and build the code-500 response used by every handler.

    :param response: ResponseObject of the current request
    :param exc: the caught exception
    :param where: handler name, used only in the log line
    :param with_line: include the failing line number in the response message
    """
    LOGGER.error('versionManagement/{} failed: {}'.format(where, repr(exc)))
    if with_line:
        return response.json(
            500, 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)))
    return response.json(500, repr(exc))


class VersionManagement(View):
    """Dispatches ``operation`` URLs to the version-management handlers below."""

    # Handlers that require a valid token and share the
    # (request_dict, response) signature; the operation name is the method name.
    _TOKEN_OPERATIONS = frozenset((
        'getEquipmentVersionList', 'editVersionInformation', 'deleteEquipmentVersion',
        'getAppVersionList', 'addOrEditAppInfo', 'deleteAppVersion',
        'getAppSet', 'editAppSet',
        'getAppRecordList', 'getAppBundleIdList', 'addOrEditAppRecord', 'deleteAppRecord',
        'getPcInfoList', 'editPcVersion', 'deletePcInfo',
        'getCountryList',
    ))

    def get(self, request, *args, **kwargs):
        """Handle GET by dispatching on the ``operation`` URL keyword."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST by dispatching on the ``operation`` URL keyword."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Check the token (except for ``upLoadFile``) and call the handler.

        :param request_dict: request parameters (GET or POST data)
        :param request: the Django request
        :param operation: operation name taken from the URL
        :return: the handler's response; the token's code when the token is
            rejected; code 404 for an unknown operation
        """
        language = request_dict.get('language', 'en')
        response = ResponseObject(language, 'pc')
        if operation == 'upLoadFile':
            # NOTE(review): this branch runs before the token check, so the
            # upload endpoint is unauthenticated - confirm this is intended.
            return self.upLoadFile(request, request_dict, response)

        tko = TokenObject(
            request.META.get('HTTP_AUTHORIZATION'),
            returntpye='pc')
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang
        userID = tko.userID

        if operation == 'deviceAutoUpdate':
            return self.device_auto_update(userID, request_dict, response)
        if operation in self._TOKEN_OPERATIONS:
            return getattr(self, operation)(request_dict, response)
        return response.json(404)

    def getEquipmentVersionList(self, request_dict, response):
        """Return one page of Equipment_Version rows, newest update first.

        Optional filters: ``mci``, ``lang`` (exact) and ``version`` (substring).
        Each row gets ``is_hav_dev_ver_info`` = 1/0 depending on whether a
        DeviceVersionInfo row exists for its device code and software version.
        Responds with code 444 when paging parameters are missing or invalid.
        """
        mci = request_dict.get('mci', None)
        lang = request_dict.get('lang', None)
        version = request_dict.get('version', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)
        window = _page_slice(pageNo, pageSize)
        if window is None:
            return response.json(444)

        try:
            equipment_version_qs = Equipment_Version.objects.filter().order_by('-update_time')
            if mci:
                equipment_version_qs = equipment_version_qs.filter(mci=mci)
            if lang:
                equipment_version_qs = equipment_version_qs.filter(lang=lang)
            if version:
                equipment_version_qs = equipment_version_qs.filter(version__contains=version)

            total = equipment_version_qs.count()
            equipment_version_list = CommonService.qs_to_list(equipment_version_qs.values()[window])
            for equipment_version in equipment_version_list:
                # "V2.2.4.16E201252CA" -> software version "2.2.4", device code
                # "16E201252CA". rpartition tolerates a value without any '.',
                # which previously failed the whole listing with IndexError.
                software_ver, _, d_code = equipment_version['version'][1:].rpartition('.')
                software_ver = software_ver.replace('V', '')
                has_info = DeviceVersionInfo.objects.filter(
                    d_code=d_code, software_ver=software_ver).exists()
                equipment_version['is_hav_dev_ver_info'] = 1 if has_info else 0
            return response.json(0, {'list': equipment_version_list, 'total': total})
        except Exception as e:
            return _server_error(response, e, 'getEquipmentVersionList', with_line=True)

    def upLoadFile(self, request, request_dict, response):
        """Store an uploaded firmware package and create/update its version row.

        The file name is the device version: the text before the last '.' is the
        software version and the text after it is the device specification code,
        e.g. ``V2.2.4.16E201252CA`` -> ``2.2.4`` / ``16E201252CA``. Only names
        containing ``.img`` or ``.tar`` are accepted (code 903 otherwise).
        The file is written under ``BASE_DIR/static/otapack/<mci>/<lang>/``.
        Responds with code 444 when a required parameter is missing.
        """
        file = request.FILES.get('file', None)
        mci = request_dict.get('mci', '')
        lang = request_dict.get('lang', '')
        ESN = request_dict.get('ESN', '')
        max_ver = request_dict.get('max_ver', '')
        channel = request_dict.get('channel', '')
        resolutionRatio = request_dict.get('resolutionRatio', '')
        Description = request_dict.get('Description', '')
        status = request_dict.get('status', 0)
        isPopup = request_dict.get('isPopup', 0)
        auto_update = request_dict.get('autoUpdate', 0)
        data_json = request_dict.get('dataJson', None)

        if not all([file, mci, lang, ESN, max_ver, channel, resolutionRatio]):
            return response.json(444)
        # mci and lang become directory names below; refuse anything that could
        # point outside static/otapack.
        if any(marker in part for part in (mci, lang) for marker in ('/', '\\', '..')):
            return response.json(444)

        try:
            with transaction.atomic():
                nowTime = CommonService.timestamp_to_str(timestamp=int(time.time()))
                channel = int(channel)
                resolutionRatio = int(resolutionRatio)
                status = int(status)
                isPopup = int(isPopup)
                # Parsed as data only; the former eval() executed client input.
                data_json = _parse_data_json(data_json)

                file_name = str(file)
                # Accept .img and .tar(.gz) packages only.
                file_type_index = file_name.find('.img')
                if file_type_index == -1:
                    file_type_index = file_name.find('.tar')
                    if file_type_index == -1:
                        return response.json(903)

                version = file_name[:file_type_index]  # device version
                version_index = version.rindex('.')
                softwareVersion = version[1:version_index]  # software version
                code = version[version_index + 1:]  # device specification code
                chipModelList2Code = code[:4]  # main chip
                device_type = code[8:10]  # device model
                companyCode = code[-1:]  # company code
                fileSize = file.size
                filePath = '/'.join(('static/otapack', mci, lang, file_name))

                # Hash chunk by chunk so a large package is never held in memory whole.
                md5 = hashlib.md5()
                for chunk in file.chunks():
                    md5.update(chunk)
                fileMd5 = md5.hexdigest()

                data_dict = {'mci': mci, 'lang': lang, 'ESN': ESN, 'max_ver': max_ver, 'channel': channel,
                             'resolutionRatio': resolutionRatio, 'Description': Description, 'status': status,
                             'is_popup': isPopup, 'version': version, 'softwareVersion': softwareVersion,
                             'code': code, 'chipModelList2Code': chipModelList2Code, 'type': device_type,
                             'companyCode': companyCode, 'fileSize': fileSize, 'filePath': filePath,
                             'fileMd5': fileMd5, 'update_time': nowTime, 'data_json': data_json,
                             'auto_update': auto_update}

                # Create the Equipment_Version row, or update the existing one
                # for the same device code and language.
                equipment_version_qs = Equipment_Version.objects.filter(code=code, lang=lang)
                if not equipment_version_qs.exists():
                    Equipment_Version.objects.create(
                        eid=CommonService.getUserID(getUser=False, setOTAID=True), **data_dict)
                else:
                    equipment_version_qs.update(**data_dict)

                # Write the package to disk, replacing any file of the same name.
                upload_path = '/'.join((BASE_DIR, 'static/otapack', mci, lang)).replace('\\', '/') + '/'
                os.makedirs(upload_path, exist_ok=True)
                full_name = upload_path + file_name
                if os.path.exists(full_name):
                    os.remove(full_name)
                with open(full_name, 'wb+') as write_file:
                    for chunk in file.chunks():
                        write_file.write(chunk)

                LOGGER.info('versionManagement/upLoadFile成功上传{}'.format(file_name))
                if not DeviceVersionInfo.objects.filter(d_code=code, software_ver=softwareVersion).exists():
                    return response.json(0, "该版本尚未添加设备版本信息")
                return response.json(0)
        except Exception as e:
            LOGGER.info(
                'versionManagement/upLoadFile接口异常,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    def editVersionInformation(self, request_dict, response):
        """Update the editable fields of the Equipment_Version row ``eid``.

        ``status`` is stored as 1 only when the request value is the string
        ``'true'``. Responds with code 444 without ``eid`` and code 173 when
        no such row exists.
        """
        eid = request_dict.get('eid', None)
        ESN = request_dict.get('ESN', '')
        max_ver = request_dict.get('max_ver', '')
        status = request_dict.get('status', '')
        channel = request_dict.get('channel', '')
        resolutionRatio = request_dict.get('resolutionRatio', '')
        Description = request_dict.get('Description', '')
        is_popup = request_dict.get('is_popup', '')
        auto_update = request_dict.get('autoUpdate', 0)
        data_json = request_dict.get('dataJson', None)

        if not eid:
            return response.json(444)

        status = 1 if status == 'true' else 0
        try:
            # Parsed as data only; the former eval() executed client input.
            data_json = _parse_data_json(data_json)
            equipment_version_qs = Equipment_Version.objects.filter(eid=eid)
            if not equipment_version_qs.exists():
                return response.json(173)

            data_dict = {'ESN': ESN, 'max_ver': max_ver, 'status': status, 'channel': channel,
                         'auto_update': auto_update, 'data_json': data_json,
                         'resolutionRatio': resolutionRatio, 'Description': Description, 'is_popup': is_popup}
            equipment_version_qs.update(**data_dict)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'editVersionInformation', with_line=True)

    def deleteEquipmentVersion(self, request_dict, response):
        """Delete the Equipment_Version row ``eid`` and its package file on disk.

        Responds with code 444 without ``eid`` and code 173 when the row does
        not exist.
        """
        eid = request_dict.get('eid', None)
        if not eid:
            return response.json(444)

        try:
            equipment_version_qs = Equipment_Version.objects.filter(eid=eid)
            row = equipment_version_qs.values('filePath').first()
            if row is None:
                return response.json(173)
            filePath = row['filePath']
            equipment_version_qs.delete()

            # Remove the stored package, if it is still there.
            full_name = '/'.join((BASE_DIR, filePath)).replace('\\', '/')
            if os.path.exists(full_name):
                os.remove(full_name)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'deleteEquipmentVersion', with_line=True)

    def getAppVersionList(self, request_dict, response):
        """Return one page of App_Info rows.

        Optional filters: ``app_type`` (label mapped to its integer code),
        ``appName`` and ``version`` (substring). Responds with code 444 when
        paging parameters are missing or invalid.
        """
        app_type = request_dict.get('app_type', None)
        appName = request_dict.get('appName', None)
        version = request_dict.get('version', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)
        window = _page_slice(pageNo, pageSize)
        if window is None:
            return response.json(444)

        try:
            app_info_qs = App_Info.objects.filter()
            if app_type:
                app_info_qs = app_info_qs.filter(app_type=_app_type_code(app_type))
            if appName:
                app_info_qs = app_info_qs.filter(appName__contains=appName)
            if version:
                app_info_qs = app_info_qs.filter(version__contains=version)

            total = app_info_qs.count()
            app_info_list = CommonService.qs_to_list(app_info_qs.values()[window])
            return response.json(0, {'list': app_info_list, 'total': total})
        except Exception as e:
            return _server_error(response, e, 'getAppVersionList')

    def addOrEditAppInfo(self, request_dict, response):
        """Create an App_Info row, or update the row ``id`` when ``id`` is given.

        Responds with code 173 when the row to edit does not exist.
        """
        app_info_id = request_dict.get('id', None)
        appName = request_dict.get('appName', '')
        appBundleId = request_dict.get('appBundleId', '')
        bundleVersion = request_dict.get('bundleVersion', '')
        newAppversion = request_dict.get('newAppversion', '')
        minAppversion = request_dict.get('minAppversion', '')
        content = request_dict.get('content', '')
        app_type = request_dict.get('app_type', '')
        downloadLink = request_dict.get('downloadLink', '')

        try:
            app_type = int(app_type)
            data_dict = {'appName': appName, 'appBundleId': appBundleId, 'bundleVersion': bundleVersion,
                         'newAppversion': newAppversion, 'minAppversion': minAppversion, 'content': content,
                         'app_type': app_type, 'downloadLink': downloadLink}
            if not app_info_id:  # add
                App_Info.objects.create(**data_dict)
            else:  # edit
                app_info_qs = App_Info.objects.filter(id=app_info_id)
                if not app_info_qs.exists():
                    return response.json(173)
                app_info_qs.update(**data_dict)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'addOrEditAppInfo')

    def deleteAppVersion(self, request_dict, response):
        """Delete the App_Info and AppSetModel rows of ``appBundleId``."""
        appBundleId = request_dict.get('appBundleId', None)
        if not appBundleId:
            return response.json(444)

        try:
            App_Info.objects.filter(appBundleId=appBundleId).delete()
            AppSetModel.objects.filter(appBundleId=appBundleId).delete()
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'deleteAppVersion')

    def getAppSet(self, request_dict, response):
        """Return the stored ``content`` for ``appBundleId``.

        When no AppSetModel row exists yet, one is created with the current
        timestamp and an empty success response is returned.
        """
        appBundleId = request_dict.get('appBundleId', None)
        if not appBundleId:
            return response.json(444)

        try:
            app_set_qs = AppSetModel.objects.filter(appBundleId=appBundleId).values('content')
            if app_set_qs.exists():
                return response.json(0, {'content': app_set_qs[0]['content']})

            nowTime = int(time.time())
            AppSetModel.objects.create(
                appBundleId=appBundleId,
                addTime=nowTime,
                updTime=nowTime
            )
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'getAppSet')

    def editAppSet(self, request_dict, response):
        """Overwrite the ``content`` of the AppSetModel row(s) of ``appBundleId``."""
        appBundleId = request_dict.get('appBundleId', None)
        content = request_dict.get('content', None)
        if not all([appBundleId, content]):
            return response.json(444)

        try:
            AppSetModel.objects.filter(appBundleId=appBundleId).update(content=content)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'editAppSet')

    def getAppRecordList_1(self, request_dict, response):
        """Return one page of bundle ids with their App_Colophon rows.

        The result maps each bundle id on the page to its rows sorted by ``id``
        descending. This method is not reachable through ``validation``.
        """
        app_type = request_dict.get('app_type', 'IOS')
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)
        window = _page_slice(pageNo, pageSize)
        if window is None:
            return response.json(444)

        try:
            if app_type:
                app_type = _app_type_code(app_type)
            app_colophon_qs = App_Colophon.objects.filter(app_id__app_type=app_type).order_by('app_id').values_list('app_id__appBundleId', flat=True).distinct()
            if not app_colophon_qs.exists():
                return response.json(173)
            total = app_colophon_qs.count()
            app_colophon_list = list(app_colophon_qs[window])
            app_info_qs = App_Colophon.objects.filter(app_id__appBundleId__in=app_colophon_list).\
                values("id", "lang", "newApp_version", "content", "version_time", "app_id__appBundleId",
                       "app_id__appName", "app_id__app_type")
            app_info_list = list(app_info_qs)

            # Group the rows under their bundle id.
            data_dict = {}
            for app_info in app_info_list:
                for app_colophon in app_colophon_list:
                    data_dict.setdefault(app_colophon, [])
                    if app_colophon == app_info['app_id__appBundleId']:
                        data_dict[app_colophon].append(app_info)
            for bundle_id in data_dict:
                data_dict[bundle_id] = sorted(data_dict[bundle_id], key=lambda x: x['id'], reverse=True)

            res = {
                'data_dict': data_dict,
                'total': total
            }
            return response.json(0, res)
        except Exception as e:
            return _server_error(response, e, 'getAppRecordList_1')

    def getAppRecordList(self, request_dict, response):
        """Return one page of app release records, one entry per bundle id.

        Each entry carries the details of one App_Colophon row plus
        ``newApp_version_list``, the ``[lang, version]`` pairs of all rows of
        that bundle. When ``queryVersion`` (``lang`` + version) and
        ``queryAppBundleId`` select a row, that row's details are shown and its
        pair is moved to the front of the list. Responds with code 444 when
        paging parameters are missing or invalid, and code 173 when there are
        no records for the app type.
        """
        app_type = request_dict.get('appType', 'IOS')
        queryVersion = request_dict.get('queryVersion', None)
        queryAppBundleId = request_dict.get('queryAppBundleId', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)
        window = _page_slice(pageNo, pageSize)
        if window is None:
            return response.json(444)

        try:
            app_type = _app_type_code(app_type)
            app_colophon_qs = App_Colophon.objects.filter(app_id__app_type=app_type).order_by('app_id').values_list('app_id__appBundleId', flat=True).distinct()
            if not app_colophon_qs.exists():
                return response.json(173)
            total = app_colophon_qs.count()
            app_colophon_list = list(app_colophon_qs[window])
            app_info_qs = App_Colophon.objects.filter(app_id__appBundleId__in=app_colophon_list).\
                values("id", "lang", "newApp_version", "content", "version_time", "app_id__appBundleId",
                       "app_id__appName", "app_id__app_type")
            app_info_list = list(app_info_qs)

            app_record_list = []  # records returned to the client, in first-seen order
            record_by_bundle = {}  # bundle id -> its record in app_record_list
            for app_info in app_info_list:
                bundle_id = app_info['app_id__appBundleId']
                version = app_info['lang'] + app_info['newApp_version']
                lang_version = [app_info['lang'], app_info['newApp_version']]
                version_day = time.strftime("%Y-%m-%d", time.localtime(app_info['version_time']))

                record = record_by_bundle.get(bundle_id)
                if record is None:
                    # First row of this bundle: it provides the displayed details
                    # unless a later row matches the query.
                    record = {
                        'app_id__appBundleId': bundle_id,
                        'app_id__appName': app_info['app_id__appName'],
                        'app_id__app_type': app_info['app_id__app_type'],
                        'version': version,
                        'newApp_version_list': [lang_version],
                        'id': app_info['id'],
                        'content': app_info['content'],
                        'version_time': version_day,
                    }
                    record_by_bundle[bundle_id] = record
                    app_record_list.append(record)
                elif queryVersion and queryVersion == version and queryAppBundleId == bundle_id:
                    # The queried row replaces the displayed details and goes first.
                    record['id'] = app_info['id']
                    record['content'] = app_info['content']
                    record['version_time'] = version_day
                    record['version'] = version
                    record['newApp_version_list'].insert(0, lang_version)
                else:
                    record['newApp_version_list'].append(lang_version)

            res = {
                'app_record_list': app_record_list,
                'total': total
            }
            return response.json(0, res)
        except Exception as e:
            return _server_error(response, e, 'getAppRecordList')

    def getAppBundleIdList(self, request_dict, response):
        """Return the ``id``/``appBundleId`` pairs of all apps of ``appType``."""
        app_type = request_dict.get('appType', 'IOS')

        try:
            app_info_qs = App_Info.objects.filter(app_type=_app_type_code(app_type)).values('id', 'appBundleId')
            return response.json(0, {'appBundleIdList': list(app_info_qs)})
        except Exception as e:
            return _server_error(response, e, 'getAppBundleIdList')

    def addOrEditAppRecord(self, request_dict, response):
        """Edit one App_Colophon row, or add a cn/en pair for a new version.

        Edit (``id`` given): the first two characters of ``version`` are the
        language and the rest is the version number. Add: one ``cn`` row with
        ``cnContent`` and one ``en`` row with ``enContent`` are created in a
        single transaction. ``version_time`` is a ``YYYY-MM-DD`` date.
        Responds with code 444 when a required parameter is missing and
        code 173 when the bundle id is unknown.
        """
        appBundleId = request_dict.get('app_id__appBundleId', None)
        newApp_version = request_dict.get('version', None)
        version_time = request_dict.get('version_time', None)
        cn_content = request_dict.get('cnContent', None)
        en_content = request_dict.get('enContent', None)
        content = request_dict.get('content', None)
        app_colophon_id = request_dict.get('id', None)

        if not all([appBundleId, newApp_version, version_time]):
            return response.json(444)

        try:
            # Date string -> Unix timestamp.
            version_time = int(time.mktime(time.strptime(version_time, '%Y-%m-%d')))
            if app_colophon_id:  # edit
                lang = newApp_version[:2]
                newApp_version = newApp_version[2:]
                App_Colophon.objects.filter(id=app_colophon_id).update(lang=lang, newApp_version=newApp_version,
                                                                       content=content, version_time=version_time)
            else:  # add
                app_info_qs = App_Info.objects.filter(appBundleId=appBundleId).values('id')
                if not app_info_qs.exists():
                    return response.json(173)
                data_dict = {
                    'app_id_id': app_info_qs[0]['id'],
                    'newApp_version': newApp_version,
                    'version_time': version_time,
                    'lang': 'cn',
                    'content': cn_content,
                }
                with transaction.atomic():
                    # Chinese row first, then the English row.
                    App_Colophon.objects.create(**data_dict)
                    data_dict['lang'] = 'en'
                    data_dict['content'] = en_content
                    App_Colophon.objects.create(**data_dict)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'addOrEditAppRecord')

    def deleteAppRecord(self, request_dict, response):
        """Delete the App_Colophon row ``app_colophon_id``."""
        app_colophon_id = request_dict.get('app_colophon_id', None)
        try:
            if not app_colophon_id:
                return response.json(444)
            app_colophon_qs = App_Colophon.objects.filter(id=app_colophon_id)
            if not app_colophon_qs.exists():
                return response.json(173)
            app_colophon_qs.delete()
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'deleteAppRecord')

    def getPcInfoList(self, request_dict, response):
        """Return one page of Pc_Info rows, optionally filtered by ``pcName``.

        Responds with code 444 when paging parameters are missing or invalid,
        and code 173 when nothing matches.
        """
        pc_name = request_dict.get('pcName', None)
        pageNo = request_dict.get('pageNo', None)
        pageSize = request_dict.get('pageSize', None)

        if not all([pageNo, pageSize]):
            return response.json(444)
        window = _page_slice(pageNo, pageSize)
        if window is None:
            return response.json(444)

        try:
            pc_info_qs = Pc_Info.objects.filter()
            if pc_name:
                pc_info_qs = Pc_Info.objects.filter(pc_name__contains=pc_name)
            if not pc_info_qs.exists():
                return response.json(173)

            total = pc_info_qs.count()
            pc_info_list = CommonService.qs_to_list(pc_info_qs.values()[window])
            return response.json(0, {'list': pc_info_list, 'total': total})
        except Exception as e:
            return _server_error(response, e, 'getPcInfoList')

    def editPcVersion(self, request_dict, response):
        """Update every editable field of the Pc_Info row ``id``.

        Fields absent from the request are written as empty strings.
        Responds with code 444 without ``id`` and code 173 when the row does
        not exist.
        """
        pc_info_id = request_dict.get('id', None)
        pc_name = request_dict.get('pc_name', '')
        bundle_version = request_dict.get('bundle_version', '')
        pc_version = request_dict.get('pc_version', '')
        pc_test = request_dict.get('pc_test', '')
        lang = request_dict.get('lang', '')
        file_type = request_dict.get('file_type', '')
        package = request_dict.get('package', '')
        explain = request_dict.get('explain', '')
        is_update = request_dict.get('is_update', '')
        is_open = request_dict.get('is_open', '')
        content = request_dict.get('content', '')
        authority = request_dict.get('authority', '')
        download_link = request_dict.get('download_link', '')

        if not pc_info_id:
            return response.json(444)

        try:
            pc_info_qs = Pc_Info.objects.filter(id=pc_info_id)
            if not pc_info_qs.exists():
                return response.json(173)

            data_dict = {
                'pc_name': pc_name,
                'bundle_version': bundle_version,
                'pc_version': pc_version,
                'pc_test': pc_test,
                'lang': lang,
                'file_type': file_type,
                'package': package,
                'explain': explain,
                'is_update': is_update,
                'is_open': is_open,
                'content': content,
                'authority': authority,
                'download_link': download_link,
            }
            pc_info_qs.update(**data_dict)
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'editPcVersion')

    def deletePcInfo(self, request_dict, response):
        """Delete the Pc_Info row ``id`` and its object in the S3 bucket.

        The bucket deletion is best effort: a failure there is logged and the
        request still succeeds, because the database row is already gone.
        Responds with code 444 without ``id`` and code 173 when the row does
        not exist.
        """
        pc_info_id = request_dict.get('id', None)
        try:
            if not pc_info_id:
                return response.json(444)
            pc_info_qs = Pc_Info.objects.filter(id=pc_info_id)
            if not pc_info_qs.exists():
                return response.json(173)

            # The stored download link is used as the object key in the bucket.
            object_key = pc_info_qs.values('download_link')[0]['download_link']
            pc_info_qs.delete()
            aws_s3_client = boto3.client(
                's3',
                region_name='cn-northwest-1',
                aws_access_key_id=AWS_ACCESS_KEY_ID[0],
                aws_secret_access_key=AWS_SECRET_ACCESS_KEY[0],
                config=botocore.client.Config(signature_version='s3v4'),
            )
            try:
                aws_s3_client.delete_object(Bucket='pc-package', Key=object_key)
            except Exception as e:
                # Formerly hidden by "finally: return"; keep going but leave a trace.
                LOGGER.error('versionManagement/deletePcInfo S3 delete failed, key={}: {}'.format(
                    object_key, repr(e)))
            return response.json(0)
        except Exception as e:
            return _server_error(response, e, 'deletePcInfo', with_line=True)

    def getCountryList(self, request_dict, response):
        """Return the ``country_name`` of every CountryModel row."""
        try:
            country_qs = CountryModel.objects.all().values_list('country_name', flat=True)
            return response.json(0, list(country_qs))
        except Exception as e:
            return _server_error(response, e, 'getCountryList', with_line=True)

    @classmethod
    def device_auto_update(cls, user_id, request_dict, response):
        """Start a background thread that notifies devices about version ``verId``.

        Only a version with ``auto_update`` enabled and ``status`` 1 qualifies;
        otherwise code 173 is returned. The response is sent immediately and
        does not wait for the thread.
        """
        verId = request_dict.get('verId', None)
        if not verId:
            return response.json(444)
        LOGGER.info(f'版本ID:{verId},操作用户{user_id}')
        version_qs = Equipment_Version.objects.filter(eid=verId, auto_update=True, status=1)
        if not version_qs.exists():
            return response.json(173)
        agent_thread = threading.Thread(
            target=cls.device_async_update,
            args=(version_qs.first(),)  # a single model instance, not the queryset
        )
        agent_thread.start()
        return response.json(0)

    @staticmethod
    def device_async_update(version_qs):
        """Send the upgrade notice to every device that should get this version.

        Candidates are UidSetModel rows with the version's device code whose
        recorded version differs. Each one must pass
        ``check_version_auto_update`` and run a version older than the new one.
        A failure on one device is logged and does not stop the others.

        :param version_qs: the Equipment_Version instance to roll out
        """
        code = version_qs.code
        version = version_qs.softwareVersion
        ver_data = version_qs.data_json
        # Explicit ordering keeps the pages stable while paginating.
        uid_set_qs = UidSetModel.objects.filter(ucode=code).exclude(version=version) \
            .values('uid', 'ip', 'version').order_by('pk')
        total = uid_set_qs.count()
        LOGGER.info(f'通知设备自动升级查询符合数量{total}')
        if not total:
            return

        try:
            new_version = pacVer.Version(version)  # the same for every device
            paginator = Paginator(uid_set_qs, 100)  # 100 devices per page
            for page_num in paginator.page_range:
                for data in paginator.page(page_num):
                    uid = data['uid']
                    try:
                        result = VersionManagement.check_version_auto_update(uid, '', ver_data)
                        if not result:
                            LOGGER.info(f'{uid}判断是否符合自动升级:{result}')
                            continue
                        serial_number = CommonService.get_serial_number_by_uid(uid)
                        now_ver = data['version']
                        # Drop the "V" prefix before parsing the version number.
                        now_version = pacVer.Version(now_ver.replace("V", ""))
                        if new_version > now_version:
                            # Device is behind: send the upgrade notice over MQTT.
                            VersionManagement.send_auto_update_url(version_qs, uid, serial_number, code, now_ver)
                            LOGGER.info(f'uid={uid},ucode={code},当前版本{now_ver},最新版本{version}')
                    except Exception as e:
                        # E.g. a missing or unparsable recorded version.
                        LOGGER.error(f'异步自动升级异常:ucode:{code},uid:{uid},error:{repr(e)}')
        except Exception as e:
            LOGGER.error(f'异步自动升级异常:ucode:{code},error:{repr(e)}')

    @classmethod
    def check_version_auto_update(cls, uid, user_id, ver_data):
        """Decide whether a device/user falls inside the version's rollout rules.

        Rules are checked in this order and the first applicable one decides:
        no ``ver_data`` -> everyone; ``uid_list``; ``country_list`` (only
        outside the CN/test configurations, needs ``user_id``); ``addr`` city
        list (only in the CN/test configurations, resolved from the device IP).
        Any exception, including a key missing from ``ver_data``, yields False.

        :param uid: device UID
        :param user_id: user id
        :param ver_data: rollout restrictions stored with the version
        :return: True | False
        """
        try:
            if not ver_data:  # no restriction: every device on this version upgrades
                return True

            if uid and ver_data['uid_list']:  # restricted to listed UIDs
                return uid in ver_data['uid_list']

            if user_id and ver_data['country_list'] and CONFIG_INFO not in [CONFIG_CN, CONFIG_TEST]:
                # Restricted to the countries of the listed users.
                user = Device_User.objects.filter(userID=user_id).values('region_country').first()
                if user is None:  # unknown user: no upgrade
                    return False
                country = user['region_country']
                if country == 0:  # user has not chosen a country: no upgrade prompt
                    return False
                return country in ver_data['country_list']

            if CONFIG_INFO in [CONFIG_CN, CONFIG_TEST] and ver_data['addr']:
                # The China region can restrict by city.
                city_list = ver_data['addr']['city_list']
                if not city_list:
                    return True
                uid_set = UidSetModel.objects.filter(uid=uid).values('ip').first()
                if uid_set is None:
                    return False
                ip_row = IPAddr.objects.filter(ip=uid_set['ip']).values('city').order_by('-id').first()
                if ip_row is None:
                    return False
                return ip_row['city'] in city_list
            return False
        except Exception as e:
            LOGGER.error(f'检测是否符合自动升级{repr(e)}')
            return False

    @staticmethod
    def send_auto_update_url(version_qs, uid, serial_number, code, now_ver):
        """Cache the download details and publish the upgrade URL over MQTT.

        Nothing is sent when the version's software version is newer than its
        ``max_ver``, or when no non-shared Device_Info row exists for the UID.
        The details are cached in Redis for 600 seconds.

        @param version_qs: the Equipment_Version instance of the new version
        @param uid: device UID
        @param serial_number: device serial number
        @param code: device specification code
        @param now_ver: version currently running on the device
        @return: True once the publish was attempted, otherwise False
        """
        try:
            file_path = version_qs.filePath
            version = version_qs.version
            mci = version_qs.mci
            ver = version_qs.softwareVersion
            max_ver = version_qs.max_ver
            # Numeric version comparison; plain string comparison would rank
            # "2.10.0" below "2.9.0".
            if not _version_lte(ver, max_ver):
                return False

            user_qs = Device_Info.objects.filter(UID=uid, isShare=False).values('userID_id')
            if not user_qs.exists():
                LOGGER.info(f'{uid}未添加该设备返回不发MQTT')
                return False
            user_id = user_qs[0]['userID_id']

            # Details the download endpoint needs, cached under a per-device key.
            param_url = "ansjer/" + CommonService.RandomStr(6) + "/" + file_path
            data = {'Url': param_url, 'user_id': user_id,
                    'uid': uid, 'serial_number': serial_number,
                    'old_version': "V" + now_ver + "." + code,
                    'new_version': version,
                    'mci': mci}
            device_info_value = json.dumps(data)
            expire = 600

            # Key suffix: user id + serial number (or UID when there is none) + current version.
            str_uuid = str(user_id)
            if serial_number and serial_number != 'null':
                str_uuid += serial_number
            elif uid and uid != 'null':
                str_uuid += uid
            str_uuid += now_ver

            device_info_key = 'ASJ:SERVER:VERSION:{}'.format(str_uuid)
            LOGGER.info('缓存key={}'.format(device_info_key))
            redisObject = RedisObject()
            redisObject.set_data(device_info_key, device_info_value, expire)

            url_tko = UrlTokenObject()
            url_token = url_tko.generate(data={'uid': str_uuid})
            url = SERVER_DOMAIN + 'dlotapack/' + url_token

            topic_name = f'ansjer/generic/{serial_number}'
            msg = f'{{"commandCode":1,"data":{{"url":"{url}"}}}}'
            result = VersionManagement.publish_to_aws_iot_mqtt(serial_number, topic_name, msg)
            LOGGER.info(f'uid={uid}发送数据msg={msg},发送MQTT结果={result}')
            return True
        except Exception as e:
            LOGGER.error(f'自动升级异常{repr(e)}')
            return False

    @staticmethod
    def publish_to_aws_iot_mqtt(identification_code, topic_name, msg, qos=1):
        """Publish a message to AWS IoT over HTTPS (one attempt, no retry).

        The thing name is ``LC_<code>`` when the code ends with ``11L`` and
        ``Ansjer_Device_<code>`` otherwise; its endpoint and token are read
        from iotdeviceInfoModel.

        @param identification_code: identification code of the device
        @param topic_name: topic name
        @param msg: message body (JSON string)
        @param qos: QoS level (0 or 1); any other value falls back to 1
        @return: True on success, False on any failure
        """
        # Argument validation.
        if not isinstance(identification_code, str) or not identification_code.strip():
            LOGGER.error("标识码为空或无效")
            return False
        if not isinstance(topic_name, str) or not topic_name.strip():
            LOGGER.error("主题名为空或无效")
            return False
        if qos not in (0, 1):
            LOGGER.warning("QoS不合法,默认使用1")
            qos = 1

        if identification_code.endswith('11L'):
            thing_name = f'LC_{identification_code}'
        else:
            thing_name = f'Ansjer_Device_{identification_code}'

        try:
            iot_device = iotdeviceInfoModel.objects.filter(thing_name=thing_name).values(
                'endpoint', 'token_iot_number').first()
            if not iot_device:
                LOGGER.error(f"未查询到设备信息:{thing_name}")
                return False

            # "or ''" covers NULL columns, which would otherwise raise on .strip().
            endpoint = (iot_device.get('endpoint') or '').strip()
            token = (iot_device.get('token_iot_number') or '').strip()
            if not endpoint or not token:
                LOGGER.error("设备信息不完整(endpoint或token缺失)")
                return False

            # Build the request for the custom authorizer.
            encoded_topic = requests.utils.quote(topic_name, safe='')
            request_url = f"https://{endpoint}/topics/{encoded_topic}?qos={qos}"
            signature = CommonService.rsa_sign(token)
            if not signature:
                LOGGER.error("Token签名失败")
                return False

            headers = {
                'x-amz-customauthorizer-name': 'Ansjer_Iot_Auth',
                'Token': token,
                'x-amz-customauthorizer-signature': signature
            }

            # Single attempt with a 10 second timeout.
            publish_response = requests.post(
                url=request_url,
                headers=headers,
                data=msg,
                timeout=10
            )

            if publish_response.status_code == 200:
                res_json = publish_response.json()
                if res_json.get('message') == 'OK':
                    LOGGER.info(f"发布成功:{topic_name}")
                    return True
                LOGGER.error(f"响应异常:{res_json}")
            else:
                LOGGER.error(f"请求失败,状态码:{publish_response.status_code}")
            return False
        except Exception as e:
            LOGGER.error(f"发布失败:{str(e)}")
            return False