软件设计讲解
2025-09-15
软件框架
框架设计图
业务系统启动流程
代码讲解
AI 唤醒&人声检测
class Application(object):
def on_keyword_spotting(self, state):
logger.info("on_keyword_spotting: {}".format(state))
if state == 0:
# 唤醒词触发
if self.__working_thread is not None and self.__working_thread.is_running():
return
self.__working_thread = Thread(target=self.__working_thread_handler)
self.__working_thread.start()
self.__keyword_spotting_event.clear()
else:
self.__keyword_spotting_event.set()
def on_voice_activity_detection(self, state):
gc.collect()
logger.info("on_voice_activity_detection: {}".format(state))
if state == 1:
self.__voice_activity_event.set() # 有人声
else:
self.__voice_activity_event.clear() # 无人声
AI 初始化
初始化 AI 对象以及其他硬件驱动。
class Application(object):
def __init__(self):
# 初始化唤醒按键
self.talk_key = ExtInt(ExtInt.GPIO19, ExtInt.IRQ_RISING_FALLING, ExtInt.PULL_PU, self.on_talk_key_click, 50)
# 初始化 led; write(1) 灭; write(0) 亮
self.wifi_red_led = Led(33)
self.wifi_green_led = Led(32)
self.power_red_led = Led(39)
self.power_green_led = Led(38)
self.lte_red_led = Led(23)
self.lte_green_led = Led(24)
self.led_power_pin = Pin(Pin.GPIO27, Pin.OUT, Pin.PULL_DISABLE, 0)
# 初始化充电管理
self.charge_manager = ChargeManager()
# 初始化音频管理
self.audio_manager = AudioManager()
self.audio_manager.set_kws_cb(self.on_keyword_spotting)
self.audio_manager.set_vad_cb(self.on_voice_activity_detection)
# 初始化网络管理
self.net_manager = NetManager()
# 初始化任务调度器
self.task_manager = TaskManager()
# 初始化协议
self.__protocol = WebSocketClient()
self.__protocol.set_callback(
audio_message_handler=self.on_audio_message,
json_message_handler=self.on_json_message
)
self.__working_thread = None
self.__record_thread = None
self.__record_thread_stop_event = Event()
self.__voice_activity_event = Event()
self.__keyword_spotting_event = Event()
AI 对话中断逻辑
在唤醒AI之后启动该人声音频检测与上传的线程,self.start_vad()用于启动录音功能,当有人声时会通过websocket发送“开始监听”的标志位start,然后执行self.__protocol.abort()结束当前的语音从而实现打断AI说话。
class Application(object):
def __chat_process(self):
self.start_vad()
try:
with self.__protocol:
self.power_red_led.on()
self.__protocol.hello()
self.__protocol.wakeword_detected("小智")
is_listen_flag = False
while True:
data = self.audio_manager.opus_read()
if self.__voice_activity_event.is_set():
# 有人声
if not is_listen_flag:
self.__protocol.abort()
self.__protocol.listen("start")
is_listen_flag = True
self.__protocol.send(data)
# logger.debug("send opus data to server")
else:
if is_listen_flag:
self.__protocol.listen("stop")
is_listen_flag = False
if not self.__protocol.is_state_ok():
break
# logger.debug("read opus data length: {}".format(len(data)))
except Exception as e:
logger.debug("working thread handler got Exception: {}".format(repr(e)))
finally:
self.power_red_led.blink(250, 250)
self.stop_vad()
音频管理
统一管理设备的音频输入输出、编解码、语音识别相关功能(关键词识别 KWS 和语音活动检测 VAD),并提供回调接口供上层应用使用。
class AudioManager(object):
def __init__(self, channel=0, volume=5, pa_number=29):
self.aud = audio.Audio(channel) # 初始化音频播放通道
self.aud.set_pa(pa_number)
self.aud.setVolume(volume) # 设置音量
self.aud.setCallback(self.audio_cb)
self.rec = audio.Record(channel)
self.rec.gain_set(4,10)
self.__skip = 0
def setvolume_down(self):
global volume
volume -= 1
if volume < 0: volume = 0
self.aud.setVolume(volume)
return volume
def setvolume_up(self):
global volume
volume += 1
if volume > 11: volume = 11
self.aud.setVolume(volume)
return volume
def setvolume_close(self):
self.aud.setVolume(0)
volume = 0
return volume
# ========== 音频文件 ====================
def audio_cb(self, event):
if event == 0:
# logger.info('audio play start.')
pass
elif event == 7:
# logger.info('audio play finish.')
pass
else:
pass
def play(self, file):
self.aud.play(0, 1, file)
# ========= opus ====================
def open_opus(self):
self.pcm = audio.Audio.PCM(0, 1, 16000, 2, 1, 15) # 5 -> 25
self.opus = Opus(self.pcm, 0, 6000) # 6000 ~ 128000
def close_opus(self):
self.opus.close()
self.pcm.close()
del self.opus
del self.pcm
def opus_read(self):
return self.opus.read(60)
def opus_write(self, data):
return self.opus.write(data)
# ========= vad & kws ====================
def set_kws_cb(self, cb):
self.rec.ovkws_set_callback(cb)
def set_vad_cb(self, cb):
def wrapper(state):
if self.__skip != 2:
self.__skip += 1
return
return cb(state)
self.rec.vad_set_callback(wrapper)
def end_cb(self, para):
if(para[0] == "stream"):
if(para[2] == 1):
pass
elif (para[2] == 3):
pass
else:
pass
else:
pass
def start_kws(self):
self.rec.ovkws_start("_xiao_zhi_xiao_zhi", 0.7)
def stop_kws(self):
self.rec.ovkws_stop()
def start_vad(self):
self.__skip = 0
self.rec.vad_start()
def stop_vad(self):
self.rec.vad_stop()
MCP管理
基于 MCP 协议的消息构造与发送功能,涵盖了初始化、工具列表查询、工具调用响应以及设备通知等场景。所有消息均遵循 JSON-RPC 2.0 格式,并通过 send_mcp 方法统一发送,确保了代码的模块化和一致性。
class WebSocketClient(object):
def send_mcp(self, payload, session_id=""):
"""
发送标准MCP消息,payload为JSON-RPC 2.0格式字典
"""
with self.__resp_helper:
self.send(
JsonMessage(
{
"session_id": session_id,
"type": "mcp",
"payload": payload
}
).to_bytes()
)
def mcp_initialize(self, capabilities=None, session_id="", req_id=1):
"""
发送MCP initialize响应
"""
payload = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2025-9-03",
"capabilities": {
"tools":{},
"notifications": {}
},
"serverInfo": {
"name": 'xiaozhi-mqtt-client',
"version": "1.0.0"
}
}
}
self.send_mcp(payload, session_id)
def mcp_tools_list(self, cursor="", session_id="", req_id=2):
"""
发送MCP tools/list响应请求
"""
payload = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": [
{
"name": "self.setvolume_down()",
"description": "只通过调用setvolume_down方法来控制音量变小,接收到回应后会播报当前音量大小",
"inputSchema": {}
},
{
"name": "self.setvolume_up()",
"description": "只通过调用setvolume_up方法来控制音量变大,接收到回应后会播报当前音量大小",
"inputSchema": {}
},
{
"name": "self.setvolume_close()",
"description": "只通过调用setvolume_close方法来静音,接收到回应后会播报当前音量大小",
"inputSchema": {}
},
],
}
}
self.send_mcp(payload, session_id)
def mcp_tools_call(self, session_id="", req_id="", error=None, tool_name=""):
"""
发送MCP tools/call响应
:param error: 如果为None则返回成功响应,否则返回错误响应(字典,包含code和message)
"""
if error is None:
if tool_name == "self.setvolume_down()":
payload = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{ "type": "text", "text": "音量已调小 "}
],
"isError": False
}
}
elif tool_name == "self.setvolume_up()":
payload = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{ "type": "text", "text": "音量已调大" }
],
"isError": False
}
}
elif tool_name == "self.setvolume_close()":
payload = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{ "type": "text", "text": "已静音" }
],
"isError": False
}
}
else:
payload = {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": error.get("code", -32601),
"message": error.get("message", "Unknown error")
}
self.send_mcp(payload, session_id)
def mcp_notify(self, method, params, session_id=""):
"""
设备主动发送MCP通知
"""
payload = {
"jsonrpc": "2.0",
"method": "notifications/state_changed",
"params": {
"newState": "idle",
"oldState": "connecting"
}
}
self.send_mcp(payload, session_id)