#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Django views for FAQ management, FAQ image upload/serving and help links."""
import json
import mimetypes
import os
import shutil
import time
import traceback
import uuid

from django.core import serializers
from django.db import models
from django.db import transaction
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View

import Ansjer
from Ansjer.config import BASE_DIR, SERVER_TYPE, SERVER_DOMAIN, SERVER_DOMAIN_SSL
from Model.models import FAQModel, HelpLink
from Object.Enums.RedisKeyConstant import RedisKeyConstant
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from var_dump import var_dump
from Ansjer.config import LOGGER
from Service.CommonService import CommonService
from Service.ModelService import ModelService, ZositechHelpModel


class FAQUploadView(View):
    """Receive an FAQ image upload and park it in the temporary image folder.

    Uploaded files are tracked per token in Redis (upload name -> stored
    path) so that ``FAQView`` can later move them to their final location.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        return super().dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        uploaded = request.FILES.get('fileName', None)
        return self.validate(uploaded, request.GET)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        uploaded = request.FILES.get('fileName', None)
        return self.validate(uploaded, request.POST)

    def validate(self, fileName, request_dict):
        """Authenticate the caller, store the upload and return its URL.

        :param fileName: the uploaded file object taken from ``request.FILES``
            (``None`` when the request carried no file).
        :param request_dict: request parameters; ``token`` is read from it.
        :return: JSON response; code 0 carries ``{'filePath': <url>}``,
            444 means no file was sent, 700 means storing the file failed.
        """
        token = TokenObject(request_dict.get('token', None))
        response = ResponseObject()
        if token.code != 0:
            return response.json(token.code)
        own_permission = ModelService.check_perm(userID=token.userID, permID=120)
        if own_permission is not True:
            return response.json(404)
        if fileName is None:
            # Without this guard the code below fails on ``None.chunks()``.
            return response.json(444)

        try:
            redisObject = RedisObject()
            path = '/'.join((BASE_DIR, 'static/FAQImages/tmp')).replace('\\', '/') + '/'
            os.makedirs(path, exist_ok=True)

            upload_name = str(fileName)

            # Look up the images already recorded in Redis for this token.
            cached = redisObject.get_data(key=token.token)
            if cached is not False:
                known_images = json.loads(cached)
                # The same image was already uploaded during this edit session.
                if upload_name in known_images:
                    file_name = known_images[upload_name]
                    filePath = self._static_relative(file_name)
                    if SERVER_TYPE == "Ansjer.formal_settings":
                        filePath = SERVER_DOMAIN + 'faq/image/' + filePath
                    else:
                        filePath = SERVER_DOMAIN_SSL + 'faq/image/' + filePath
                    return response.json(0, {'filePath': filePath})

            # Not recorded yet: store the upload under a fresh name.
            # splitext yields '' for names without a dot (find('.') == -1 used
            # to slice off the last character instead).
            suffix = os.path.splitext(upload_name)[1]
            # The uuid fragment keeps two uploads within the same second from
            # overwriting each other.
            stored_name = '{stamp}_{unique}{suffix}'.format(
                stamp=int(time.time()),
                unique=uuid.uuid4().hex[:8],
                suffix=suffix,
            )
            file_name = path + stored_name

            with open(file_name, 'wb') as destination:
                for chunk in fileName.chunks():
                    destination.write(chunk)

            # Record the stored image in Redis (re-read to pick up entries
            # written while the file was being saved).
            latest = redisObject.get_data(token.token)
            images = json.loads(latest) if latest is not False else {}
            images[upload_name] = file_name
            redisObject.set_data(key=token.token, val=json.dumps(images), expire=3600)
        except Exception as e:
            LOGGER.error('上传文件错误: %s' % traceback.format_exc())
            return response.json(700, {'details': repr(e)})

        filePath = self._static_relative(file_name)
        # NOTE(review): this branch uses hard-coded domains while the
        # cache-hit branch above uses SERVER_DOMAIN / SERVER_DOMAIN_SSL.
        # Kept as-is because the intended value cannot be told from here.
        if SERVER_TYPE == "Ansjer.formal_settings":
            filePath = 'http://www.zositechc.cn/faq/image/' + filePath
        else:
            filePath = 'http://test.www.zositechc.cn/faq/image/' + filePath
        return response.json(0, {'filePath': filePath})

    @staticmethod
    def _static_relative(file_name):
        """Return the part of ``file_name`` starting at its last 'static/'."""
        # rfind rather than find so a 'static/' inside BASE_DIR is ignored.
        return file_name[file_name.rfind('static/'):]


class getFAQImage(View):
    """Serve a previously uploaded FAQ image from the static directory."""

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        filePath = kwargs.get('filePath', None)
        return self.getFile(filePath, ResponseObject())

    def get(self, request, *args, **kwargs):
        request.encoding = 'gb2312'
        filePath = kwargs.get('filePath', None)
        return self.getFile(filePath, ResponseObject())

    def getFile(self, filePath, response):
        """Return the file at ``filePath`` (relative to BASE_DIR) as an image.

        :param filePath: path captured from the URL; untrusted input.
        :param response: ``ResponseObject`` used for error replies.
        :return: ``HttpResponse`` with the file bytes, or a JSON error
            (800 missing path, 907 not found / not allowed, 906 read error).
        """
        if not filePath:
            return response.json(800)

        static_root = os.path.abspath(os.path.join(BASE_DIR, 'static'))
        fullPath = os.path.abspath(os.path.join(BASE_DIR, filePath.replace('\\', '/')))
        # Refuse '../' and absolute paths: only files below BASE_DIR/static
        # may be served through this endpoint.
        if not fullPath.startswith(static_root + os.sep):
            return response.json(907)
        if not os.path.isfile(fullPath):
            return response.json(907)

        try:
            with open(fullPath, 'rb') as image_file:
                image_data = image_file.read()
        except Exception as e:
            return response.json(906, repr(e))

        # Report the real image type when it can be guessed from the name;
        # fall back to the previous fixed value otherwise.
        content_type = mimetypes.guess_type(fullPath)[0]
        if not content_type or not content_type.startswith('image/'):
            content_type = "image/jpeg"
        return HttpResponse(image_data, content_type=content_type)


class FAQView(View):
    """CRUD operations on FAQ entries plus the Zositech help article mirror."""

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        return super().dispatch(*args, **kwargs)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation', None)
        return self.validate(request.POST, operation)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation', None)
        return self.validate(request.GET, operation)

    def validate(self, request_dict, operation):
        """Check the token (except for 'zositechHelp') and dispatch ``operation``."""
        token = TokenObject(request_dict.get('token', None))
        response = ResponseObject()

        # 'zositechHelp' is the only operation reachable without a valid token.
        if operation != 'zositechHelp' and token.code != 0:
            return response.json(token.code)

        if operation == 'add':
            return self.do_add(token, request_dict, response)
        elif operation == 'query':
            return self.do_query(token.userID, request_dict, response)
        elif operation == 'update':
            return self.do_update(token, request_dict, response)
        elif operation == 'delete':
            return self.do_delete(token.userID, request_dict, response)
        elif operation == 'zositechHelp':
            return self.do_zositechHelp(request_dict, response)
        elif operation == 'synZositechHelp':
            return self.do_synZositechHelp(request_dict, response)
        else:
            return response.json(404)

    @staticmethod
    def _promote_tmp_images(redis_obj, token_key):
        """Move the token's uploaded images from ``.../tmp/`` to its parent.

        :param redis_obj: ``RedisObject`` holding the upload bookkeeping.
        :param token_key: Redis key (the token string) the uploads are
            stored under.
        :return: ``True`` when Redis held an entry for the token, else ``False``.
        :raises OSError: propagated from ``shutil.move``.
        """
        cached = redis_obj.get_data(key=token_key)
        if cached is False:
            return False

        for tmp_path in json.loads(cached).values():
            tmp_dir, image_name = os.path.split(tmp_path)
            if os.path.basename(tmp_dir) != 'tmp':
                # Unexpected location; moving it would need a guessed target.
                LOGGER.error(f"FAQ image is not in a tmp directory: {tmp_path}")
                continue
            final_path = '/'.join((os.path.dirname(tmp_dir), image_name))
            if not os.path.exists(tmp_path) and os.path.exists(final_path):
                # Already moved by an earlier, partially failed attempt.
                continue
            shutil.move(tmp_path, final_path)
        return True

    def do_add(self, token, request_dict, response):
        """Create an FAQ entry and finalize the images uploaded for it.

        Requires permission 120. Returns 444 when ``title`` or ``content``
        is missing and 174 when anything fails while saving.
        """
        own_permission = ModelService.check_perm(userID=token.userID, permID=120)
        if own_permission is not True:
            return response.json(404)

        title = request_dict.get('title', None)
        content = request_dict.get('content', None)
        if not (title and content):
            return response.json(444)

        try:
            # Point the image URLs in the content at the final folder.
            content = str(content).replace('faq/image/static/FAQImages/tmp',
                                           'faq/image/static/FAQImages')

            # Move this session's uploads out of the temporary folder.
            redisObject = RedisObject()
            self._promote_tmp_images(redisObject, token.token)

            now_time = int(time.time())
            FAQModel.objects.create(**{
                'title': title,
                'content': content,
                'add_time': now_time,
                'update_time': now_time
            })

            # Drop the upload bookkeeping for this token.
            redisObject.del_data(key=token.token)
        except Exception as e:
            LOGGER.error(f"FAQ add failed: {repr(e)}")
            return response.json(174)
        return response.json(0)

    def do_query(self, userID, request_dict, response):
        """Return one page of FAQ entries, optionally filtered by title.

        Searching requires permission 110, plain listing permission 100.
        ``page`` is 1-based; ``line`` is the page size. Missing or
        non-positive/non-numeric values yield 444.
        """
        page = request_dict.get('page', None)
        line = request_dict.get('line', None)
        search_key = request_dict.get('search_key', None)
        if not (page and line):
            return response.json(444)

        required_perm = 110 if search_key else 100
        own_permission = ModelService.check_perm(userID=userID, permID=required_perm)
        if own_permission is not True:
            return response.json(404)

        try:
            page = int(page)
            line = int(line)
        except (TypeError, ValueError):
            return response.json(444)
        if page < 1 or line < 1:
            # Would otherwise produce a negative slice on the queryset.
            return response.json(444)

        if search_key:
            faq_qs = FAQModel.objects.filter(title__contains=search_key)
        else:
            faq_qs = FAQModel.objects.filter()
        faq_qs = faq_qs.order_by('-add_time')

        count = faq_qs.count()
        if count == 0:
            return response.json(0, {'count': 0, 'data': []})

        start = (page - 1) * line
        rows = faq_qs.values()[start:start + line]
        return response.json(0, {'count': count, 'data': list(rows)})

    def do_update(self, token, request_dict, response):
        """Update title and/or content of the FAQ entry given by ``id``.

        Requires permission 130. Returns 444 when ``id`` is missing.
        """
        own_permission = ModelService.check_perm(userID=token.userID, permID=130)
        if own_permission is not True:
            return response.json(404)

        faq_id = request_dict.get('id', None)
        title = request_dict.get('title', None)
        content = request_dict.get('content', None)
        if not faq_id:
            return response.json(444)

        data = {'update_time': int(time.time())}
        if title:
            data['title'] = title
        if content:
            content = str(content).replace('faq/image/static/FAQImages/tmp',
                                           'faq/image/static/FAQImages')
            # Move this session's uploads out of the temporary folder and
            # drop the bookkeeping once they are in place.
            redisObject = RedisObject()
            if self._promote_tmp_images(redisObject, token.token):
                redisObject.del_data(key=token.token)
            data['content'] = content

        FAQModel.objects.filter(id=faq_id).update(**data)
        return response.json(0)

    def do_delete(self, userID, request_dict, response):
        """Delete the FAQ entry given by ``id`` (permission 140)."""
        own_permission = ModelService.check_perm(userID=userID, permID=140)
        if own_permission is not True:
            return response.json(404)

        faq_id = request_dict.get('id', None)
        if not faq_id:
            return response.json(444)

        try:
            FAQModel.objects.filter(id=faq_id).delete()
        except Exception as e:
            LOGGER.error(f"FAQ delete failed: {repr(e)}")
            return response.json(173)
        return response.json(0)

    def do_zositechHelp(self, request_dict, response):
        """Return stored help articles for ``locale``/``origin``.

        When ``label_names`` is given, only articles whose stored label
        string contains it are returned. Returns 444 when nothing matches.
        """
        filters = {
            'locale': request_dict.get('locale', None),
            'origin': request_dict.get('origin', None),
        }
        label_names = request_dict.get('label_names', None)
        if label_names:
            filters['label_names__contains'] = label_names

        articles = list(ZositechHelpModel.objects.filter(**filters).values())
        if not articles:
            return response.json(444)
        return response.json(0, articles)

    def do_synZositechHelp(self, request_dict, response):
        """Replace all stored help articles with the posted zh/en payloads.

        ``zhresults`` and ``enresults`` are JSON strings holding an
        ``articles`` list. Returns 444 when a payload is missing or
        malformed; in that case the existing rows are left untouched.
        """
        # NOTE(review): only a valid token is required to reach this method;
        # no permission id is checked before the table is replaced.
        raw_payloads = (
            request_dict.get('zhresults', None),
            request_dict.get('enresults', None),
        )
        if any(raw is None for raw in raw_payloads):
            return response.json(444)

        # Build every row before touching the table so a bad payload cannot
        # leave it empty.
        try:
            rows = []
            for raw in raw_payloads:
                # Apostrophes are swapped for a placeholder before parsing
                # and restored when the article is serialized again.
                parsed = json.loads(raw.replace("\'", "XX??????XX"))
                for data in parsed['articles']:
                    rows.append(self._help_row(data))
        except (ValueError, KeyError, TypeError):
            return response.json(444)

        # NOTE(review): atomic() guards the default database alias; confirm
        # ZositechHelpModel is not routed elsewhere.
        with transaction.atomic():
            ZositechHelpModel.objects.all().delete()
            for row in rows:
                ZositechHelpModel.objects.create(**row)
        return response.json(0)

    @staticmethod
    def _help_row(data):
        """Build the ``ZositechHelpModel`` field dict for one article."""
        # All non-empty labels are stored comma-separated (the previous loop
        # kept only the last one); None when there is no label.
        labels = data['label_names'] or []
        labname = ",".join(str(lab) for lab in labels if lab) or None
        return {
            'locale': data['locale'],
            'label_names': labname,
            'origin': 'web_widget',
            'content': json.dumps(data).replace("\'", "\"").replace("XX??????XX", "\'")
        }


class HelpLinkView(View):
    """Look up help links; the token comes from the Authorization header."""

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Authenticate via ``HTTP_AUTHORIZATION`` and dispatch ``operation``."""
        response = ResponseObject('en')
        tko = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang

        if operation == 'queryFAQByDeviceType':
            # Look up the help link for a device type.
            return HelpLinkView.query_faq_by_device_type(request, request_dict, response)
        return response.json(414)

    @staticmethod
    def query_faq_by_device_type(request, request_dict, response):
        """Return the active help link for ``device_type``.

        A link stored for the exact device type wins over the generic one
        stored with ``device_type=-1``. The language is currently fixed to
        ``'en'``. Results are cached in Redis.

        :return: code 0 with ``url``/``title``/``description``; 444 for a
            missing or non-integer ``device_type``; 173 when no link
            exists; 500 on unexpected errors.
        """
        device_type = request_dict.get('device_type', None)
        lang = 'en'

        if not device_type:
            return response.json(444, {'message': 'device_type和lang参数不能为空'})
        try:
            device_type = int(device_type)
        except (TypeError, ValueError):
            # A malformed parameter is a client error, not a server error.
            return response.json(444)

        try:
            redis = RedisObject()
            cache_key = RedisKeyConstant.HELP_LINK_TYPE.value + f'{device_type}:{lang}'

            # Try the cache first; a cache problem must not fail the request.
            try:
                cached_data = redis.get_data(cache_key)
                if cached_data:
                    return response.json(0, json.loads(cached_data))
            except Exception as e:
                LOGGER.error(f"读取Redis缓存出错: {repr(e)}")

            # Single query: exact device type first, generic (-1) second.
            help_link = HelpLink.objects.filter(
                models.Q(device_type=device_type) | models.Q(device_type=-1),
                lang=lang,
                is_active=True
            ).order_by(
                models.Case(
                    models.When(device_type=device_type, then=0),
                    default=1,
                    output_field=models.IntegerField()
                )
            ).first()

            if not help_link:
                return response.json(173)

            data = {
                'url': help_link.url,
                'title': help_link.title,
                'description': help_link.description
            }

            # Cache for 30 days; a failure here does not affect the reply.
            try:
                redis.set_data(cache_key, json.dumps(data),
                               RedisKeyConstant.EXPIRE_TIME_30_DAYS.value)
            except Exception as e:
                LOGGER.error(f"设置Redis缓存出错: {repr(e)}")

            return response.json(0, data)

        except Exception as e:
            LOGGER.error(f"查询帮助链接出错: {repr(e)}")
            return response.json(500)