深夜提醒

现在是深夜,建议您注意休息,不要熬夜哦~

🏮 🏮 🏮

新年快乐

祝君万事如意心想事成!

share-image
ESC

阿里云 DashScope Paraformer 实时语音识别接入指南

clip_1775214866984_pjk9jr.png

前言

随着 AI 技术的快速发展,语音识别已经成为许多应用的标配功能。阿里云的 DashScope 平台提供了强大的 Paraformer 实时语音识别服务,支持流式输入和实时返回识别结果。本文将详细介绍如何在 Web 应用中接入该服务。

技术架构

┌─────────────┐      WebSocket      ┌──────────────┐      WebSocket      ┌─────────────┐
│ 浏览器 │ ◄─────────────────► │ FastAPI │ ◄─────────────────► │ DashScope │
│ (WebRTC) │ 音频数据/结果 │ 后端服务 │ 音频数据/结果 │ Paraformer │
└─────────────┘ └──────────────┘ └─────────────┘

准备工作

1. 获取阿里云 API Key

  1. 访问 阿里云 DashScope 控制台
  2. 创建 API Key
  3. 记录 Key 值,后续代码中使用

2. 安装依赖

pip install dashscope fastapi uvicorn websockets

后端实现 (FastAPI)

核心代码

"""
Paraformer 实时语音识别服务
使用阿里云 DashScope Python SDK
"""

import os
import json
import asyncio
import threading
import queue
import logging
import time
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)
router = APIRouter()

# 配置 API Key
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY", "")
if DASHSCOPE_API_KEY:
import dashscope
dashscope.api_key = DASHSCOPE_API_KEY


class ASRClient:
"""封装 ASR 客户端,处理缓冲和识别"""

def __init__(self, client_id: str):
self.client_id = client_id
self.audio_buffer = bytearray()
self.result_queue = queue.Queue()
self.recognition = None
self.recognition_thread = None
self.final_text = ""
self.complete_event = threading.Event()

def start_recognition(self):
"""启动语音识别"""
from dashscope.audio.asr import (
Recognition, RecognitionCallback, RecognitionResult
)

class Callback(RecognitionCallback):
def __init__(self, client):
self.client = client

def on_open(self):
logger.info(f"[CLIENT {self.client.client_id}] DashScope 连接打开")

def on_close(self):
logger.info(f"[CLIENT {self.client.client_id}] DashScope 连接关闭")

def on_complete(self):
logger.info(f"[CLIENT {self.client.client_id}] 识别完成")
self.client.result_queue.put({"type": "complete"})
self.client.complete_event.set()

def on_error(self, result: RecognitionResult):
msg = result.message if hasattr(result, 'message') else str(result)
logger.error(f"[CLIENT {self.client.client_id}] 识别错误: {msg}")
self.client.result_queue.put({"type": "error", "error": msg})
self.client.complete_event.set()

def on_event(self, result: RecognitionResult):
try:
sentence = result.get_sentence()
if sentence and isinstance(sentence, dict):
text = sentence.get("text", "")
is_final = RecognitionResult.is_sentence_end(sentence)

if text:
logger.info(f"识别: '{text}' (final={is_final})")
if is_final:
self.client.final_text += text
self.client.result_queue.put({
"type": "final", "text": text
})
else:
self.client.result_queue.put({
"type": "partial",
"text": self.client.final_text + text
})
except Exception as e:
logger.error(f"处理事件错误: {e}")

callback = Callback(self)
self.recognition = Recognition(
model='paraformer-realtime-8k-v2',
format='pcm',
sample_rate=8000,
callback=callback
)

def run():
try:
# 先发送缓冲区中的数据
if self.audio_buffer:
self.recognition.send_audio_frame(bytes(self.audio_buffer))
self.audio_buffer.clear()

# 启动识别(阻塞直到完成)
self.recognition.start()
except Exception as e:
logger.error(f"识别线程错误: {e}")
self.result_queue.put({"type": "error", "error": str(e)})
self.complete_event.set()

self.recognition_thread = threading.Thread(target=run, daemon=True)
self.recognition_thread.start()

def add_audio(self, audio_data: bytes):
"""添加音频数据"""
try:
if (self.recognition and
hasattr(self.recognition, '_running') and
self.recognition._running):
self.recognition.send_audio_frame(audio_data)
else:
self.audio_buffer.extend(audio_data)
except Exception as e:
self.audio_buffer.extend(audio_data)

def stop(self):
"""停止识别"""
if self.recognition:
try:
self.recognition.stop()
except:
pass
if self.recognition_thread:
self.recognition_thread.join(timeout=2.0)


