12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727 |
- import hashlib
- import logging
- import shutil
- import time
- import traceback
- import os
- from urllib import request, parse
- import requests
- from boto3 import Session
- from django.http import HttpResponse
- from django.views.generic.base import View
- from Controller.PctestController import TokenObject1
- from Model.models import Pc_Info, PctestlogModel, AVSSVersion
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Service.CommonService import CommonService
- from Ansjer.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ARN, BASE_DIR
- import boto3
- import botocore
- from botocore import client
- from wsgiref.util import FileWrapper
- from zlib import crc32
- from typing import Union
class PcInfo(View):
    """Endpoint for publishing, querying and downloading PC client packages.

    The URL route supplies an ``operation`` keyword argument and
    :meth:`validation` maps it to one of the handler methods below.  Packages
    live either in the S3 bucket ``pc-package`` (the ``s3*`` operations, which
    hand out presigned URLs) or on local disk under ``static/pc/``.
    """

    # Settings shared by every S3-backed operation.
    S3_BUCKET = 'pc-package'
    S3_REGION = 'cn-northwest-1'
    PRESIGN_EXPIRES = 3600  # seconds

    # operation name -> (handler method name, handler also takes the request)
    _OPERATIONS = {
        'query': ('query', False),  # queried by the PC client
        's3addandupload': ('s3addandupload', True),
        's3addanduploadlog': ('s3addanduploadlog', True),  # upload a log file
        's3download': ('s3download', False),
        's3downloadlog': ('s3downloadlog', False),  # download a log file
        's3delete': ('s3delete', False),
        'edit': ('edit', False),
        'getnewversion': ('getnewversion', False),  # latest version of the software
        'getnewversionV2': ('getnewversionV2', False),  # latest version of the software
        'queryall': ('queryall', False),  # back-office query
        'addandupload': ('addandupload', True),  # upload to this server
        'download': ('download', False),  # download from this server
        'delete': ('delete', False),
        'AVSS/check-version': ('check_version', False),
    }

    _logger = logging.getLogger('info')

    def dispatch(self, requset, *args, **kwargs):
        """Delegate to Django's default HTTP-method dispatch."""
        return super(PcInfo, self).dispatch(requset, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET: run the requested operation with the query-string parameters."""
        operation = kwargs.get('operation')
        request.encoding = 'utf-8'
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: run the requested operation with the form parameters."""
        operation = kwargs.get('operation')
        request.encoding = 'utf-8'
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler.

        Returns code 444 when no operation was given and 414 when the
        operation is unknown; otherwise returns the handler's response.
        """
        response = ResponseObject()
        if not operation:
            return response.json(444, 'operation')
        route = self._OPERATIONS.get(operation)
        if route is None:
            return response.json(414)
        handler_name, needs_request = route
        handler = getattr(self, handler_name)
        if needs_request:
            return handler(request_dict, response, request)
        return handler(request_dict, response)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _s3_client(cls):
        """Build the boto3 S3 client used by all S3-backed operations."""
        return boto3.client(
            's3',
            aws_access_key_id=AWS_ACCESS_KEY_ID[0],
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY[0],
            config=botocore.client.Config(signature_version='s3v4'),
            region_name=cls.S3_REGION,
        )

    @classmethod
    def _presigned_url(cls, client_method, key, s3_client=None):
        """Return a presigned URL for ``client_method`` on ``key`` in the package bucket."""
        if s3_client is None:
            s3_client = cls._s3_client()
        return s3_client.generate_presigned_url(
            ClientMethod=client_method,
            Params={'Bucket': cls.S3_BUCKET, 'Key': key},
            ExpiresIn=cls.PRESIGN_EXPIRES,
        )

    @staticmethod
    def _version_key(version):
        """Return a sort key that orders dotted version strings numerically.

        Plain string comparison ranks "1.10.0" below "1.9.0"; this key compares
        each dot-separated component as an integer instead.  Non-numeric
        components sort after numeric ones and compare as text, empty
        components count as 0, and trailing zero components are ignored so
        that "1.0" and "1.0.0" are equal.
        """
        parts = []
        for piece in str(version if version is not None else '').split('.'):
            piece = piece.strip()
            if not piece:
                parts.append((0, 0, ''))
            elif piece.isdecimal():
                parts.append((0, int(piece), ''))
            else:
                parts.append((1, 0, piece))
        while parts and parts[-1] == (0, 0, ''):
            parts.pop()
        return tuple(parts)

    @staticmethod
    def _no_update_entry(is_update):
        """Return the placeholder entry meaning "nothing to update" for one update class."""
        return {'pc_name': '',
                'new_version': '',
                'is_update': is_update,
                'explain': '',
                'path': '',
                'response_url': ''}

    @staticmethod
    def _reject_unauthenticated(request_dict, response):
        """Validate the ``token`` parameter.

        Sets ``response.lang`` from the token.  Returns an error response
        (the token's own code, or 104 when it carries no user id) when the
        caller must be rejected, otherwise ``None``.
        """
        tko = TokenObject(request_dict.get('token', None))
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        if not tko.userID:
            return response.json(104)
        return None

    # ------------------------------------------------------------------
    # Version lookup
    # ------------------------------------------------------------------

    def getnewversion(self, request_dict, response):
        """Return the newest package for a product together with a download URL.

        Required parameters: ``pc_name``, ``bundle_version``, ``pc_version``,
        ``pc_test``.  Responds 444 on missing parameters, 173 when no package
        matches and 10045 when the caller already has the newest version.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        rows = list(Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version, pc_test=pc_test
        ).values('pc_version', 'download_link'))
        if not rows:
            return response.json(173)

        # Compare numerically; a plain max() on strings picks "1.9" over "1.10".
        newest = max(rows, key=lambda row: self._version_key(row['pc_version']))
        new_version = newest['pc_version']
        if pc_version == new_version:
            return response.json(10045)

        path = newest['download_link']
        self._logger.info('path %s', path)
        res = {'pc_name': pc_name,
               'new_version': new_version,
               'path': path,
               'response_url': self._presigned_url('get_object', path)}
        return response.json(0, res)

    def getnewversionV2(self, request_dict, response):
        """Return the newest optional and the newest forced update for a product.

        The result is a two-element list, index 0 for ``is_update=0`` and
        index 1 for ``is_update=1``.  An entry with empty strings means there
        is nothing newer than ``pc_version`` in that class.  Testers
        (``pc_test == '1'``) are offered the newest version regardless of the
        package's own ``pc_test`` value.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)
        known = Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version, pc_test=pc_test)
        if not known.exists():
            return response.json(173)

        s3_client = self._s3_client()
        current_key = self._version_key(pc_version)
        version_list = []
        for update_flag in range(2):
            filters = {'pc_name': pc_name,
                       'bundle_version': bundle_version,
                       'is_update': update_flag}
            if pc_test != '1':
                # Release builds only see packages of their own channel.
                filters['pc_test'] = pc_test
            rows = list(Pc_Info.objects.filter(**filters).values(
                'pc_version', 'is_update', 'explain', 'download_link'))

            # No package of this update class exists: report an empty entry.
            if not rows:
                version_list.append(self._no_update_entry(update_flag))
                continue

            newest = max(rows, key=lambda row: self._version_key(row['pc_version']))
            # The caller is already up to date: report an empty entry.
            if current_key >= self._version_key(newest['pc_version']):
                version_list.append(self._no_update_entry(newest['is_update']))
                continue

            path = newest['download_link']
            self._logger.info('path %s', path)
            version_list.append({
                'pc_name': pc_name,
                'new_version': newest['pc_version'],
                'is_update': newest['is_update'],
                'explain': newest['explain'],
                'path': path,
                'response_url': self._presigned_url('get_object', path, s3_client),
            })
        return response.json(0, version_list)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def query(self, request_dict, response):
        """List packages, optionally filtered by the supplied parameters.

        The filters do not accumulate: each later rule that applies replaces
        the earlier selection, so the last matching rule below wins.  With no
        parameters every package is returned.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        package = request_dict.get('package', None)
        file_type = request_dict.get('file_type', None)

        queryset = Pc_Info.objects.all()
        if package:
            queryset = Pc_Info.objects.filter(package=package)
        if file_type:
            queryset = Pc_Info.objects.filter(file_type=file_type)
        if pc_name and file_type:
            queryset = Pc_Info.objects.filter(pc_name=pc_name, file_type=file_type)
        if file_type and package:
            queryset = Pc_Info.objects.filter(file_type=file_type, package=package)
        if pc_name and bundle_version and pc_version and pc_test:
            queryset = Pc_Info.objects.filter(
                pc_name=pc_name, bundle_version=bundle_version,
                pc_version=pc_version, pc_test=pc_test)

        count = queryset.count()
        send_json = CommonService.qs_to_dict(queryset)
        send_json['count'] = count
        return response.json(0, send_json)

    def queryall(self, request_dict, response):
        """Return one page of all packages (back-office use, token required).

        ``page`` is 1-based and ``line`` is the page size.  Responds 444 when
        either is missing, non-numeric or out of range, and 173 when there are
        no packages at all.
        """
        denied = self._reject_unauthenticated(request_dict, response)
        if denied is not None:
            return denied

        page = request_dict.get('page', None)
        line = request_dict.get('line', None)
        if page is None or line is None:
            return response.json(444, 'page,line')
        try:
            page = int(page)
            line = int(line)
        except (TypeError, ValueError):
            return response.json(444, 'page,line')
        if page < 1 or line < 0:
            # Would otherwise produce a negative slice, which querysets reject.
            return response.json(444, 'page,line')

        queryset = Pc_Info.objects.all()
        if not queryset.exists():
            return response.json(173)
        count = queryset.count()
        send_json = CommonService.qs_to_dict(queryset[(page - 1) * line:page * line])
        send_json['count'] = count
        return response.json(0, send_json)

    # ------------------------------------------------------------------
    # S3-backed operations
    # ------------------------------------------------------------------

    def s3addandupload(self, request_dict, response, request):
        """Register a new package and return a presigned URL to upload it to S3.

        The file itself is not sent here; the caller PUTs it to the returned
        ``upload_url``.  Responds 444 on missing parameters, 174 when the same
        name/bundle/version/channel is already registered, and 500 (with the
        traceback in ``details``) when registration fails.
        """
        logger = self._logger
        logger.info('s3方式上传参数:')
        logger.info(request_dict)
        token = request_dict.get('token', None)
        # NOTE(review): unlike the other write operations, the token result
        # (tko.code / tko.userID) is not checked here - confirm whether
        # unauthenticated registration is intended.
        tko = TokenObject(token)
        response.lang = tko.lang

        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        lang = request_dict.get('lang', None)
        file_name = request_dict.get('file_name', None)
        file_type = request_dict.get('file_type', None)
        package = request_dict.get('package', None)
        explain = request_dict.get('explain', '')
        content = request_dict.get('content', '')
        authority = request_dict.get('authority', '')
        is_update = request_dict.get('is_update', None)
        is_open = request_dict.get('is_open', None)

        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test, lang,
                  file_name, file_type, package, is_update, is_open])
        if param_flag is not True:
            return response.json(444)
        duplicate = Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version,
            pc_version=pc_version, pc_test=pc_test)
        if duplicate.exists():
            return response.json(174)

        try:
            logger.info('开始上传')
            download_link = '{pc_name}/{pc_version}_{bundle_version}_{pc_test}_{file_name}'.format(
                pc_name=pc_name, pc_version=pc_version, bundle_version=bundle_version,
                pc_test=pc_test, file_name=file_name)
            response_url = self._presigned_url('put_object', download_link)
            add_time = time.time()
            pc_info = Pc_Info(
                pc_name=pc_name,
                bundle_version=bundle_version,
                pc_version=pc_version,
                pc_test=pc_test,
                lang=lang,
                download_link=download_link,
                add_time=add_time,
                update_time=add_time,
                file_type=file_type,
                package=package,
                explain=explain,
                content=content,
                authority=authority,
                is_update=is_update,
                is_open=is_open,
            )
            pc_info.save()
        except Exception:
            error_info = traceback.format_exc()
            logger.error(error_info)
            return response.json(500, {'details': error_info})

        if not pc_info.id:
            return response.json(500)
        res = {'id': pc_info.id,
               'pc_name': pc_info.pc_name,
               'bundle_version': pc_info.bundle_version,
               'pc_version': pc_info.pc_version,
               'pc_test': pc_info.pc_test,
               'download_link': pc_info.download_link,
               'lang': pc_info.lang,
               'add_time': pc_info.add_time,
               'update_time': pc_info.update_time,
               'file_type': pc_info.file_type,
               'package': pc_info.package,
               'explain': pc_info.explain,
               'content': pc_info.content,
               'authority': pc_info.authority,
               'is_update': pc_info.is_update,
               'is_open': pc_info.is_open,
               'upload_url': response_url}
        return response.json(0, res)

    def s3addanduploadlog(self, request_dict, response, request):
        """Register a log file and return a presigned URL to upload it to S3.

        Requires a valid token plus ``datetime`` and ``file_name``;
        ``pathtype`` (default ``'log'``) is the key prefix.  The object key
        ``<pathtype>/<datetime>/<file_name>`` is stored in ``PctestlogModel``.
        """
        logger = self._logger
        logger.info('s3方式上传参数:')
        logger.info(request_dict)
        tko = TokenObject1(request_dict.get('token', None))
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)
        user_id = tko.id

        date_label = request_dict.get('datetime', None)
        file_name = request_dict.get('file_name', None)
        pathtype = request_dict.get('pathtype', 'log')
        param_flag = CommonService.get_param_flag(
            data=[date_label, file_name, pathtype])
        if param_flag is not True:
            return response.json(444)

        logger.info('开始上传')
        download_link = '{pathtype}/{datetime}/{file_name}'.format(
            pathtype=pathtype, datetime=date_label, file_name=file_name)
        response_url = self._presigned_url('put_object', download_link)
        PctestlogModel.objects.create(
            user_id=user_id, content=download_link, addtime=time.time())
        return response.json(0, {'upload_url': response_url, 'datetime': date_label})

    def s3download(self, request_dict, response):
        """Return a presigned S3 download URL for one exact package version.

        Responds 444 on missing parameters and 173 when no package matches.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        links = list(Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version,
            pc_version=pc_version, pc_test=pc_test
        ).values_list('download_link', flat=True)[:1])
        if not links:
            return response.json(173)
        path = links[0]
        res = {'path': path,
               'response_url': self._presigned_url('get_object', path)}
        return response.json(0, res)

    def s3downloadlog(self, request_dict, response):
        """Return a presigned S3 download URL for the log record ``id``.

        Responds 173 when the record does not exist.
        """
        log_id = request_dict.get('id', None)
        keys = list(PctestlogModel.objects.filter(id=log_id)
                    .values_list('content', flat=True)[:1])
        if not keys:
            return response.json(173)
        path = keys[0]
        res = {'path': path,
               'response_url': self._presigned_url('get_object', path)}
        return response.json(0, res)

    def s3delete(self, request_dict, response):
        """Delete packages, selected by ``id`` or by ``package``, from S3 and the DB.

        Exactly one of ``id`` / ``package`` must be given (444 otherwise).
        Responds 176 when ``package`` is not an integer or deletion fails, and
        173 when nothing matches.  S3 objects that cannot be found (for
        example because the upload never happened) are skipped; the database
        rows are removed regardless.
        """
        denied = self._reject_unauthenticated(request_dict, response)
        if denied is not None:
            return denied

        record_id = request_dict.get('id', None)
        package = request_dict.get('package', None)
        if bool(record_id) == bool(package):
            # Both selectors or neither: the request is ambiguous or empty.
            return response.json(444)
        if record_id:
            records = Pc_Info.objects.filter(id=record_id)
        else:
            try:
                package = int(package)
            except (TypeError, ValueError):
                return response.json(176)
            records = Pc_Info.objects.filter(package=package)
        if not records.exists():
            return response.json(173)

        try:
            s3_client = self._s3_client()
            # Remove the object of every matching row, not just the first one,
            # so that no object is left behind without a database record.
            for key in list(records.values_list('download_link', flat=True)):
                try:
                    # Probe first: the object only exists if the upload succeeded.
                    s3_client.head_object(Bucket=self.S3_BUCKET, Key=key)
                except Exception as e:
                    self._logger.info('s3 object %s not removed: %r', key, e)
                    continue
                s3_client.delete_object(Bucket=self.S3_BUCKET, Key=key)
            records.delete()
        except Exception as e:
            return response.json(176, repr(e))
        return response.json(0)

    def edit(self, request_dict, response):
        """Update the descriptive fields of the package ``id``.

        ``id``, ``explain`` and ``is_open`` are required (444 otherwise);
        ``content`` and ``authority`` are written as received, including
        ``None`` when omitted.  Responds 173 when the package does not exist.
        """
        record_id = request_dict.get('id', None)
        explain = request_dict.get('explain', None)
        content = request_dict.get('content', None)
        authority = request_dict.get('authority', None)
        is_open = request_dict.get('is_open', None)
        param_flag = CommonService.get_param_flag(
            data=[record_id, explain, is_open])
        if param_flag is not True:
            return response.json(444)
        records = Pc_Info.objects.filter(id=record_id)
        if not records.exists():
            return response.json(173)
        records.update(explain=explain, content=content,
                       authority=authority, is_open=is_open)
        return response.json(0)

    # ------------------------------------------------------------------
    # Local-disk operations
    # ------------------------------------------------------------------

    def addandupload(self, request_dict, response, request):
        """Store an uploaded package on local disk and register it (token required).

        The file arrives as the multipart field ``file_name`` and is written
        to ``static/pc/<pc_name>/<pc_version>_<bundle_version>_<pc_test><ext>``
        below ``BASE_DIR``.  Responds 444 on missing parameters or a file
        without extension, 174 on duplicates and 700 (with the traceback in
        ``details``) when storing fails.
        """
        denied = self._reject_unauthenticated(request_dict, response)
        if denied is not None:
            return denied

        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        lang = request_dict.get('lang', None)
        upload = request.FILES.get('file_name', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test, lang, upload])
        if param_flag is not True:
            return response.json(444)
        duplicate = Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version,
            pc_version=pc_version, pc_test=pc_test)
        if duplicate.exists():
            return response.json(174)

        try:
            suffix = os.path.splitext(str(upload))[-1]
            if not suffix:
                return response.json(444, "文件无后缀")
            # NOTE(review): pc_name comes from the request and becomes part of
            # a filesystem path - confirm it is validated upstream.
            relative_dir = 'static/pc/' + pc_name
            # exist_ok avoids failing when the directory is already there.
            os.makedirs(os.path.join(BASE_DIR, relative_dir), exist_ok=True)
            stored_name = pc_version + '_' + bundle_version + '_' + pc_test + str(suffix)
            file_path = relative_dir + '/' + str(stored_name)
            upload_path = os.path.join(BASE_DIR, file_path)
            self._logger.info('upload_path: %s', upload_path)
            with open(upload_path, 'wb+') as destination:
                for chunk in upload.chunks():
                    destination.write(chunk)
            add_time = time.time()
            pc_info = Pc_Info(
                pc_name=pc_name,
                bundle_version=bundle_version,
                pc_version=pc_version,
                pc_test=pc_test,
                lang=lang,
                download_link=file_path,
                add_time=add_time,
                update_time=add_time,
            )
            pc_info.save()
        except Exception:
            error_info = traceback.format_exc()
            self._logger.error(error_info)
            return response.json(700, {'details': error_info})

        if not pc_info.id:
            return response.json(500)
        res = {'pc_name': pc_info.pc_name,
               'bundle_version': pc_info.bundle_version,
               'pc_version': pc_info.pc_version,
               'pc_test': pc_info.pc_test,
               'download_link': pc_info.download_link,
               'lang': pc_info.lang,
               'add_time': pc_info.add_time,
               'update_time': pc_info.update_time}
        return response.json(0, res)

    def download(self, request_dict, response):
        """Send a locally stored package as an attachment.

        The response carries ``Content-MD5``, ``Content-SHA265`` and
        ``Content-CRC32`` headers so the client can verify the file.  Responds
        444 on missing parameters, 173 when no package matches, 907 when the
        file is not on disk and 906 when reading it fails.
        """
        pc_name = request_dict.get('pc_name', None)
        bundle_version = request_dict.get('bundle_version', None)
        pc_version = request_dict.get('pc_version', None)
        pc_test = request_dict.get('pc_test', None)
        param_flag = CommonService.get_param_flag(
            data=[pc_name, bundle_version, pc_version, pc_test])
        if param_flag is not True:
            return response.json(444)

        links = list(Pc_Info.objects.filter(
            pc_name=pc_name, bundle_version=bundle_version,
            pc_version=pc_version, pc_test=pc_test
        ).values_list('download_link', flat=True)[:1])
        if not links:
            return response.json(173)

        full_path = os.path.join(BASE_DIR, links[0]).replace('\\', '/')
        self._logger.info('fullPath: %s', full_path)
        res = ResponseObject()
        if not os.path.isfile(full_path):
            return res.json(907)
        try:
            file_name = os.path.basename(full_path)
            wrapper = FileWrapper(open(full_path, 'rb'))
            file_response = HttpResponse(wrapper, content_type="application/octet-stream")
            file_response['Content-Length'] = os.path.getsize(full_path)
            file_response['Content-Disposition'] = 'attachment; filename={}'.format(
                parse.quote_plus(file_name, encoding="utf-8"))
            # Checksums for client-side verification of the download.
            file_response['Content-MD5'] = getMD5orSHA265(full_path)
            file_response['Content-SHA265'] = getMD5orSHA265(full_path, 'SHA265')
            file_response['Content-CRC32'] = getMD5orSHA265(full_path, 'CRC32')
            file_response['Content-Error'] = res.formal(0)
            return file_response
        except Exception as e:
            return res.json(906, repr(e))

    def delete(self, request_dict, response):
        """Delete the locally stored package ``id`` from disk and the DB (token required).

        Responds 444 when ``id`` is missing, 173 when the package does not
        exist and 176 when removing the file or the row fails.
        """
        denied = self._reject_unauthenticated(request_dict, response)
        if denied is not None:
            return denied

        record_id = request_dict.get('id', None)
        param_flag = CommonService.get_param_flag(data=[record_id])
        if param_flag is not True:
            return response.json(444)
        records = Pc_Info.objects.filter(id=record_id)
        if not records.exists():
            return response.json(173)
        try:
            # Remove the file first; the row is kept if that fails.
            local_path = os.path.join(BASE_DIR, records[0].download_link).replace('\\', '/')
            os.remove(local_path)
            records.delete()
        except Exception as e:
            return response.json(176, repr(e))
        return response.json(0)

    # ------------------------------------------------------------------
    # AVSS
    # ------------------------------------------------------------------

    @staticmethod
    def check_version(request_dict, response):
        """Check whether an AVSS client needs to update.

        @param request_dict: request data; ``version`` is the client's current version
        @param response: response object used to build the reply
        @return: code 444 when ``version`` is missing, 173 when no version
            record exists, otherwise code 0.  When the client is older than
            the online version the payload holds ``online_version`` and
            ``force_update`` (1 if the client is also older than the
            force-update version, else 0).
        """
        version = request_dict.get('version', None)
        if version is None:
            return response.json(444)
        records = list(AVSSVersion.objects.filter()
                       .values('online_version', 'force_update_version')[:1])
        if not records:
            return response.json(173)

        # Compare numerically so that e.g. "1.10" counts as newer than "1.9".
        current = PcInfo._version_key(version)
        online_version = records[0]['online_version']
        if current >= PcInfo._version_key(online_version):
            return response.json(0)
        force_update_version = records[0]['force_update_version']
        forced = current < PcInfo._version_key(force_update_version)
        res = {'online_version': online_version,
               'force_update': 1 if forced else 0}
        return response.json(0, res)
def compareVersion(s1: str, s2: str) -> Union[str, int]:
    """Compare two dotted version strings component by component.

    Components are compared as integers from left to right; a missing or
    empty component counts as 0.

    :param s1: first version string, e.g. ``"1.2.3"``
    :param s2: second version string
    :return: the greater of the two strings, or ``0`` when they are equal
    :raises ValueError: if a component that has to be compared is not an integer
    """
    left_parts = s1.split('.')
    right_parts = s2.split('.')
    for idx in range(max(len(left_parts), len(right_parts))):
        left_text = left_parts[idx] if idx < len(left_parts) else ''
        right_text = right_parts[idx] if idx < len(right_parts) else ''
        # Parse lazily so a malformed later component is never touched once
        # an earlier component has already decided the result.
        left_num = int(left_text) if left_text else 0
        right_num = int(right_text) if right_text else 0
        if left_num > right_num:
            return s1
        if left_num < right_num:
            return s2
    return 0
def getMD5orSHA265(fileName, encryptionType='MD5'):
    """Compute a checksum of a file, reading it in fixed-size chunks.

    :param fileName: path of the file to hash
    :param encryptionType: ``'MD5'`` (default), ``'SHA265'`` (SHA-256; the
        misspelt name is kept for existing callers) or ``'CRC32'``
    :return: ``''`` if ``fileName`` is not an existing file; otherwise the hex
        digest string for MD5/SHA-256, or the CRC-32 value as an ``int``
    :raises ValueError: if ``encryptionType`` is not one of the supported names
    """
    if not os.path.isfile(fileName):
        return ''

    block_size = 8192

    if encryptionType == 'CRC32':
        checksum = 0
        with open(fileName, 'rb') as f:
            while True:
                chunk = f.read(block_size)
                if not chunk:
                    break
                # Feeding the running value back in gives the same result as
                # hashing the whole file at once, without loading it fully.
                checksum = crc32(chunk, checksum)
        return checksum

    if encryptionType == 'MD5':
        encryption = hashlib.md5()
    elif encryptionType == 'SHA265':
        encryption = hashlib.sha256()
    else:
        raise ValueError('unsupported encryptionType: {!r}'.format(encryptionType))

    with open(fileName, 'rb') as f:
        while True:
            chunk = f.read(block_size)
            if not chunk:
                break
            encryption.update(chunk)
    return encryption.hexdigest()
|