import hashlib
import logging
import os
import shutil
import time
import traceback
from typing import Union
from urllib import request, parse
from wsgiref.util import FileWrapper
from zlib import crc32

import boto3
import botocore
import requests
from boto3 import Session
from botocore import client
from django.http import HttpResponse
from django.views.generic.base import View

from Controller.PctestController import TokenObject1
from Model.models import Pc_Info, PctestlogModel, AVSSVersion
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from Ansjer.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ARN, BASE_DIR

# Logger name used by the upload handlers of this module.
_LOGGER = logging.getLogger('info')

# S3 settings shared by every handler in this module.
_S3_BUCKET = 'pc-package'
_S3_REGION = 'cn-northwest-1'
_PRESIGN_EXPIRES = 3600  # seconds


def _s3_client():
    """Build the boto3 S3 client (first configured key pair, SigV4 signing)."""
    return boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID[0],
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY[0],
        config=botocore.client.Config(signature_version='s3v4'),
        region_name=_S3_REGION,
    )


def _presigned_url(s3, client_method, key):
    """Return a presigned URL for ``client_method`` on ``key`` in the package bucket."""
    return s3.generate_presigned_url(
        ClientMethod=client_method,
        Params={'Bucket': _S3_BUCKET, 'Key': key},
        ExpiresIn=_PRESIGN_EXPIRES,
    )


def _version_key(version):
    """Return a sort key that orders dotted version strings numerically.

    Plain string comparison ranks "1.10.0" below "1.9.0"; this key compares
    each dot-separated segment as an integer instead. Segments that are not
    decimal numbers are compared as text and sort after numeric segments, so
    the key never raises on unexpected input. Empty segments count as 0 and
    trailing zero segments are ignored ("1.0" == "1.0.0").
    """
    key = []
    for segment in str(version).split('.'):
        segment = segment.strip()
        if not segment:
            key.append((0, 0, ''))
        elif segment.isdecimal():
            key.append((0, int(segment), ''))
        else:
            key.append((1, 0, segment))
    while key and key[-1] == (0, 0, ''):
        key.pop()
    return tuple(key)