@router.websocket("/ws/voice")
async def voice_websocket(websocket: WebSocket):
"""WebSocket 实时语音识别"""
await websocket.accept()

if not DASHSCOPE_API_KEY:
await websocket.send_json({
"type": "error", "error": "未配置 DASHSCOPE_API_KEY"
})
await websocket.close()
return

client_id = str(id(websocket))
client = ASRClient(client_id)
recognition_started = False
result_task = None

try:
await websocket.send_json({
"type": "ready", "message": "请发送音频数据"
})

# 后台任务:实时发送识别结果给客户端
async def send_results_realtime():
"""从 result_queue 获取识别结果并实时发送给客户端"""
result_count = 0
last_result_time = time.time()

while True:
try:
msg = client.result_queue.get_nowait()
result_count += 1
last_result_time = time.time()
await websocket.send_json(msg)

if msg.get("type") in ["complete", "error"]:
# 再等待2秒,确保没有遗漏的消息
await asyncio.sleep(2.0)
while True:
try:
msg = client.result_queue.get_nowait()
await websocket.send_json(msg)
except queue.Empty:
break
break

except queue.Empty:
time_since_last = time.time() - last_result_time
if time_since_last > 30:
break
await asyncio.sleep(0.05)
continue

except Exception as e:
logger.error(f"发送结果错误: {e}")
break

# 接收音频数据
while True:
try:
data = await asyncio.wait_for(websocket.receive(), timeout=0.5)

if isinstance(data, dict):
msg_type = data.get('type')
if msg_type == 'websocket.receive':
# 控制消息
text_data = data.get('text')
if text_data:
try:
msg = json.loads(text_data)
if msg.get("type") == "start":
if not recognition_started:
client.start_recognition()
recognition_started = True
result_task = asyncio.create_task(
send_results_realtime()
)
await websocket.send_json({
"type": "status",
"message": "recognition_started"
})
elif msg.get("type") == "stop":
break
except json.JSONDecodeError:
pass
# 音频数据
bytes_data = data.get('bytes')
if bytes_data:
client.add_audio(bytes_data)
if (not recognition_started and
len(client.audio_buffer) >= 16000):
client.start_recognition()
recognition_started = True
result_task = asyncio.create_task(
send_results_realtime()
)
await websocket.send_json({
"type": "status",
"message": "recognition_started"
})
elif msg_type == 'websocket.disconnect':
break

except asyncio.TimeoutError:
continue
except WebSocketDisconnect:
break

# 等待实时发送任务完成
if result_task:
try:
await asyncio.wait_for(result_task, timeout=35.0)
except asyncio.TimeoutError:
result_task.cancel()

except Exception as e:
logger.error(f"语音识别错误: {e}")
finally:
if result_task:
result_task.cancel()
client.stop()
try:
await websocket.close()
except:
pass

关键要点

  1. RecognitionCallback: 必须实现所有回调方法(on_open, on_close, on_complete, on_error, on_event
  2. 音频格式: 必须为 8kHz 采样率、16-bit PCM、单声道
  3. 缓冲策略: 先缓冲约1秒音频(16000 bytes)再启动识别器,避免 NO_VALID_AUDIO_ERROR
  4. 实时推送: 使用 queue.Queue 和 asyncio 后台任务实现结果的实时推送

前端实现

// 语音录制状态
let isVoiceRecording = false;
let voiceWs = null;
let voiceAudioContext = null;
let voiceProcessor = null;
let voiceStream = null;
let voiceFinalText = '';
let voicePartialText = '';

async function toggleVoiceRecording() {
if (!isVoiceRecording) {
await startVoiceRecording();
} else {
await stopVoiceRecording();
}
}

async function startVoiceRecording() {
try {
// 重置状态
voiceFinalText = '';
voicePartialText = '';

// 获取麦克风权限 (8kHz 采样率)
voiceStream = await navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: 8000, // 8kHz 与模型匹配
channelCount: 1, // 单声道
echoCancellation: true,
noiseSuppression: true
}
});

// 连接 WebSocket
const wsUrl = `ws://${window.location.host}/ws/voice`;
voiceWs = new WebSocket(wsUrl);

voiceWs.onopen = () => {
console.log('语音识别 WebSocket 连接成功');
startVoiceAudioProcessing();
};

voiceWs.onmessage = (event) => {
const data = JSON.parse(event.data);
handleVoiceMessage(data);
};

voiceWs.onerror = (error) => {
console.error('WebSocket 错误:', error);
stopVoiceRecording();
};

