import json import os from Ansjer.config import LOGGER from Object.ResponseObject import ResponseObject from django.views import View import time from Object.WsParam.AIChatObject import ChatClient from Object.WsParam.AudioProcessorObject import AudioProcessor from Object.WsParam.WsParamRecognizeObject import WsParamRecognize from Object.WsParam.WsParamSynthesizeObject import WsParamSynthesize from django.http import FileResponse class WsParamService(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.GET return self.validation(request, request_dict, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') request_dict = request.POST return self.validation(request, request_dict, operation) def validation(self, request, request_dict, operation): language = request_dict.get('language', 'en') response = ResponseObject(language) if operation == 'smartReply': return self.smart_reply(request, request_dict, response) else: return response.json(414) def smart_reply(self, request, request_dict, response): app_id = "fcff8f4b" api_key = "037571e7285e64e8dc321fa5b937fea2" api_secret = "ZTU3NWMyNTI1MTI4NTU5ZGUxMDZhNmQ5" gpt_url = "wss://spark-api.xf-yun.com/v3.5/chat" domain = "generalv3.5" try: system = request_dict.get('system', None) audio = request.FILES.get('audio', None) history = request_dict.get('history', None) if audio is None: return response.json(444) save_directory = 'static/demo_files/' os.makedirs(save_directory, exist_ok=True) original_audio_path = os.path.join(save_directory, audio.name) with open(original_audio_path, 'wb') as destination: for chunk in audio.chunks(): destination.write(chunk) # 转码 pcm_audio_path = os.path.splitext(original_audio_path)[0] + '.pcm' audio_processor = AudioProcessor() audio_processor.convert_audio(original_audio_path, pcm_audio_path) # 传入语音 -> 转文字 APPID, APISecret, APIKey, AudioFile start = time.time() audio_file = pcm_audio_path wsParamRecognize = WsParamRecognize(app_id, api_secret, api_key, audio_file) query = wsParamRecognize.start() end = time.time() LOGGER.info(f"********smart_reply语音转文字所需时间为{end - start}秒,内容为{query}********") # 删除文件 pcm_audio_path 和 original_audio_path os.remove(pcm_audio_path) os.remove(original_audio_path) # 大语言模型 APPID, APIKey, APISecret, gpt_url, domain, query, history=None, system=None start = time.time() chat = ChatClient(app_id, api_key, api_secret, gpt_url, domain, query, history, system) answer = chat.start() end = time.time() LOGGER.info(f"********smartReplyAI回复所需时间为{end - start}秒,内容为{answer}********") # 文字转音频 APPID, APIKey, APISecret, Text, AudioName="demo" start = time.time() audio_name = f"{os.path.splitext(audio.name)[0]}_answer" wsParamSynthesize = WsParamSynthesize(app_id, api_key, api_secret, answer, audio_name) wsParamSynthesize.start() answer_audio_path = os.path.splitext(original_audio_path)[0] + '_answer.mp3' g711a_audio_path = os.path.splitext(answer_audio_path)[0] + '.g711a' print(answer_audio_path, g711a_audio_path) # 如果有旧文件就删掉 if os.path.exists(g711a_audio_path): os.remove(g711a_audio_path) audio_processor.convert_audio(answer_audio_path, g711a_audio_path) os.remove(answer_audio_path) end = time.time() LOGGER.info(f"********smartReply文字转编码所需时间为{end - start}秒********") return FileResponse(open(g711a_audio_path, 'rb'), as_attachment=True, filename=os.path.basename(g711a_audio_path)) except Exception as e: LOGGER.error('*****WsParamService.smart_reply:errLine:{}, errMsg:{}' .format(e.__traceback__.tb_lineno, repr(e))) return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))