import websocket import datetime import hashlib import base64 import hmac import json from urllib.parse import urlencode import ssl from wsgiref.handlers import format_date_time from datetime import datetime from time import mktime import threading """ 调用讯飞模型 文字转语音 """ class WsParamSynthesize: def __init__(self, APPID, APIKey, APISecret, Text, AudioName="demo"): self.APPID = APPID self.APIKey = APIKey self.APISecret = APISecret self.Text = Text self.AudioType = "mp3" self.AudioName = AudioName # 初始化其他需要的属性 self.CommonArgs = {"app_id": self.APPID} if self.AudioType == "mp3": self.BusinessArgs = {"aue": "lame", "auf": "audio/L16;rate=8000", "vcn": "xiaoyan", "tte": "utf8", "sfl": 1} else: self.BusinessArgs = {"aue": "raw", "auf": "audio/L16;rate=8000", "vcn": "xiaoyan", "tte": "utf8"} self.Data = {"status": 2, "text": str(base64.b64encode(self.Text.encode('utf-8')), "UTF8")} def create_url(self): url = 'wss://tts-api.xfyun.cn/v2/tts' # 生成RFC1123格式的时间戳 now = datetime.now() date = format_date_time(mktime(now.timetuple())) # 拼接字符串 signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" signature_origin += "date: " + date + "\n" signature_origin += "GET " + "/v2/tts " + "HTTP/1.1" # 进行hmac-sha256进行加密 signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), digestmod=hashlib.sha256).digest() signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8') authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( self.APIKey, "hmac-sha256", "host date request-line", signature_sha) authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') # 将请求的鉴权参数组合为字典 v = { "authorization": authorization, "date": date, "host": "ws-api.xfyun.cn" } # 拼接鉴权参数,生成url url = url + '?' + urlencode(v) return url def on_message(self, ws, message): try: message = json.loads(message) code = message["code"] sid = message["sid"] audio = message["data"]["audio"] if code != 0: errMsg = message["message"] print(f"Error: {errMsg}, code: {code}") return None audio = base64.b64decode(audio) status = message["data"]["status"] if status == 2: print("WebSocket is closed") ws.close() if code != 0: errMsg = message["message"] print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) else: with open(f'static/demo_files/{self.AudioName}.{self.AudioType}', 'ab') as f: f.write(audio) except Exception as e: print("Exception while parsing message:", e) return None # on_error和on_close方法类似地修改,可以访问类实例的属性 def on_error(self, error): print("### error:", error) # 收到websocket关闭的处理 def on_close(self): print("### closed ###") def on_open(self, ws): def run(*args): d = {"common": self.CommonArgs, "business": self.BusinessArgs, "data": self.Data} d = json.dumps(d) ws.send(d) threading.Thread(target=run).start() def start(self): websocket.enableTrace(False) self.ws = websocket.WebSocketApp(self.create_url(), on_message=lambda ws, msg: self.on_message(ws, msg), on_error=lambda msg: self.on_error(msg), on_close=self.on_close, on_open=lambda ws: self.on_open(ws)) # 使用 lambda 来确保 ws 参数传递 self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})