class PcInfo(View):
    """Django view that manages PC software packages and test logs.

    The URL supplies an ``operation`` keyword; ``validation`` routes it to the
    handler of the same purpose. Packages are stored either in S3 (``s3*``
    operations) or on the server's local disk.
    """

    # operation name -> handler taking (request_dict, response)
    _OPERATIONS = {
        'query': 'query',                        # query used by the PC client
        's3download': 's3download',
        's3downloadlog': 's3downloadlog',        # download a log file
        's3delete': 's3delete',
        'edit': 'edit',
        'getnewversion': 'getnewversion',        # newest version of the software
        'getnewversionV2': 'getnewversionV2',    # newest version of the software
        'queryall': 'queryall',                  # back-office query
        'download': 'download',                  # download from the server
        'delete': 'delete',
        'AVSS/check-version': 'check_version',
    }

    # operation name -> handler taking (request_dict, response, request)
    _OPERATIONS_WITH_REQUEST = {
        's3addandupload': 's3addandupload',
        's3addanduploadlog': 's3addanduploadlog',  # upload a log file
        'addandupload': 'addandupload',            # upload to the server
    }

    def dispatch(self, request, *args, **kwargs):
        return super(PcInfo, self).dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET: route using the query-string parameters."""
        operation = kwargs.get('operation')
        request.encoding = 'utf-8'
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: route using the form parameters."""
        operation = kwargs.get('operation')
        request.encoding = 'utf-8'
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler.

        Returns code 444 when no operation is given and 414 when the
        operation is not recognised.
        """
        response = ResponseObject()
        if not operation:
            return response.json(444, 'operation')

        handler_name = self._OPERATIONS_WITH_REQUEST.get(operation)
        if handler_name is not None:
            return getattr(self, handler_name)(request_dict, response, request)

        handler_name = self._OPERATIONS.get(operation)
        if handler_name is not None:
            return getattr(self, handler_name)(request_dict, response)

        return response.json(414)

    def getnewversion(self, request_dict, response):
        """Return the newest stored version of a package and a download URL.

        Requires ``pc_name``, ``bundle_version``, ``pc_version`` and
        ``pc_test`` (444 otherwise). Returns 173 when no matching package
        exists and 10045 when ``pc_version`` equals the newest version.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        app_list = Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version, pc_test=pc_test)
        versions = [row['pc_version'] for row in app_list.values('pc_version')]
        if not versions:
            return response.json(173)

        # Compare numerically: max() on raw strings ranks "1.10" below "1.9".
        new_version = max(versions, key=_version_key)
        if pc_version == new_version:
            return response.json(10045)

        path = app_list.filter(pc_version=new_version).values('download_link')[0]['download_link']
        _LOGGER.info('path %s', path)
        response_url = _presigned_url(_s3_client(), 'get_object', path)
        res = {'pc_name': pc_name, 'new_version': new_version, 'path': path, 'response_url': response_url}
        return response.json(0, res)

    def getnewversionV2(self, request_dict, response):
        """Return newest-version info for ``is_update`` 0 and 1.

        The result is a two-element list, one entry per ``is_update`` value.
        An entry has empty fields when no package carries that flag or when
        the caller's ``pc_version`` is already at least the newest one.
        When ``pc_test`` is ``'1'`` (test build) the newest version is picked
        across all ``pc_test`` values.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        packages = Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version)
        if not packages.filter(pc_test=pc_test).exists():
            return response.json(173)

        s3 = _s3_client()
        current_key = _version_key(pc_version)
        version_list = []
        for update_flag in range(2):
            if pc_test == '1':
                # Test builds get the newest version among all builds.
                app_list = packages.filter(is_update=update_flag)
            else:
                app_list = packages.filter(pc_test=pc_test, is_update=update_flag)

            versions = [row['pc_version'] for row in app_list.values('pc_version')]
            if not versions:
                # No package with this is_update flag: return an empty entry.
                version_list.append({'pc_name': '', 'new_version': '', 'is_update': update_flag,
                                     'explain': '', 'path': '', 'response_url': ''})
                continue

            new_version = max(versions, key=_version_key)
            latest = app_list.filter(pc_version=new_version).values(
                'download_link', 'is_update', 'explain')[0]

            if current_key >= _version_key(new_version):
                # Caller is already up to date: return an empty entry.
                version_list.append({'pc_name': '', 'new_version': '', 'is_update': latest['is_update'],
                                     'explain': '', 'path': '', 'response_url': ''})
                continue

            path = latest['download_link']
            _LOGGER.info('path %s', path)
            version_list.append({
                'pc_name': pc_name,
                'new_version': new_version,
                'is_update': latest['is_update'],
                'explain': latest['explain'],
                'path': path,
                'response_url': _presigned_url(s3, 'get_object', path),
            })
        return response.json(0, version_list)

    def query(self, request_dict, response):
        """Return packages matching the supplied parameters (all when none).

        Only one parameter combination is applied; combinations are tried
        from the most specific to the least specific.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        package = request_dict.get('package', None)
        file_type = request_dict.get('file_type', None)

        if pc_name and bundle_version and pc_version and pc_test:
            queryset = Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version,
                                              pc_version=pc_version, pc_test=pc_test)
        elif file_type and package:
            queryset = Pc_Info.objects.filter(file_type=file_type, package=package)
        elif pc_name and file_type:
            queryset = Pc_Info.objects.filter(pc_name=pc_name, file_type=file_type)
        elif file_type:
            queryset = Pc_Info.objects.filter(file_type=file_type)
        elif package:
            queryset = Pc_Info.objects.filter(package=package)
        else:
            queryset = Pc_Info.objects.all()

        send_json = CommonService.qs_to_dict(queryset)
        send_json['count'] = queryset.count()
        return response.json(0, send_json)

    def s3addandupload(self, request_dict, response, request):
        """Register a package and return a presigned S3 upload URL for it.

        Returns 444 on missing parameters, 174 when the same package version
        is already registered, and 500 when saving fails.
        """
        _LOGGER.info('s3方式上传参数:')
        _LOGGER.info(request_dict)
        token = request_dict.get('token', None)
        # NOTE(review): unlike the other token-protected handlers, tko.code is
        # not checked here, so an invalid token is accepted - confirm intent.
        tko = TokenObject(token)
        response.lang = tko.lang
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        lang = request_dict.get('lang', None)
        file_name = request_dict.get('file_name', None)
        file_type = request_dict.get('file_type', None)
        package = request_dict.get('package', None)
        explain = request_dict.get('explain', '')
        content = request_dict.get('content', '')
        authority = request_dict.get('authority', '')
        is_update = request_dict.get('is_update', None)
        is_open = request_dict.get('is_open', None)

        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test, lang, file_name,
                  file_type, package, is_update, is_open])
        if param_flag is not True:
            return response.json(444)

        if Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version,
                                  pc_version=pc_version, pc_test=pc_test).exists():
            return response.json(174)

        try:
            _LOGGER.info('开始上传')
            # The client uploads the package itself through the presigned URL.
            download_link = '{pc_name}/{pc_version}_{bundle_version}_{pc_test}_{file_name}'.format(
                pc_name=pc_name, pc_version=pc_version, bundle_version=bundle_version,
                pc_test=pc_test, file_name=file_name)
            response_url = _presigned_url(_s3_client(), 'put_object', download_link)
            add_time = time.time()
            pc_info = Pc_Info(
                pc_name=pc_name,
                bundle_version=bundle_version,
                pc_version=pc_version,
                pc_test=pc_test,
                lang=lang,
                download_link=download_link,
                add_time=add_time,
                update_time=add_time,
                file_type=file_type,
                package=package,
                explain=explain,
                content=content,
                authority=authority,
                is_update=is_update,
                is_open=is_open,
            )
            pc_info.save()
        except Exception:
            # NOTE(review): the traceback is returned to the caller.
            error_info = traceback.format_exc()
            _LOGGER.error(error_info)
            return response.json(500, {'details': error_info})

        if not pc_info.id:
            return response.json(500)
        res = {
            'id': pc_info.id,
            'pc_name': pc_info.pc_name,
            'bundle_version': pc_info.bundle_version,
            'pc_version': pc_info.pc_version,
            'pc_test': pc_info.pc_test,
            'download_link': pc_info.download_link,
            'lang': pc_info.lang,
            'add_time': pc_info.add_time,
            'update_time': pc_info.update_time,
            'file_type': pc_info.file_type,
            'package': pc_info.package,
            'explain': pc_info.explain,
            'content': pc_info.content,
            'authority': pc_info.authority,
            'is_update': pc_info.is_update,
            'is_open': pc_info.is_open,
            'upload_url': response_url,
        }
        return response.json(0, res)

    def s3addanduploadlog(self, request_dict, response, request):
        """Record a log upload and return a presigned S3 upload URL for it."""
        _LOGGER.info('s3方式上传参数:')
        _LOGGER.info(request_dict)
        token = request_dict.get('token', None)
        tko = TokenObject1(token)
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        user_id = tko.id

        log_datetime = request_dict.get('datetime', None)
        file_name = request_dict.get('file_name', None)
        pathtype = request_dict.get('pathtype', 'log')
        param_flag = CommonService.get_param_flag(
            data=[log_datetime, file_name, pathtype])
        if param_flag is not True:
            return response.json(444)

        _LOGGER.info('开始上传')
        download_link = '{pathtype}/{datetime}/{file_name}'.format(
            pathtype=pathtype, datetime=log_datetime, file_name=file_name)
        response_url = _presigned_url(_s3_client(), 'put_object', download_link)
        PctestlogModel.objects.create(user_id=user_id, content=download_link, addtime=time.time())
        return response.json(0, {'upload_url': response_url, 'datetime': log_datetime})

    def s3download(self, request_dict, response):
        """Return a presigned S3 download URL for one exact package version."""
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        links = Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version,
                                       pc_version=pc_version, pc_test=pc_test).values('download_link')
        if not links:
            return response.json(173)

        path = links[0]['download_link']
        res = {'path': path, 'response_url': _presigned_url(_s3_client(), 'get_object', path)}
        return response.json(0, res)

    def s3downloadlog(self, request_dict, response):
        """Return a presigned S3 download URL for the log record ``id``."""
        # NOTE(review): no token check here - any caller can fetch a log URL.
        log_id = request_dict.get('id', None)
        pclog_qs = PctestlogModel.objects.filter(id=log_id)
        if not pclog_qs:
            return response.json(173)

        path = pclog_qs[0].content
        res = {'path': path, 'response_url': _presigned_url(_s3_client(), 'get_object', path)}
        return response.json(0, res)

    def edit(self, request_dict, response):
        """Update explain/content/authority/is_open of the package ``id``."""
        record_id = request_dict.get('id', None)
        explain = request_dict.get('explain', None)
        content = request_dict.get('content', None)
        authority = request_dict.get('authority', None)
        is_open = request_dict.get('is_open', None)
        param_flag = CommonService.get_param_flag(
            data=[record_id, explain, is_open])
        if param_flag is not True:
            return response.json(444)

        records = Pc_Info.objects.filter(id=record_id)
        if not records.exists():
            return response.json(173)
        records.update(explain=explain, content=content, authority=authority, is_open=is_open)
        return response.json(0)

    def s3delete(self, request_dict, response):
        """Delete package records (and the stored S3 object) by id or package.

        Exactly one of ``id`` / ``package`` must be supplied; 444 is returned
        when both or neither are given, 176 when ``package`` is not an
        integer or deletion fails, and 173 when nothing matches.
        """
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        if not tko.userID:
            return response.json(104)

        record_id = request_dict.get('id', None)
        package = request_dict.get('package', None)
        # The selection is kept in a local variable: a module-level global
        # would leak the previous request's queryset into this one.
        if record_id and not package:
            records = Pc_Info.objects.filter(id=record_id)
        elif package and not record_id:
            try:
                records = Pc_Info.objects.filter(package=int(package))
            except (TypeError, ValueError):
                return response.json(176)
        else:
            return response.json(444)

        if not records.exists():
            return response.json(173)

        try:
            # Remove the S3 object and the matching database rows.
            # NOTE(review): only the first row's object is removed from S3
            # even when several rows match ``package`` - confirm intent.
            file_path = records[0].download_link
            s3 = _s3_client()
            try:
                # Fetch the object to find out whether it was ever uploaded.
                obj = s3.get_object(Bucket=_S3_BUCKET, Key=file_path)
            except Exception as e:
                # Object not retrievable: still drop the database rows.
                _LOGGER.info(repr(e))
                records.delete()
            else:
                if obj:
                    s3.delete_object(Bucket=_S3_BUCKET, Key=file_path)
                    records.delete()
        except Exception as e:
            return response.json(176, repr(e))
        return response.json(0)

    def queryall(self, request_dict, response):
        """Return one page of all packages plus the total count.

        ``page`` and ``line`` must be positive integers; 444 is returned
        otherwise. Returns 173 when there are no packages at all.
        """
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        if not tko.userID:
            return response.json(104)

        # Validate before converting: int(None) would raise instead of
        # producing the intended 444 response.
        try:
            page = int(request_dict.get('page', None))
            line = int(request_dict.get('line', None))
        except (TypeError, ValueError):
            return response.json(444, 'page,line')
        if page < 1 or line < 1:
            # Would otherwise produce a negative slice, which querysets reject.
            return response.json(444, 'page,line')

        queryset = Pc_Info.objects.all()
        count = queryset.count()
        if not count:
            return response.json(173)
        send_json = CommonService.qs_to_dict(queryset[(page - 1) * line:page * line])
        send_json['count'] = count
        return response.json(0, send_json)

    def addandupload(self, request_dict, response, request):
        """Store an uploaded package on the server's disk and register it.

        The file is written to ``BASE_DIR/static/pc/<pc_name>/`` as
        ``<pc_version>_<bundle_version>_<pc_test><ext>``. Returns 444 on
        missing parameters or a file without extension, 174 when the version
        already exists, and 700 when writing or saving fails.
        """
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        if not tko.userID:
            return response.json(104)

        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        lang = request_dict.get('lang', None)
        file_name = request.FILES.get('file_name', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test, lang, file_name])
        if param_flag is not True:
            return response.json(444)

        if Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version,
                                  pc_version=pc_version, pc_test=pc_test).exists():
            return response.json(174)

        try:
            suffix = os.path.splitext(str(file_name))[-1]
            if not suffix:
                return response.json(444, "文件无后缀")

            # NOTE(review): pc_name comes from the request and is used in a
            # filesystem path without sanitising - confirm it is trusted.
            relative_dir = 'static/pc/' + pc_name
            # Create the absolute directory; exist_ok avoids failing when it
            # exists but the process is not running from BASE_DIR.
            os.makedirs(os.path.join(BASE_DIR, relative_dir), exist_ok=True)

            stored_name = pc_version + '_' + bundle_version + '_' + pc_test + str(suffix)
            file_path = relative_dir + '/' + str(stored_name)
            upload_path = os.path.join(BASE_DIR, file_path)
            _LOGGER.info('upload_path: %s', upload_path)
            with open(upload_path, 'wb+') as destination:
                for chunk in file_name.chunks():
                    destination.write(chunk)

            add_time = time.time()
            pc_info = Pc_Info(
                pc_name=pc_name,
                bundle_version=bundle_version,
                pc_version=pc_version,
                pc_test=pc_test,
                lang=lang,
                download_link=file_path,
                add_time=add_time,
                update_time=add_time,
            )
            pc_info.save()
        except Exception:
            # NOTE(review): the traceback is returned to the caller.
            error_info = traceback.format_exc()
            _LOGGER.error(error_info)
            return response.json(700, {'details': error_info})

        if not pc_info.id:
            return response.json(500)
        res = {
            'pc_name': pc_info.pc_name,
            'bundle_version': pc_info.bundle_version,
            'pc_version': pc_info.pc_version,
            'pc_test': pc_info.pc_test,
            'download_link': pc_info.download_link,
            'lang': pc_info.lang,
            'add_time': pc_info.add_time,
            'update_time': pc_info.update_time,
        }
        return response.json(0, res)

    def download(self, request_dict, response):
        """Send a locally stored package as an attachment with checksums.

        The response carries MD5, SHA-256 and CRC32 checksum headers.
        Returns 444 on missing parameters, 173 when the package is unknown,
        907 when the file is not on disk and 906 when reading it fails.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        links = Pc_Info.objects.filter(pc_name=pc_name, bundle_version=bundle_version,
                                       pc_version=pc_version, pc_test=pc_test).values('download_link')
        if not links:
            return response.json(173)

        # str.replace returns a new string, so the result must be kept.
        full_path = os.path.join(BASE_DIR, links[0]['download_link']).replace('\\', '/')
        res = ResponseObject()
        _LOGGER.info('fullPath: %s (%s)', full_path, os.path.basename(full_path))
        if not full_path:
            return res.json(444, 'fullPath')
        if not os.path.isfile(full_path):
            return res.json(907)

        try:
            wrapper = FileWrapper(open(full_path, 'rb'))
            file_response = HttpResponse(wrapper, content_type="application/octet-stream")
            file_response['Content-Length'] = os.path.getsize(full_path)
            file_response['Content-Disposition'] = 'attachment; filename={}'.format(
                parse.quote_plus(os.path.basename(full_path), encoding="utf-8"))
            # Checksums let the client verify the downloaded file.
            file_response['Content-MD5'] = getMD5orSHA265(full_path)
            file_response['Content-SHA265'] = getMD5orSHA265(full_path, 'SHA265')
            file_response['Content-CRC32'] = getMD5orSHA265(full_path, 'CRC32')
            file_response['Content-Error'] = res.formal(0)
            return file_response
        except Exception as e:
            return res.json(906, repr(e))

    def delete(self, request_dict, response):
        """Delete a locally stored package file and its database record."""
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        if not tko.userID:
            return response.json(104)

        record_id = request_dict.get('id', None)
        param_flag = CommonService.get_param_flag(data=[record_id])
        if param_flag is not True:
            return response.json(444)

        records = Pc_Info.objects.filter(id=record_id)
        if not records.exists():
            return response.json(173)

        try:
            # Remove the file first, then the database record.
            file_path = os.path.join(BASE_DIR, records[0].download_link).replace('\\', '/')
            os.remove(file_path)
            records.delete()
        except Exception as e:
            return response.json(176, repr(e))
        return response.json(0)

    @staticmethod
    def check_version(request_dict, response):
        """
        Check the AVSS version.
        @param request_dict: request data
        @request_dict version: current version
        @param response: response object
        @return: response; includes ``online_version`` when the current
                 version is older than the online one
        """
        version = request_dict.get('version', None)
        if version is None:
            return response.json(444)

        avss_version_qs = AVSSVersion.objects.filter().values('online_version')
        if not avss_version_qs.exists():
            return response.json(173)

        # Compare the current version with the online version numerically.
        online_version = avss_version_qs[0]['online_version']
        if _version_key(version) < _version_key(online_version):
            return response.json(0, {'online_version': online_version})
        return response.json(0)


def compareVersion(s1: str, s2: str) -> Union[str, int]:
    """Return the larger of two dotted numeric versions, or 0 when equal.

    Segments are compared as integers from left to right; a missing or empty
    segment counts as 0. Raises ValueError for a non-numeric segment.
    """
    parts1 = s1.split('.')
    parts2 = s2.split('.')
    for idx in range(max(len(parts1), len(parts2))):
        a = int(parts1[idx]) if idx < len(parts1) and parts1[idx] else 0
        b = int(parts2[idx]) if idx < len(parts2) and parts2[idx] else 0
        if a > b:
            return s1
        if a < b:
            return s2
    return 0


def getMD5orSHA265(fileName, encryptionType='MD5'):
    """Return a checksum of the file ``fileName``.

    :param fileName: path of the file to hash
    :param encryptionType: 'MD5' or 'SHA265' (SHA-256) for a hex digest,
        'CRC32' for the integer CRC-32
    :return: the checksum, or '' when ``fileName`` is not a file
    :raises ValueError: for an unsupported ``encryptionType``
    """
    if not os.path.isfile(fileName):
        return ''

    block_size = 8192  # read in chunks so large packages are not held in memory

    if encryptionType == 'CRC32':
        checksum = 0
        with open(fileName, 'rb') as f:
            for chunk in iter(lambda: f.read(block_size), b''):
                checksum = crc32(chunk, checksum)  # running CRC over the chunks
        return checksum

    if encryptionType == 'MD5':
        encryption = hashlib.md5()
    elif encryptionType == 'SHA265':
        encryption = hashlib.sha256()
    else:
        raise ValueError('unsupported encryptionType: {!r}'.format(encryptionType))

    with open(fileName, 'rb') as f:
        for chunk in iter(lambda: f.read(block_size), b''):
            encryption.update(chunk)
    return encryption.hexdigest()