123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import websocket
- import datetime
- import hashlib
- import base64
- import hmac
- import json
- from urllib.parse import urlencode
- import time
- import ssl
- from wsgiref.handlers import format_date_time
- from datetime import datetime
- from time import mktime
- import threading
- """
- 调用讯飞模型 语音转文字
- """
- class WsParamRecognize:
- STATUS_FIRST_FRAME = 0
- STATUS_CONTINUE_FRAME = 1
- STATUS_LAST_FRAME = 2
- def __init__(self, APPID, APISecret, APIKey, AudioFile):
- self.APPID = APPID
- self.APISecret = APISecret
- self.APIKey = APIKey
- self.AudioFile = AudioFile
- self.all_sentences = ""
- # 公共参数
- self.CommonArgs = {"app_id": self.APPID}
- # 业务参数
- self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}
- def create_url(self):
- url = 'wss://ws-api.xfyun.cn/v2/iat'
- now = datetime.now()
- date = format_date_time(mktime(now.timetuple()))
- signature_origin = "host: ws-api.xfyun.cn\n" + "date: " + date + "\nGET /v2/iat HTTP/1.1"
- signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
- digestmod=hashlib.sha256).digest()
- signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')
- authorization_origin = "api_key=\"%s\", algorithm=\"hmac-sha256\", headers=\"host date request-line\", signature=\"%s\"" % (
- self.APIKey, signature_sha)
- authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
- v = {"authorization": authorization, "date": date, "host": "ws-api.xfyun.cn"}
- url = url + '?' + urlencode(v)
- return url
- def on_message(self, ws, message):
- try:
- message = json.loads(message)
- code = message["code"]
- sid = message["sid"]
- if code != 0:
- errMsg = message["message"]
- print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
- else:
- data = message["data"]["result"]["ws"]
- for i in data:
- for w in i["cw"]:
- self.all_sentences += w["w"]
- except Exception as e:
- print("receive msg,but parse exception:", e)
- def on_error(self, error):
- print("### error:", error)
- def on_close(self):
- print("### closed ###")
- def on_open(self, ws):
- def run(*args):
- frameSize = 8000
- intervel = 0.04
- status = self.STATUS_FIRST_FRAME
- with open(self.AudioFile, "rb") as fp:
- while True:
- buf = fp.read(frameSize)
- if not buf:
- status = self.STATUS_LAST_FRAME
- if status == self.STATUS_FIRST_FRAME:
- d = {"common": self.CommonArgs, "business": self.BusinessArgs,
- "data": {"status": 0, "format": "audio/L16;rate=16000",
- "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}
- ws.send(json.dumps(d))
- status = self.STATUS_CONTINUE_FRAME
- elif status == self.STATUS_CONTINUE_FRAME:
- ws.send(json.dumps({"data": {"status": 1, "format": "audio/L16;rate=16000",
- "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}))
- elif status == self.STATUS_LAST_FRAME:
- ws.send(json.dumps({"data": {"status": 2, "format": "audio/L16;rate=16000",
- "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}))
- time.sleep(1)
- break
- time.sleep(intervel)
- ws.close()
- threading.Thread(target=run).start()
- def start(self):
- websocket.enableTrace(False)
- wsUrl = self.create_url()
- self.ws = websocket.WebSocketApp(wsUrl,
- on_message=lambda ws, msg: self.on_message(ws, msg),
- on_error=self.on_error,
- on_close=self.on_close,
- on_open=lambda ws: self.on_open(ws))
- self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
- return self.all_sentences
|