Software Design Explanation

Software Framework

Scheme Framework

Business Process

Code Explanation

Application Class Initialization

The Application constructor initializes the AI-related objects and the hardware drivers.

from machine import Pin
# Thread, Event, ChargeManager, AudioManager, NetManager, TaskManager and
# MqttClient come from this project's own modules (imports not shown).

class Application(object):
    def __init__(self):
        # Configure GPIO33 as a pull-down output and drive it high (level 1)
        Pin(Pin.GPIO33, Pin.OUT, Pin.PULL_PD, 1)
        self.prev_emoj = None

        # Initialize charge management
        self.charge_manager = ChargeManager()

        # Initialize audio management and register the KWS/VAD callbacks
        self.audio_manager = AudioManager()
        self.audio_manager.set_kws_cb(self.on_keyword_spotting)
        self.audio_manager.set_vad_cb(self.on_voice_activity_detection)

        # Initialize network management
        self.net_manager = NetManager()

        # Initialize task scheduler
        self.task_manager = TaskManager()

        # Initialize the protocol client and its message handlers
        self.__protocol = MqttClient()
        self.__protocol.set_callback(
            audio_message_handler=self.on_audio_message,
            json_message_handler=self.on_json_message
        )

        # Worker threads and the events used to coordinate them
        self.__working_thread = None
        self.__record_thread = None
        self.__record_thread_stop_event = Event()
        self.__voice_activity_event = Event()
        self.__keyword_spotting_event = Event()

Wake-up & Voice Detection

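When the wake-up word is detected (KWS callback state 0), the working thread self.__working_thread is started unless one is already running, and the working thread enables VAD (voice activity detection). The VAD callback sets the voice-activity event while voice is present and clears it otherwise.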

class Application(object):
    def on_keyword_spotting(self, state):
        logger.info("on_keyword_spotting: {}".format(state))
        if state == 0:
            # Wake-up word triggered
            if self.__working_thread is not None and self.__working_thread.is_running():
                return
            self.__working_thread = Thread(target=self.__working_thread_handler)
            self.__working_thread.start()
            self.__keyword_spotting_event.clear()
        else:
            self.__keyword_spotting_event.set()

    def on_voice_activity_detection(self, state):
        gc.collect()
        logger.info("on_voice_activity_detection: {}".format(state))
        if state == 1:
            self.__voice_activity_event.set()  # Voice detected
        else:
            self.__voice_activity_event.clear()  # No voice detected

Dialogue Logic

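Once the working thread is running, __chat_process is executed. It opens the protocol session, reads encoded audio continuously, and sends the audio data to the server only while voice is detected, issuing listen("start") when voice begins and listen("stop") when it ends.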

class Application(object):
    def __chat_process(self):
        self.start_vad()
        try:
            with self.__protocol:
                self.power_red_led.on()
                self.__protocol.hello()
                self.__protocol.wakeword_detected("Xiaozhi")
                is_listen_flag = False
                while True:
                    data = self.audio_manager.opus_read()
                    if self.__voice_activity_event.is_set():
                        # Voice detected
                        if not is_listen_flag:
                            self.__protocol.listen("start")
                            is_listen_flag = True
                        self.__protocol.udp_send(data) 
                        # logger.debug("send opus data to server")
                    else:
                        if is_listen_flag:
                            self.__protocol.listen("stop")
                            is_listen_flag = False
                    if not self.__protocol.is_state_ok():
                        break
                    # logger.debug("read opus data length: {}".format(len(data)))
        except Exception as e:
            logger.debug("working thread handler got Exception: {}".format(repr(e)))
        finally:
            self.power_red_led.blink(250, 250)
            self.stop_vad()
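The send/listen decision inside the loop above can be isolated into a pure function so it can be checked without hardware. The sketch below is not part of the source: `plan_chat_events` is a hypothetical name, and it records ("listen", ...) and ("udp", frame) tuples instead of calling listen() and udp_send().

```python
def plan_chat_events(samples):
    """Return the protocol actions the dialogue loop would take.

    samples: iterable of (frame_bytes, voice_active) pairs, one per
    opus_read() call. Returns a list of ("listen", "start"),
    ("udp", frame) and ("listen", "stop") tuples in order.
    """
    events = []
    listening = False
    for frame, voice_active in samples:
        if voice_active:
            # Rising edge: tell the server a user utterance begins
            if not listening:
                events.append(("listen", "start"))
                listening = True
            # Only frames read while voice is present are forwarded
            events.append(("udp", frame))
        elif listening:
            # Falling edge: the utterance has ended
            events.append(("listen", "stop"))
            listening = False
    return events


if __name__ == "__main__":
    demo = [(b"a", False), (b"b", True), (b"c", True), (b"d", False)]
    for event in plan_chat_events(demo):
        print(event)
```

Frames read while no voice is detected are simply dropped, which mirrors the original loop: opus_read() is always called, but udp_send() only runs when the voice-activity event is set.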

Audio Management

AudioManager provides unified management of the device's audio input and output, Opus encoding and decoding, and the speech-recognition functions (keyword spotting, KWS, and voice activity detection, VAD), and exposes callback interfaces to the upper-layer application.

import audio
# Opus (the encoder/decoder wrapper) is imported elsewhere in the project; not shown here.


class AudioManager(object):
    def __init__(self, channel=0, volume=11, pa_number=29):
        self.aud = audio.Audio(channel)  # Initialize audio playback channel
        self.aud.set_pa(pa_number)  # GPIO controlling the external power amplifier
        self.aud.setVolume(volume)  # Set volume
        self.aud.setCallback(self.audio_cb)  # Playback state callback
        self.rec = audio.Record(channel)  # Recorder; also hosts KWS and VAD
        self.__skip = 0  # VAD callbacks dropped since the last start_vad()

    # ========== Audio files ====================
    def audio_cb(self, event):
        if event == 0:
            # logger.info('audio play start.')
            pass
        elif event == 7:
            # logger.info('audio play finish.')
            pass

    def play(self, file):
        self.aud.play(0, 1, file)

    # ========= opus ====================
    def open_opus(self):
        self.pcm = audio.Audio.PCM(0, 1, 16000, 2, 1, 15)  # 16 kHz PCM stream; 5 -> 25
        self.opus = Opus(self.pcm, 0, 6000)  # bitrate 6000 (range 6000 ~ 128000)

    def close_opus(self):
        self.opus.close()
        self.pcm.close()
        del self.opus
        del self.pcm

    def opus_read(self):
        return self.opus.read(60)  # one encoded frame (60, presumably ms)

    def opus_write(self, data):
        return self.opus.write(data)

    # ========= vad & kws ====================
    def set_kws_cb(self, cb):
        self.rec.ovkws_set_callback(cb)

    def set_vad_cb(self, cb):
        # Drop the first two VAD callbacks after each start_vad(),
        # then forward every later state change to cb.
        def wrapper(state):
            if self.__skip != 2:
                self.__skip += 1
                return
            return cb(state)
        self.rec.vad_set_callback(wrapper)

    def end_cb(self, para):
        # Recording-end callback placeholder: no case is handled yet
        if para[0] == "stream":
            if para[2] == 1:
                pass
            elif para[2] == 3:
                pass

    def start_kws(self):
        self.rec.ovkws_start("_xiao_zhi_xiao_zhi", 0.7)  # wake-word model, presumably with threshold 0.7

    def stop_kws(self):
        self.rec.ovkws_stop()

    def start_vad(self):
        self.__skip = 0  # Re-arm the VAD callback skip counter
        self.rec.vad_start()

    def stop_vad(self):
        self.rec.vad_stop()

MQTT and UDP Connections

MQTT Connection

To connect to the Xiaozhi platform's MQTT server, the device needs the corresponding MQTT connection parameters, which are obtained from Xiaozhi's OTA interface. This scheme uses QuecPython's request module to send a GET request for these parameters; the request method, header format and body format all follow the official Xiaozhi OTA documentation.
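The OTA response body is JSON. The sketch below only parses it (the HTTP call through the request module is left out). It assumes, following the publicly documented Xiaozhi OTA response layout, that the MQTT parameters sit under an `mqtt` object with `endpoint`, `client_id`, `username`, `password` and `publish_topic` fields; these names, and the helper name `parse_ota_mqtt_params`, are assumptions to verify against the official OTA documentation.

```python
import json

# Assumed field names inside the "mqtt" object of the OTA response
MQTT_FIELDS = ("endpoint", "client_id", "username", "password", "publish_topic")


def parse_ota_mqtt_params(body):
    """Extract MQTT connection parameters from an OTA response body (str).

    Raises ValueError if the response has no usable "mqtt" section.
    """
    data = json.loads(body)
    mqtt = data.get("mqtt")
    if not isinstance(mqtt, dict):
        raise ValueError("OTA response has no 'mqtt' section")
    missing = [name for name in MQTT_FIELDS if name not in mqtt]
    if missing:
        raise ValueError("OTA response missing fields: " + ", ".join(missing))
    # Keep only the fields needed to connect, in a fresh dict
    return {name: mqtt[name] for name in MQTT_FIELDS}
```

Failing loudly here is a deliberate choice: a response without MQTT parameters means the later connection step cannot succeed, so it is better detected right after the OTA call.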

A dedicated method, ota_get(), fetches these parameters. It is called once when the MqttClient class is initialized, so the connection parameters are available as soon as the client instance is created, before any connection to the MQTT server is attempted.

Once obtained, the MQTT connection parameters are cached in the global list aes_opus_info so that later steps can read them.

When the wake-up word triggers the wake-up process, the program starts the MQTT connection: it reads the parameters obtained from the OTA interface out of aes_opus_info and uses them to configure the connection to the Xiaozhi MQTT server.
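A minimal sketch of the caching and lookup, assuming aes_opus_info holds a single dict of parsed OTA parameters (the source only says it is a global list; its exact layout is not shown). `store_mqtt_params` and `mqtt_connect_config` are hypothetical names. The returned keys roughly follow the keyword arguments of the umqtt `MQTTClient` constructor, and the default port 8883, used when the endpoint carries no port, is an assumption.

```python
# Hypothetical stand-in for the global aes_opus_info: one cached parameter dict
aes_opus_info = []


def store_mqtt_params(params):
    """Replace any cached parameters with a freshly fetched set."""
    aes_opus_info[:] = [params]


def mqtt_connect_config(default_port=8883):
    """Build connection settings from the cached OTA parameters."""
    if not aes_opus_info:
        raise RuntimeError("MQTT parameters not fetched yet; call ota_get() first")
    params = aes_opus_info[0]
    host, sep, port = params["endpoint"].rpartition(":")
    if not sep:  # endpoint carried no explicit port
        host, port = params["endpoint"], default_port
    return {
        "client_id": params["client_id"],
        "server": host,
        "port": int(port),
        "user": params["username"],
        "password": params["password"],
    }
```

Reading from the cache at wake-up time, rather than calling the OTA interface again, keeps the wake-up path short; the trade-off is that stale parameters are only refreshed when ota_get() runs again.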

UDP Connection

Once the connection to the Xiaozhi MQTT server is established, the device sends a message over the MQTT channel to obtain the parameters needed for the UDP connection: specifically, the "hello" message defined by the Xiaozhi platform, formatted according to Xiaozhi's official specification.
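The hello message is a JSON document. The fields below are an assumption based on the publicly documented Xiaozhi MQTT+UDP protocol and should be confirmed against the official specification. The 16000 Hz sample rate and 60 ms frame duration are chosen to match the PCM and opus_read() settings in AudioManager, and `features.mcp` is assumed to be the flag that opens the MCP channel. `build_hello` is a hypothetical helper.

```python
import json


def build_hello(sample_rate=16000, frame_duration=60):
    """Return the device hello message as a JSON string (field names assumed)."""
    return json.dumps({
        "type": "hello",
        "version": 3,
        "transport": "udp",           # ask for audio over UDP
        "features": {"mcp": True},    # request the MCP channel
        "audio_params": {
            "format": "opus",
            "sample_rate": sample_rate,
            "channels": 1,
            "frame_duration": frame_duration,  # milliseconds per Opus frame
        },
    })
```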

When the UDP connection parameters returned by the Xiaozhi platform are received, the program uses them to set up the UDP channel with the platform, which then carries the audio stream.
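A sketch of handling the server's reply, assuming (as in the publicly documented Xiaozhi MQTT+UDP protocol) that the reply is a hello message carrying a `udp` object with `server`, `port`, hex-encoded `key` and hex-encoded `nonce`. The field names and the helpers `parse_hello_reply` and `open_udp_channel` are assumptions, not the project's actual code.

```python
import binascii
import json
import socket


def parse_hello_reply(body):
    """Extract session and UDP parameters from the server's hello reply."""
    reply = json.loads(body)
    if reply.get("type") != "hello" or reply.get("transport") != "udp":
        raise ValueError("not a UDP hello reply")
    udp = reply["udp"]
    return {
        "session_id": reply.get("session_id"),
        "server": udp["server"],
        "port": int(udp["port"]),
        "key": binascii.unhexlify(udp["key"]),      # AES key, sent hex-encoded
        "nonce": binascii.unhexlify(udp["nonce"]),  # packet header template
    }


def open_udp_channel(params):
    """Create an IPv4 UDP socket whose default peer is the audio server."""
    addr = socket.getaddrinfo(params["server"], params["port"],
                              socket.AF_INET, socket.SOCK_DGRAM)[0][-1]
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.connect(addr)  # UDP connect only fixes the peer; nothing is sent
    return sock
```

Connecting the UDP socket lets later code call send() without repeating the address, and makes the socket ignore datagrams from other peers.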

Audio Data Encryption

Once the UDP channel with the Xiaozhi platform is established, all subsequent audio stream data is sent over UDP. The Xiaozhi platform specification requires this audio data to be encrypted with the AES-CTR algorithm so that it stays confidential in transit. (CTR mode provides confidentiality only; it does not by itself protect the integrity of the data.)

This scheme implements AES-CTR with QuecPython's encryption module ucryptolib. Before each piece of audio data is sent, the program calls the corresponding ucryptolib encryption interface with the required parameters, such as the key, and then sends the encrypted audio to the Xiaozhi platform over the established UDP channel, as the platform specification requires.
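The packet layout in this sketch is an assumption taken from the publicly documented Xiaozhi MQTT+UDP protocol: a 16-byte header built from the nonce in the server's hello reply (byte 0 type, bytes 2-3 payload length, bytes 4-7 SSRC, bytes 8-11 timestamp, bytes 12-15 sequence number, all big-endian), which also serves as the AES-CTR counter block, followed by the encrypted Opus payload. `build_audio_packet` is a hypothetical helper, and the cipher is passed in as a callable so the framing can be tested without ucryptolib. The commented `aes_ctr` shows how it might be supplied on the device, assuming the firmware's ucryptolib supports CTR mode (MicroPython documents mode 6 as MODE_CTR, available only on some ports).

```python
import struct


def build_audio_packet(nonce, payload, timestamp, sequence, encrypt):
    """Frame and encrypt one Opus payload for the UDP channel.

    nonce:   16-byte template from the server's hello reply (assumed layout)
    encrypt: callable(iv, data) -> ciphertext, e.g. AES-CTR with the session key
    """
    if len(nonce) != 16:
        raise ValueError("nonce must be 16 bytes")
    if len(payload) > 0xFFFF:
        raise ValueError("payload too large for a 16-bit length field")
    header = bytearray(nonce)  # keeps type, flags and SSRC from the template
    struct.pack_into(">H", header, 2, len(payload))            # bytes 2-3: payload length
    struct.pack_into(">I", header, 8, timestamp & 0xFFFFFFFF)  # bytes 8-11: timestamp
    struct.pack_into(">I", header, 12, sequence & 0xFFFFFFFF)  # bytes 12-15: sequence
    header = bytes(header)
    # The header doubles as the initial AES-CTR counter block (IV)
    return header + encrypt(header, payload)


# On the device (not executed here), assuming ucryptolib supports CTR mode:
# import ucryptolib
# def aes_ctr(iv, data):
#     return ucryptolib.aes(key, 6, iv).encrypt(data)  # 6 == MODE_CTR
```

Because every packet gets a distinct sequence number in its header, each packet starts from a different counter block, which CTR mode needs: reusing a counter block with the same key would expose the XOR of two plaintexts.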

Internal MCP

When connecting, the device enables the MCP channel in its hello message, and the server then sends a request of type tools/list. The device must reply with the local tools that Xiaozhi is allowed to call. During a voice interaction, once the server recognizes the user's intent, it sends a tools/call request for one of these tools; the device executes the tool and replies to the server with the result. To add a tool, add its description to the tools/list response and its processing logic to the tools/call handling.
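MCP messages are JSON-RPC 2.0 requests and responses. The sketch below answers tools/list and tools/call for one tool, assuming the JSON-RPC payload has already been unwrapped from the platform's MCP message envelope (not shown). The tool `self.audio_speaker.set_volume`, its schema, and the `set_volume` callback are hypothetical examples. Other methods, such as the MCP initialize request, are outside this sketch and receive a method-not-found error.

```python
# Hypothetical local tool: name, description and schema are illustrative only.
TOOLS = [
    {
        "name": "self.audio_speaker.set_volume",
        "description": "Set the speaker volume (0-100).",
        "inputSchema": {
            "type": "object",
            "properties": {"volume": {"type": "integer"}},
            "required": ["volume"],
        },
    },
]


def _error(req_id, code, message):
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}


def handle_mcp_request(request, set_volume):
    """Answer one JSON-RPC 2.0 request (a dict) from the MCP channel.

    set_volume: callable applying the volume (hypothetical driver hook).
    Returns the response dict, or None for notifications (no "id").
    """
    req_id = request.get("id")
    if req_id is None:
        return None  # notifications get no reply
    method = request.get("method")
    if method == "tools/list":
        return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": TOOLS}}
    if method == "tools/call":
        params = request.get("params") or {}
        if params.get("name") != "self.audio_speaker.set_volume":
            return _error(req_id, -32602, "unknown tool")
        arguments = params.get("arguments") or {}
        if "volume" not in arguments:
            return _error(req_id, -32602, "missing argument: volume")
        set_volume(int(arguments["volume"]))
        result = {"content": [{"type": "text", "text": "true"}], "isError": False}
        return {"jsonrpc": "2.0", "id": req_id, "result": result}
    return _error(req_id, -32601, "method not found")
```

Keeping the tool descriptions in one list means the tools/list reply and the tools/call dispatch stay in step: adding a tool touches the list entry and one branch of the handler.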