import websocket import datetime import hashlib import base64 import hmac import json from urllib.parse import urlencode import ssl from wsgiref.handlers import format_date_time from datetime import datetime from time import mktime import threading # 使用更现代的threading代替_thread """ 调用讯飞模型 文字转语音 """ class WsParamSynthesize: def __init__(self, APPID, APIKey, APISecret, Text, AudioType="pcm"): self.APPID = APPID self.APIKey = APIKey self.APISecret = APISecret self.Text = Text self.audio_data = "" # 初始化其他需要的属性 self.CommonArgs = {"app_id": self.APPID} if AudioType == "mp3": self.BusinessArgs = {"aue": "lame", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8", "sfl": 1} else: self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=16000", "vcn": "xiaoyan", "tte": "utf8"} self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")} def create_url(self): url = 'wss://tts-api.xfyun.cn/v2/tts' # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple())) # 拼接字符串 signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + "/v2/tts " + "HTTP/1.1" # 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest() signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8') authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( self.APIKey, "hmac-sha256", "host date request-line", signature_sha) authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') # 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": "ws-api.xfyun.cn" } # 拼接鉴权参数,生成url url = url + '?' + urlencode(v) return url def on_message(self, ws, message): try: message = json.loads(message) code = message["code"] sid = message["sid"] status = message["data"]["status"] if status == 2: ws.close() if code != 0: errMsg = message["message"] print(f"sid:{sid} call error:{errMsg} code is:{code}") else: audio = message["data"]["audio"] self.audio_data = audio if status == 2: # 最后一帧 print("WebSocket connection is closed.") ws.close() except Exception as e: print("Receive message, but parse exception:", e) # on_error和on_close方法类似地修改,可以访问类实例的属性 def on_error(self, error): print("### error:", error) # 收到websocket关闭的处理 def on_close(self): print("### closed ###") def on_open(self, ws): def run(*args): d = {"common": self.CommonArgs, "business": self.BusinessArgs, "data": self.Data} d = json.dumps(d) ws.send(d) threading.Thread(target=run).start() # 使用threading.Thread以提供更好的线程管理 def start(self): websocket.enableTrace(False) self.ws = websocket.WebSocketApp(self.create_url(), on_message=lambda ws, msg: self.on_message(ws, msg), on_error=lambda msg: self.on_error(msg), on_close=self.on_close, on_open=lambda ws: self.on_open(ws)) # 使用 lambda 来确保 ws 参数传递 self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) return self.audio_data