voiceWs.onclose = () => {
console.log('WebSocket 关闭');
};

// 更新 UI
isVoiceRecording = true;
document.getElementById('voiceBtn').classList.add('recording');

} catch (err) {
console.error('启动录音失败:', err);
showToast('无法访问麦克风: ' + err.message, 'error');
}
}

function startVoiceAudioProcessing() {
// 创建音频上下文
voiceAudioContext = new (window.AudioContext || window.webkitAudioContext)({
sampleRate: 8000
});

// 创建音频源
const source = voiceAudioContext.createMediaStreamSource(voiceStream);

// 创建处理器 (bufferSize=1024, 单声道)
voiceProcessor = voiceAudioContext.createScriptProcessor(1024, 1, 1);

voiceProcessor.onaudioprocess = (e) => {
if (!isVoiceRecording || !voiceWs || voiceWs.readyState !== WebSocket.OPEN)
return;

// 获取音频数据
const inputData = e.inputBuffer.getChannelData(0);

// 转换为 16位 PCM
const pcmData = floatTo16BitPCM(inputData);

// 发送音频数据到服务器
voiceWs.send(pcmData);
};

// 连接音频节点
source.connect(voiceProcessor);
voiceProcessor.connect(voiceAudioContext.destination);
}

function floatTo16BitPCM(float32Array) {
const buffer = new ArrayBuffer(float32Array.length * 2);
const view = new DataView(buffer);

for (let i = 0; i < float32Array.length; i++) {
let s = Math.max(-1, Math.min(1, float32Array[i]));
s = s < 0 ? s * 0x8000 : s * 0x7FFF;
view.setInt16(i * 2, s, true);
}

return buffer;
}

function handleVoiceMessage(data) {
const input = document.getElementById('userInput');

if (data.type === 'ready') {
console.log('语音识别服务就绪');

} else if (data.type === 'status') {
console.log('语音识别状态:', data.message);

} else if (data.type === 'partial') {
// 中间结果 - 实时显示
voicePartialText = data.text;
input.value = voiceFinalText + voicePartialText;
input.style.color = '#666';

} else if (data.type === 'final') {
// 最终结果片段
voiceFinalText += data.text;
voicePartialText = '';
input.value = voiceFinalText;
input.style.color = '';

} else if (data.type === 'complete') {
console.log('识别完成');

} else if (data.type === 'error') {
console.error('语音识别错误:', data.error);
showToast('语音识别错误: ' + data.error, 'error');
}
}

async function stopVoiceRecording() {
if (!isVoiceRecording) return;

isVoiceRecording = false;

// 停止音频处理
if (voiceProcessor) {
voiceProcessor.disconnect();
voiceProcessor = null;
}

if (voiceStream) {
voiceStream.getTracks().forEach(track => track.stop());
voiceStream = null;
}

if (voiceAudioContext) {
voiceAudioContext.close();
voiceAudioContext = null;
}

// 发送停止命令
if (voiceWs && voiceWs.readyState === WebSocket.OPEN) {
voiceWs.send(JSON.stringify({ type: 'stop' }));
setTimeout(() => {
voiceWs.close();
voiceWs = null;
}, 1000);
}

// 自动发送识别的消息
const finalText = voiceFinalText + voicePartialText;
if (finalText.trim()) {
input.value = finalText.trim();
setTimeout(() => {
sendMessage();
}, 300);
}
}

常见问题

1. NO_VALID_AUDIO_ERROR

原因: DashScope 服务器没有收到有效音频数据

解决:

  • 确保音频格式正确(8kHz, 16-bit PCM, 单声道)
  • 先缓冲约1秒音频再启动识别器
  • 检查麦克风权限是否获取成功

2. 识别结果不实时返回

原因: recognition.start() 是阻塞的,需要后台任务推送结果

解决:

# 启动后台任务实时推送结果
result_task = asyncio.create_task(send_results_realtime())

3. WebSocket 连接断开

原因:

  • 长时间未发送数据
  • 网络不稳定

解决:

  • 保持音频数据持续发送
  • 实现重连机制

总结

通过本文的介绍,我们完成了阿里云 DashScope Paraformer 实时语音识别的接入。核心要点:

  1. 音频格式: 8kHz, 16-bit PCM, 单声道
  2. 缓冲策略: 先缓冲1秒音频再启动识别
  3. 实时推送: 使用 queue + asyncio 后台任务
  4. 回调处理: 实现完整的 RecognitionCallback

完整代码已开源,欢迎参考使用。

参考链接


