| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 | import websocketimport datetimeimport hashlibimport base64import hmacimport jsonfrom urllib.parse import urlencodeimport timeimport sslfrom wsgiref.handlers import format_date_timefrom datetime import datetimefrom time import mktimeimport threading"""调用讯飞模型 语音转文字"""class WsParamRecognize:    STATUS_FIRST_FRAME = 0    STATUS_CONTINUE_FRAME = 1    STATUS_LAST_FRAME = 2    def __init__(self, APPID, APISecret, APIKey, AudioFile):        self.APPID = APPID        self.APISecret = APISecret        self.APIKey = APIKey        self.AudioFile = AudioFile        self.all_sentences = ""        # 公共参数        self.CommonArgs = {"app_id": self.APPID}        # 业务参数        self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}    def create_url(self):        url = 'wss://ws-api.xfyun.cn/v2/iat'        now = datetime.now()        date = format_date_time(mktime(now.timetuple()))        signature_origin = "host: ws-api.xfyun.cn\n" + "date: " + date + "\nGET /v2/iat HTTP/1.1"        signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),                                 digestmod=hashlib.sha256).digest()        signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8')        authorization_origin = "api_key=\"%s\", algorithm=\"hmac-sha256\", headers=\"host date request-line\", signature=\"%s\"" % (            self.APIKey, signature_sha)        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')        v = {"authorization": authorization, "date": date, "host": "ws-api.xfyun.cn"}        url = url + '?' + urlencode(v)        return url    def on_message(self, ws, message):        try:            message = json.loads(message)            code = message["code"]            sid = message["sid"]            if code != 0:                errMsg = message["message"]                print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))            else:                data = message["data"]["result"]["ws"]                for i in data:                    for w in i["cw"]:                        self.all_sentences += w["w"]        except Exception as e:            print("receive msg,but parse exception:", e)    def on_error(self, error):        print("### error:", error)    def on_close(self):        print("### closed ###")    def on_open(self, ws):        def run(*args):            frameSize = 8000            intervel = 0.04            status = self.STATUS_FIRST_FRAME            with open(self.AudioFile, "rb") as fp:                while True:                    buf = fp.read(frameSize)                    if not buf:                        status = self.STATUS_LAST_FRAME                    if status == self.STATUS_FIRST_FRAME:                        d = {"common": self.CommonArgs, "business": self.BusinessArgs,                             "data": {"status": 0, "format": "audio/L16;rate=16000",                                      "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}                        ws.send(json.dumps(d))                        status = self.STATUS_CONTINUE_FRAME                    elif status == self.STATUS_CONTINUE_FRAME:                        ws.send(json.dumps({"data": {"status": 1, "format": "audio/L16;rate=16000",                                                     "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}))                    elif status == self.STATUS_LAST_FRAME:                        ws.send(json.dumps({"data": {"status": 2, "format": "audio/L16;rate=16000",                                                     "audio": str(base64.b64encode(buf), 'utf-8'), "encoding": "raw"}}))                        time.sleep(1)                        break                    time.sleep(intervel)            ws.close()        threading.Thread(target=run).start()    def start(self):        websocket.enableTrace(False)        wsUrl = self.create_url()        self.ws = websocket.WebSocketApp(wsUrl,                                         on_message=lambda ws, msg: self.on_message(ws, msg),                                         on_error=self.on_error,                                         on_close=self.on_close,                                         on_open=lambda ws: self.on_open(ws))        self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})        return self.all_sentences
 |