作者: Awen
发布时间: 2026-04-03
标签: #语音识别 #阿里云 #DashScope #Paraformer #WebSocket #FastAPI

文章作者:阿文
文章链接: https://www.awen.me/post/397d3771.html
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 阿文的博客

评论

0 条评论
😀😃😄 😁😅😂 🤣😊😇 🙂🙃😉 😌😍🥰 😘😗😙 😚😋😛 😝😜🤪 🤨🧐🤓 😎🥸🤩 🥳😏😒 😞😔😟 😕🙁☹️ 😣😖😫 😩🥺😢 😭😤😠 😡🤬🤯 😳🥵🥶 😱😨😰 😥😓🤗 🤔🤭🤫 🤥😶😐 😑😬🙄 😯😦😧 😮😲🥱 😴🤤😪 😵🤐🥴 🤢🤮🤧 😷🤒🤕 🤑🤠😈 👿👹👺 🤡💩👻 💀☠️👽 👾🤖🎃 😺😸😹 😻😼😽 🙀😿😾 👍👎👏 🙌👐🤲 🤝🤜🤛 ✌️🤞🤟 🤘👌🤏 👈👉👆 👇☝️ 🤚🖐️🖖 👋🤙💪 🦾🖕✍️ 🙏💅🤳 💯💢💥 💫💦💨 🕳️💣💬 👁️‍🗨️🗨️🗯️ 💭💤❤️ 🧡💛💚 💙💜🖤 🤍🤎💔 ❣️💕💞 💓💗💖 💘💝💟 ☮️✝️☪️ 🕉️☸️✡️ 🔯🕎☯️ ☦️🛐 🆔⚛️🉑 ☢️☣️📴 📳🈶🈚 🈸🈺🈷️ ✴️🆚💮 🉐㊙️㊗️ 🈴🈵🈹 🈲🅰️🅱️ 🆎🆑🅾️ 🆘 🛑📛 🚫💯💢 ♨️🚷🚯 🚳🚱🔞 📵🚭 ‼️⁉️🔅 🔆〽️⚠️ 🚸🔱⚜️ 🔰♻️ 🈯💹❇️ ✳️🌐 💠Ⓜ️🌀 💤🏧🚾 🅿️🈳 🈂🛂🛃 🛄🛅🛗 🚀🛸🚁 🚉🚆🚅 ✈️🛫🛬 🛩️💺🛰️
您的评论由 AI 智能审核,一般1分钟内会展示,若不展示请确认你的评论是否符合社区和法律规范
加载中...

留言反馈

😀😃😄 😁😅😂 🤣😊😇 🙂🙃😉 😌😍🥰 😘😗😙 😚😋😛 😝😜🤪 🤨🧐🤓 😎🥸🤩 🥳😏😒 😞😔😟 😕🙁☹️ 😣😖😫 😩🥺😢 😭😤😠 😡🤬🤯 😳🥵🥶 😱😨😰 😥😓🤗 🤔🤭🤫 🤥😶😐 😑😬🙄 😯😦😧 😮😲🥱 😴🤤😪 😵🤐🥴 🤢🤮🤧 😷🤒🤕 🤑🤠😈 👿👹👺 🤡💩👻 💀☠️👽 👾🤖🎃 😺😸😹 😻😼😽 🙀😿😾 👍👎👏 🙌👐🤲 🤝🤜🤛 ✌️🤞🤟 🤘👌🤏 👈👉👆 👇☝️ 🤚🖐️🖖 👋🤙💪 🦾🖕✍️ 🙏💅🤳 💯💢💥 💫💦💨 🕳️💣💬 👁️‍🗨️🗨️🗯️ 💭💤❤️ 🧡💛💚 💙💜🖤 🤍🤎💔 ❣️💕💞 💓💗💖 💘💝💟 ☮️✝️☪️ 🕉️☸️✡️ 🔯🕎☯️ ☦️🛐 🆔⚛️🉑 ☢️☣️📴 📳🈶🈚 🈸🈺🈷️ ✴️🆚💮 🉐㊙️㊗️ 🈴🈵🈹 🈲🅰️🅱️ 🆎🆑🅾️ 🆘 🛑📛 🚫💯💢 ♨️🚷🚯 🚳🚱🔞 📵🚭 ‼️⁉️🔅 🔆〽️⚠️ 🚸🔱⚜️ 🔰♻️ 🈯💹❇️ ✳️🌐 💠Ⓜ️🌀 💤🏧🚾 🅿️🈳 🈂🛂🛃 🛄🛅🛗 🚀🛸🚁 🚉🚆🚅 ✈️🛫🛬 🛩️💺🛰️