25 AI Large Language Model Course (Overseas Version)

25.1 Multi-modal Models Application

25.1.1 Configure the Large Model API Key

Note

Note: This section requires registering on the official OpenAI website and obtaining an API key for accessing large language models.

  • Register and Deploy OpenAI Account

  1. Copy and open the following URL:

  2. Register and log in using a Google, Microsoft, or Apple account, as prompted.

  3. After logging in, click the Settings button, then go to Billing, and click Payment Methods to add a payment method. Recharge your account to purchase tokens as needed.

  4. Once your account is set up, go to the API Keys section and click Create new key. Follow the instructions to generate a new API key and save it securely for later use.

  5. The creation and deployment of the large model have been completed. This API will be used in subsequent lessons.

  • Register and Deploy openrouter Account

  1. Copy and open the following URL:https://openrouter.ai/. Click “Log In”, and register or sign in using Google or another available account.

  2. After logging in, click the icon in the top-right corner, then select “Credits” to add a payment method.

  3. Create an API key. Go to “API Keys”, then click “Create Key”. Follow the prompts to generate a key. Save the API key securely for later use.

  4. At this point, the creation and deployment of the large model have been completed; this API will be used in subsequent lessons.

25.1.2 Version Confirmation

Before starting this feature, verify that the correct microphone configuration is set in the system.

  1. Log in to the machine remotely via NoMachine. Then click the desktop icon to access the configuration interface.

  2. On the right side of the interface, select the appropriate microphone type based on the hardware being used.

  3. For the Six-Microphone Array, select xf as the microphone type as shown in the figure.

  4. Then, click Save.

  5. Click “Apply” and wait until the “Service is restarting” notification disappears; once it does, the configuration has been saved to the system environment.

  6. Then, click Exit to close the interface.

25.1.3 Voice Control with Multimodal Models

  • Brief Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.”

To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” You can then control the robot by voice to perform the corresponding actions—for example, “move forward, backward, left, right, strafe, then drift.” The voice device will announce the generated response after processing and execute the matching movements.

  • Getting Ready

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Configure the Large Model API Key to obtain the large model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to stop the auto-start service

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to run the voice control feature.

    ros2 launch large_models_examples llm_control_move.launch.py
    
  4. When the terminal displays output shown in the figure Circular Microphone Array announces “I’m ready”, the device has completed initialization. Then, you can say the wake words: “hello hiwonder”.

  5. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  6. When the terminal displays the next output as the reference image, it shows the recognized speech transcribed by the voice device.

  7. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  8. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

    The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  9. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  10. To exit the feature, press Ctrl+C in the terminal. If the feature does not shut down immediately, press Ctrl+C multiple times. If it still fails to exit, open a new terminal window and run the following command to terminate all active ROS processes and related programs.

    ~/.stop_ros.sh
    
  • Project Outcome

Once the mode is activated, you can phrase commands naturally—for example, “move forward, backward, left, right, strafe, then drift”—and the robot will translate, slide sideways, and finish with a spinning drift.

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_control_move.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch import LaunchDescription, LaunchService
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)

    controller_package_path = get_package_share_directory('controller')
    large_models_package_path = get_package_share_directory('large_models')

    controller_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(controller_package_path, 'launch/controller.launch.py')),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )


    llm_control_move_node = Node(
        package='large_models_examples',
        executable='llm_control_move',
        output='screen',
    )

    return [mode_arg,
            controller_launch,
            large_models_launch,
            llm_control_move_node,
            ]
  1. This function is used to configure and initialize launch actions.

  2. mode = LaunchConfiguration('mode', default=1) defines a launch argument named mode with a default value of 1.

  3. mode_arg = DeclareLaunchArgument('mode', default_value=mode) declares the mode argument and includes it in the launch description.

  4. Controller_path and large_models_package_path represent the shared directory paths for the controller package responsible for robot movement and the large_models package.

  5. controller_launch includes the controller.launch.py Launch file from the controller package using IncludeLaunchDescription.

  6. large_models_launch includes the start.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  7. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_control_move.py

(1) Prompt Template Definition

else:
    PROMPT = '''
**Role
You are an intelligent car that can be controlled via linear velocity on the x and y axes (in meters per second), and angular velocity on the z axis (in radians per second). The movement duration is controlled by t (in seconds).
Your job is to generate a corresponding instruction based on user input.

**Requirements
- Ensure valid velocity ranges:
Linear velocity: x, y ∈ [-1.0, 1.0] (negative values mean reverse direction)
Angular velocity: z ∈ [-1.0, 1.0] (negative: clockwise, positive: counterclockwise)
- Execute multiple actions sequentially, returning a list of movement instructions under the action field.
- Always append a stop command [0.0, 0.0, 0.0, 0.0] at the end to ensure the car halts.
- Default values:
x and y: 0.2
z: 1.0
t: 2.0
- For each action sequence, craft a short (5–10 characters), witty, and endlessly variable response to make interactions fun and engaging.
- Output only the final JSON result. No explanations, no extra output.
- Format:
{
  "action": [[x1, y1, z1, t1], [x2, y2, z2, t2], ..., [0.0, 0.0, 0.0, 0.0]],
  "response": "short response"
}
- Possess strong mathematical reasoning to interpret and compute physical quantities like distance, time, and velocity.

## Special Notes
The "action" key should contain an array of stringified movement instructions in execution order. If no valid command is found, output an empty array [].
The "response" key should contain a creatively written, concise reply that matches the required tone and length.
If the input command implies a backward-left direction, then the output values of X and Z will be negative.
If the input command implies a backward-right direction, then the output values of X will be negative and Z will be positive.

**Examples
Input: Move forward for 2 seconds, then rotate clockwise for 1 second
Output:
{
  "action": [[0.2, 0.0, 0.0, 2.0], [0.0, 0.0, 1.0, 1.0], [0.0, 0.0, 0.0, 0.0]],
  "response": "Full speed ahead, spin and go!"
}

Input: Move forward 1 meter
Output:
{
  "action": [[0.2, 0.0, 0.0, 5.0], [0.0, 0.0, 0.0, 0.0]],
  "response": "Got it!"
}
    '''

(2) Variable Initialization

	class LLMControlMove(Node):
    def __init__(self, name):
        rclpy.init()
        super().__init__(name)
        
        self.action = []
        self.llm_result = ''
        self.running = True
        self.interrupt = False
        self.action_finish = False
        self.play_audio_finish = False
  1. self.action: stores the list of actions parsed from LLM responses.

  2. self.llm_result: stores the result received from the LLM.

  3. self.running: a flag indicating whether the main loop is actively running.

  4. self.interrupt: a flag indicating whether the current function has been interrupted or stopped.

  5. Self.action_finish: a flag indicating whether the current action has been completed.

  6. self.play_audio_finish: a flag indicating whether the audio playback has finished.

(3) Publisher Creation

 self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)

Creates a publisher that sends String messages to the topic /tts_node/tts_text. This topic is used to send text-to-speech (TTS) content for voice feedback.

(4) Subscriber Creation

	        self.create_subscription(String, 'agent_process/result', self.llm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()

(5) Create A Timer

     self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group)

(6) LLM Model Configuration

        msg = SetModel.Request()
        # msg.model = 'qwen-plus-latest'
        msg.model = llm_model
        msg.model_type = 'llm'
        msg.api_key = api_key 
        msg.base_url = base_url
        self.send_request(self.set_model_client, msg)

(7) play_audio_finish_callback Method:

	    def play_audio_finish_callback(self, msg):
        msg = SetBool.Request()
        msg.data = True
        self.send_request(self.awake_client, msg)
        # msg = SetInt32.Request()
        # msg.data = 1
        # self.send_request(self.set_mode_client, msg)
        self.play_audio_finish = msg.data

Callback function triggered after voice playback finishes. Re-enables voice wake up functionality.

(8) process Method

    def process(self):
        while self.running:
            if self.llm_result:
                msg = String()
                if 'action' in self.llm_result:  # If a corresponding action is returned, extract and process it (如果有对应的行为返回那么就提取处理)
                    result = eval(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}') + 1])
                    self.get_logger().info(str(result))
                    action_list = []
                    if 'action' in result:
                        action_list = result['action']
                    if 'response' in result:
                        response = result['response']
                    msg.data = response
                    self.tts_text_pub.publish(msg)
                    for i in action_list:
                        msg = Twist()
                        msg.linear.x = float(i[0])
                        msg.linear.y = float(i[1])
                        msg.angular.z = float(i[2])
                        self.mecanum_pub.publish(msg)
                        time.sleep(i[3])
                        if self.interrupt:
                            self.interrupt = False
                            self.mecanum_pub.publish(Twist())
                            break
                else:  # If there is no corresponding action, only respond (没有对应的行为,只回答)
                    response = self.llm_result
                    msg.data = response
                    self.tts_text_pub.publish(msg)
                self.action_finish = True 
                self.llm_result = ''
            else:
                time.sleep(0.01)
            if self.play_audio_finish and self.action_finish:
                self.play_audio_finish = False
                self.action_finish = False
                # msg = SetInt32.Request()
                # msg.data = 2
                # self.send_request(self.set_mode_client, msg)
        rclpy.shutdown()

The method acts as the main loop responsible for handling commands from the LLM, interpreting them, and executing corresponding actions. It also supports generating and broadcasting voice feedback.

(9) main Function

def main():
    node = LLMControlMove('llm_control_move')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. The function initializes a ROS 2 node.

  2. Start a multi-threaded executor to handle callbacks.

  3. Clean up and destroys the node upon program exit to release resources.

25.1.4 Autonomous Line Following with Multimodal Models

  • Brief Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.”

To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” After that you can control the robot by voice—for example, “Follow the black line and stop when you meet an obstacle.” The terminal prints the recognized speech, the circular microphone array announces the generated response, the camera detects the black line, and the robot halts as soon as an obstacle appears in front.

  • Getting Ready

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Configure the Large Model API Key to obtain the large model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to stop the auto-start service

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to run the visual line following feature.

    ros2 launch large_models_examples llm_visual_patrol.launch.py
    
  4. When the terminal displays output shown in the figure and Circular Microphone Array announces “I’m ready”, the device has completed initialization. Then, you can say the wake words: “hello hiwonder”.

  5. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  6. When the command line shows the output below, it means Circular Microphone Array has printed the recognized speech. Then say the command, “Follow the black line and stop when you meet an obstacle.”

  7. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  8. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  2. To exit the feature, press Ctrl+C in the terminal. If the feature does not exit immediately, press Ctrl+C multiple times.

  • Project Outcome

Once the feature is activated, feel free to give commands in your own words. For instance, “Follow the black line and stop when you encounter an obstacle.” The robot uses its camera to detect and follow the black line, and it will stop when an obstacle is detected in its path. The system is pre-configured to recognize four line colors: red, blue, green, and black.

  • Program Brief Analysis

1.Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_visual_patrol.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch import LaunchDescription, LaunchService
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)

    app_package_path = get_package_share_directory('app')
    large_models_package_path = get_package_share_directory('large_models')

    line_following_node_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(app_package_path, 'launch/line_following_node.launch.py')),
        launch_arguments={
            'debug': 'true',
        }.items(),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )

    llm_visual_patrol_node = Node(
        package='large_models_examples',
        executable='llm_visual_patrol',
        output='screen',
    )

    return [mode_arg,
            line_following_node_launch,
            large_models_launch,
            llm_visual_patrol_node,
            ]
  1. This function is used to configure and initialize launch actions.

  2. mode = LaunchConfiguration(‘mode’, default=1) defines a launch argument named mode with a default value of 1.

  3. mode_arg = DeclareLaunchArgument(‘mode’, default_value=mode) declares the mode argument and includes it in the launch description.

  4. large_models_package_path: retrieves the shared directory path of the large_models package.

  5. waste_classification_launch: includes the waste_classification_launch file using IncludeLaunchDescription.

  6. large_models_launch: includes the start.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  7. llm_vision_pratrol: defines a ROS 2 node from the large_models package, executes the executable files from the llm_vision_patrol, and prints the node’s output to the screen.

  8. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_visual_patrol.py

(1) Import Libraries

import os
import re
import time
import rclpy
import threading
from speech import speech
from rclpy.node import Node
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger, SetBool, Empty

from interfaces.srv import SetString as SetColor
from large_models.config import *
from large_models_msgs.srv import SetModel, SetString, SetInt32

from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

(2) Prompt Template Definition

else:
    PROMPT = '''
**Role
You are a smart robot that generates corresponding JSON commands based on user input.

**Requirements
- For every user input, search for matching commands in the action function library and output the corresponding JSON instruction.
- For each action sequence, craft a witty and creative response (10 to 30 characters) to make interactions delightful.
- Directly return the JSON result — do not include any explanations or extra text.
- There are four target colors: red, green, blue, and black.
- Format:
{
  "action": ["xx", "xx"],
  "response": "xx"
}

**Special Notes
The "action" field should contain a list of function names as strings, ordered by execution. If no matching action is found, output an empty list: [].
The "response" field should provide a concise and charming reply, staying within the word and tone guidelines.

**Action Function Library
Follow a line of a given color: line_following('black')

**Example
Input: Follow the red line
Output:
{
  "action": ["line_following('red')"],
  "response": "Roger that!"
}
    '''

(3) Variable Initialization

	    self.action = []
        self.stop = True
        self.llm_result = ''
  1. self.action: stores the list of actions parsed from LLM responses.

  2. self.llm_result: stores the result received from the LLM.

  3. self.running: a flag indicating whether the main loop is actively running.

(4) Publisher Creation

self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)

Creates a publisher that sends String messages to the topic /tts_node/tts_text. This topic is used to send text-to-speech (TTS) content for voice feedback.

(5) Subscriber Creation

		self.create_subscription(String, 'agent_process/result', self.llm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)

(6) Necessary Services Creation

	        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()
        self.enter_client = self.create_client(Trigger, 'line_following/enter')
        self.enter_client.wait_for_service()
        self.start_client = self.create_client(SetBool, 'line_following/set_running')
        self.start_client.wait_for_service()
        self.set_target_client = self.create_client(SetColor, 'line_following/set_color')
        self.set_target_client.wait_for_service()

(7) LLM Model Configuration

        msg = SetModel.Request()
        msg.model = llm_model
        msg.model_type = 'llm'
        msg.api_key = api_key 
        msg.base_url = base_url
        self.send_request(self.set_model_client, msg)

(8) Enter

        self.send_request(self.set_prompt_client, msg)

(9) Play Startup Audio

		speech.play_audio(start_audio_path)

(10) Process LLM Commands to Create Speech Feedback Messages

		msg = SetString.Request()

(11) Check if the Command Contains Actions

	                if 'action' in self.llm_result: # If a corresponding action is returned, extract and process it (如果有对应的行为返回那么就提取处理)
                    result = eval(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}')+1])

(12) Parse the Command

 result = eval(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}')+1])

(13) Extract the Recognized Results

if 'action' in result:
                        text = result['action']
                        # Use regular expressions to extract all strings inside parentheses (使用正则表达式提取括号中的所有字符串)
                        pattern = r"line_following\('([^']+)'\)"

(14) Set Target Color Request

								color = match.group(1)
                                self.get_logger().info(str(color))
                                color_msg = SetColor.Request()
                                color_msg.data = color
                                self.send_request(self.set_target_client, color_msg)

Create a SetStringList. Request message to specify the target for waste classification. Use the send_request method to send the request to the waste_classification/set_target service, setting the waste classification target.

(15) Initiate and Send Request to Start Line Following

	                            start_msg = SetBool.Request()
                                start_msg.data = True 
                                self.send_request(self.start_client, start_msg)

Create a SetBool. Request message to enable the waste classification transport feature. Use the send_request method to send a request to the waste_classification/enable_transport service, starting the waste classification transport.

(16) main Function

	def main():
    node = LLMColorTrack('llm_line_following')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. The function initializes a ROS 2 node.

  2. Start a multi-threaded executor to handle callbacks.

  3. Clean up and destroys the node upon program exit to release resources.

25.1.5 Color Tracking with Multimodal Models

  • Brief Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.” To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” Once activated, voice commands such as “Follow the red object” can be issued. The terminal will display the recognized command, and the voice device will respond with a generated reply after processing. The robot will then autonomously identify the red object captured by its camera and begin tracking it.

  • Getting Ready

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Configure the Large Model API Key to obtain the large model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to stop the auto-start service

  2. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command to stop the auto-start service

    sudo systemctl stop start_app_node.service
    
  3. Enter the following command and press Enter to launch the color tracking feature.

    ros2 launch large_models_examples llm_color_track.launch.py
    
  4. When the terminal displays output shown in the figure and Circular Microphone Array announces “I’m ready”, the device has completed initialization. Then, you can say the wake words: “hello hiwonder”.

  5. After the program has loaded successfully, the camera feed will appear on screen.

  6. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  7. When the command line shows the output below, it means the Circular Microphone Array has printed the recognized speech. Then say the command, “Follow the red object.”

  8. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  9. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. Then, the robot will detect the red object in its camera feed and begin tracking it in real time.

  2. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  3. To exit the feature, press Ctrl+C in the terminal. If the feature does not shut down immediately, press Ctrl+C multiple times. If it still fails to exit, open a new terminal window and run the following command to terminate all active ROS processes and related programs.

    /.stop_ros.sh
    
  • Project Outcome

Once the feature is activated, feel free to give commands in your own words. For instance, “Follow the red object.” The robot will use the camera feed to identify and track the red object. Similarly, commands like “Follow the blue object” or “Follow the green object” can be used to instruct the robot to track objects of the specified colors.

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_color_track.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch import LaunchDescription, LaunchService
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)

    app_package_path = get_package_share_directory('app')
    large_models_package_path = get_package_share_directory('large_models')

    object_tracking_node_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(app_package_path, 'launch/object_tracking_node.launch.py')),
        launch_arguments={
            'debug': 'true',
        }.items(),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )

    llm_color_track_node = Node(
        package='large_models_examples',
        executable='llm_color_track',
        output='screen',
    )

    return [mode_arg,
            object_tracking_node_launch,
            large_models_launch,
            llm_color_track_node,
            ]
  1. This function is used to configure and initialize launch actions.

  2. mode = LaunchConfiguration(‘mode’, default=1) defines a launch argument named mode with a default value of 1.

  3. mode_arg = DeclareLaunchArgument(‘mode’, default_value=mode) declares the mode argument and includes it in the launch description.

  4. objet_sorting_launch: includes the object_sorting_node.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  5. large_models_launch: includes the start.launch.py file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  6. llm_color_track_node: defines a ROS 2 node from the large_models package, executes the executable files from the llm_color_track, and prints the node’s output to the screen.

  7. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/llm_color_track.py

(1) Import Libraries

import os
import re
import time
import rclpy
import threading
from rclpy.node import Node
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger, SetBool, Empty

from speech import speech
from interfaces.srv import SetString as SetColor
from large_models.config import *
from large_models_msgs.srv import SetModel, SetString, SetInt32

from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

(2) Prompt Template Definition

else:
    PROMPT = '''
**Role
You are an intelligent companion robot. Your job is to generate corresponding JSON commands based on the user’s input.

**Requirements
- For every user input, search the action function library for matching commands and return the corresponding JSON instruction.
- Craft a witty, ever-changing, and concise response (between 10 to 30 characters) for each action sequence to make interactions lively and fun.
- Output only the JSON result — do not include explanations or any extra text.
- Output format:{"action": ["xx", "xx"], "response": "xx"}

**Special Notes
The "action" key holds an array of function name strings arranged in execution order. If no match is found, return an empty array [].
The "response" key must contain a cleverly worded, short reply (10–30 characters), adhering to the tone and length guidelines above.

**Action Function Library
Track an object of a specific color: color_track('red')

**Example
Input: Track a green object
Output:
{"action": ["color_track('green')"], "response": "Got it!"}
    '''

(3) Variable Initialization

	    self.action = []
        self.stop = True
        self.llm_result = ''
  1. self.action: stores the list of actions parsed from LLM responses.

  2. self.llm_result: stores the result received from the LLM.

  3. self.running: a flag indicating whether the main loop is actively running.

(4) Publisher Creation

self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)

Creates a publisher that sends String messages to the topic /tts_node/tts_text. This topic is used to send text-to-speech (TTS) content for voice feedback.

(5) Subscriber Creation

        self.create_subscription(String, 'agent_process/result', self.llm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.awake_client = self.create_client(SetBool, 'vocal_detect/enable_wakeup')

(6) Necessary Services Creation

	        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()
        self.enter_client = self.create_client(Trigger, 'object_tracking/enter')
        self.enter_client.wait_for_service()
        self.start_client = self.create_client(SetBool, 'object_tracking/set_running')
        self.start_client.wait_for_service()
        self.set_target_client = self.create_client(SetColor, 'object_tracking/set_color')
        self.set_target_client.wait_for_service()

(7) LLM Model Configuration

        msg = SetModel.Request()
        msg.model = llm_model
        msg.model_type = 'llm'
        msg.api_key = api_key 
        msg.base_url = base_url
        self.send_request(self.set_model_client, msg)

(8) Sending Prompt Service

self.send_request(self.set_model_client, msg)

(9) Play Startup Audio

speech.play_audio(start_audio_path)

(10) process Method

    def process(self):
        while self.running:
            if self.llm_result:
                msg = String()
                if 'action' in self.llm_result: # If a corresponding action is returned, extract and process it (如果有对应的行为返回那么就提取处理)
                    result = eval(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}')+1])
                    if 'action' in result:
                        text = result['action']
                        # Use regular expressions to extract all strings inside parentheses (使用正则表达式提取括号中的所有字符串)
                        pattern = r"color_track\('([^']+)'\)"
                        # Use re.search to find the matching result (使用re.search找到匹配的结果)
                        for i in text:
                            match = re.search(pattern, i)
                            # Extract the result (提取结果)
                            if match:
                                # Get all parameter sections, which are the contents inside parentheses (获取所有的参数部分(括号内的内容))
                                color = match.group(1)
                                self.get_logger().info(str(color))
                                color_msg = SetColor.Request()
                                color_msg.data = color
                                self.send_request(self.set_target_client, color_msg)
                                # Start the sorting process (开启分拣)
                                start_msg = SetBool.Request()
                                start_msg.data = True 
                                self.send_request(self.start_client, start_msg)
                    if 'response' in result:
                        msg.data = result['response']
                else: # If there is no corresponding action, only respond (没有对应的行为,只回答)
                    msg.data = self.llm_result
                self.tts_text_pub.publish(msg)
                self.action_finish = True
  1. The method acts as the main loop responsible for handling commands from the LLM, interpreting them, and executing corresponding actions.

  2. If a new command is received:

  • A String message is created to deliver voice feedback.

  • If the command includes an ‘action’ field:

The JSON data within the command is parsed using eval.

The send_request method is called to send a request to the object_sorting/set_target service, specifying the object sorting target.

The send_request method is also called to send a request to the object_sorting/enable_sorting service, enabling the object sorting function.

  • If the command includes a ‘response’ field, voice feedback is sent.

  • If the command does not include an ‘action’, only voice feedback is provided.

  • The self.llm_result field is then cleared to prepare for the next command.

  1. If no new command is received, the system waits for 10 milliseconds.

  2. When self.running is set to False, the loop exits and the ROS 2 node is shut down.

(11) main Function

	def main():
    node = LLMColorTrack('llm_color_track')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. The function initializes a ROS 2 node.

  2. Start a multi-threaded executor to handle callbacks.

  3. Clean up and destroys the node upon program exit to release resources.

25.2 Embodied AI Application

25.2.1 Obtain and Configure the Large Model API Key

Note

Note: This section requires registering on the official OpenAI website and obtaining an API key for accessing large language models.

  • Register and Deploy OpenAI Account

  1. Copy and open the following URL: https://platform.openai.com/docs/overview

  2. Register and log in using a Google, Microsoft, or Apple account, as prompted.

  3. After logging in, click the Settings button, then go to Billing, and click Payment Methods to add a payment method. Recharge your account to purchase tokens as needed.

  4. Once your account is set up, go to the API Keys section and click Create new key. Follow the instructions to generate a new API key and save it securely for later use.

  5. At this point, the creation and deployment of the large model have been completed; this API will be used in subsequent lessons.

  • Register and Deploy openrouter Account

  1. Copy and open the following URL: Click “Log In”, and register or sign in using Google or another available account.

  2. After logging in, click the icon in the top-right corner, then select “Credits” to add a payment method.

  3. Create an API key. Go to “API Keys”, then click “Create Key”. Follow the prompts to generate a key. Save the API key securely for later use.

  4. At this point, the creation and deployment of the large model have been completed; this API will be used in subsequent lessons.

25.2.2 Version Confirmation

Before starting this feature, verify that the correct microphone configuration is set in the system.

  1. Log in to the machine remotely via NoMachine. Then click the desktop icon to access the configuration interface.

  2. On the right side of the interface, select the appropriate microphone type based on the hardware being used.

  3. For the Six-Microphone Array, select xf as the microphone type as shown in the figure.

  4. Then, click Save.

  5. Click “Apply” and wait until the “Service is restarting” notification disappears; once it does, the configuration has been saved to the system environment.

  6. Then, click Exit to close the interface.

25.2.3 Overview of Embodied Intelligence

  • Overview of Embodied Intelligence

Embodied Intelligence represents a key branch of artificial intelligence that emphasizes learning and decision-making through the interaction between a physical entity and its environment. At its core lies the principle that intelligence emerges from the dynamic interplay between an agent’s physical embodiment and its environment. This approach moves beyond the limitations of traditional AI, which often relies solely on static data. Embodied intelligence has found broad applications across industries such as manufacturing, healthcare, service, education, and military.

  • Multimodal Information Fusion

An important feature of embodied intelligence is multimodal information fusion, which refers to the effective integration of data from different modalities—such as text, images, and speech—to produce a more comprehensive and accurate representation of information. This process is especially critical in artificial intelligence, as real-world data is inherently multimodal. Relying on a single modality often fails to provide sufficient context or detail. Multimodal fusion aims to enhance model performance and robustness by leveraging the complementary strengths of multiple data sources.

Several common approaches to multimodal information fusion include the following:

Early Fusion: Combines data from different modalities at an early stage of processing. This typically occurs at the input level, where raw data is aligned and transformed into a shared space to create a richer, more expressive representation. For example, image and speech data can be concatenated directly and fed into a neural network.

Late Fusion: Integrates data from multiple modalities at intermediate or output stages of the model. This approach allows each modality to be processed independently using specialized algorithms, making it easier to add or replace modalities in the future. For instance, results from image recognition and text analysis can be combined to support final decision-making.

Hybrid Fusion: Leverages both early and late fusion techniques by performing integration at multiple stages. This method takes advantage of modality complementarity and can improve both performance and model robustness.

Feature-Level Fusion: Involves extracting features from each modality and mapping them into a unified feature vector, which is then passed to a classifier for decision-making. Fusion occurs during the feature extraction phase, enabling the model to learn correlations across modalities.

Decision-Level Fusion: Merges outputs from individual modality-specific decisions to produce the final outcome. This technique offers strong noise resistance and places fewer demands on sensor types or quality, though it may result in some loss of information.

Deep Fusion: Takes place during feature extraction, where multimodal data is blended in the feature space to generate fused features that can compensate for missing or weak signals from any single modality. These fused features are then used for classification or regression tasks during prediction.

Multimodal information fusion holds not only theoretical significance but also substantial practical value. It has been widely applied in fields such as autonomous driving, medical image analysis, and human-computer interaction, where it has significantly improved system performance and reliability.

25.2.4 Real-Time Detection in Embodied AI Applications

The large model used in this lesson operates online, requiring a stable network connection for the main controller in use.

  • Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.” To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” Once activated, voice commands such as “Please describe what you saw.” can be issued. The terminal will display the recognized command, and the voice device will respond with a generated reply after processing. The robot will autonomously recognize the images captured by the camera and describe the content of the images.

  • Preparation

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Obtain and Configure the Large Model API Key to obtain the large-model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

  • Enabling and Disabling the Feature

  1. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to stop the auto-start service

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to run the Real-time detection feature.

    ros2 launch large_models_examples vllm_with_camera.launch.py
    
  4. When the terminal displays output shown in the figure and Circular Microphone Array announces “I’m ready”, the device has completed initialization. Then, you can say the wake words: “hello hiwonder”.

  5. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  6. When the terminal displays the next output as the reference image, it shows the recognized speech transcribed by the voice device.

  7. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  8. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command and generate a language response.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  2. To exit the feature, press Ctrl+C in the terminal. If the feature does not exit immediately, press Ctrl+C multiple times.

  • Project Outcome

Once the feature is activated, you can speak freely—for example: “Describe what you saw.” The robot will automatically analyze the scene within its field of view, think about what it sees, and describe the scene content in detail.

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_with_camera.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch import LaunchDescription, LaunchService
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)

    camera_topic = LaunchConfiguration('camera_topic', default='depth_cam/rgb/image_raw')
    camera_topic_arg = DeclareLaunchArgument('camera_topic', default_value=camera_topic)

    controller_package_path = get_package_share_directory('controller')
    peripherals_package_path = get_package_share_directory('peripherals')
    large_models_package_path = get_package_share_directory('large_models')

    controller_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(controller_package_path, 'launch/controller.launch.py')),
    )
    
    depth_camera_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode, 'camera_topic': camera_topic}.items(),
    )

    vllm_with_camera_node = Node(
        package='large_models_examples',
        executable='vllm_with_camera',
        output='screen',
        parameters=[{"camera_topic": camera_topic}],
    )
  1. This function is used to configure and initialize launch actions.

  2. mode = LaunchConfiguration(‘mode’, default=1) defines a launch argument named mode with a default value of 1.

  3. mode_arg = DeclareLaunchArgument(‘mode’, default_value=mode) declares the mode argument and includes it in the launch description.

  4. depth_camera_launch: uses IncludeLaunchDescription to bring in the depth_camera.launch.py launch file from the large_models package and passes the mode argument.

  5. Sdk_launch includes the jetarm_sdk.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  6. large_models_launch: includes the start.launch.py file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  7. Vllm_with_camera defines a ROS 2 node from the large_models package, executes the executable files from the vllm_with_camera, and prints the node’s output to the screen.

  8. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

The source code for this program is located at:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_with_camera.py

(1) Import the Necessary Libraries

import os
import cv2
import json
import queue
import rclpy
import threading
import numpy as np

from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger, SetBool, Empty
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

from speech import speech
from large_models.config import *
from large_models_msgs.srv import SetModel, SetString, SetInt32
from servo_controller.bus_servo_control import set_servo_position
from servo_controller_msgs.msg import ServosPosition, ServoPosition
  1. cv2: Utilized for image processing and display using OpenCV.

  2. time: manages execution delays and time-related operations.

  3. queue: handles image queues between threads.

  4. rclpy: provides tools for creating and communicating between ROS 2 nodes.

  5. threading: enables multithreading for concurrent task processing.

  6. config: contains configuration file.

  7. numpy (np): supports matrix and vector operations.

  8. std_srvs.srv: contains standard ROS service types, used to define standard service.

  9. std_msgs.msg:contains standard ROS message types.

  10. sensor_msgs.msg.Image: used for receiving image messages from the camera.

  11. servo_controller_msgs.msg.ServosPosition: custom message type for controlling servo motors.

  12. servo_controller.bus_servo_control.set_servo_position: function used to set the servo angle.

  13. rclpy.callback_groups.ReentrantCallbackGroup: supports concurrent callback handling.

  14. rclpy.executors.MultiThreadedExecutor: multithreaded executor in ROS 2 for handling concurrent tasks.

  15. Rclpy.node: node class in ROS 2.

  16. Speech: module related to large model voice interaction.

  17. large_models_msgs.srv: custom service types for large models.

  18. Large_models.config: configuration file for large models.

(2) VLLMWithCamera Class

class VLLMWithCamera(Node):
    def __init__(self, name):
        rclpy.init()
        super().__init__(name)
        self.image_queue = queue.Queue(maxsize=2)
        self.set_above = False
        self.vllm_result = ''
        self.running = True
        self.action_finish = False
        self.play_audio_finish = False
        self.bridge = CvBridge()
        self.client = speech.OpenAIAPI(api_key, base_url)
        self.declare_parameter('camera_topic', '/depth_cam/rgb/image_raw')
        camera_topic = self.get_parameter('camera_topic').value
        
        timer_cb_group = ReentrantCallbackGroup()
        self.joints_pub = self.create_publisher(ServosPosition, 'servo_controller', 1)
        self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)
        self.create_subscription(Image, camera_topic, self.image_callback, 1)
        self.create_subscription(String, 'agent_process/result', self.vllm_result_callback, 1)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.awake_client = self.create_client(SetBool, 'vocal_detect/enable_wakeup')
        self.awake_client.wait_for_service()
        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_mode_client = self.create_client(SetInt32, 'vocal_detect/set_mode')
        self.set_mode_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()
        self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group)
  1. Display_size: defines the size of the display window.

  2. Rclpy.init(): initializes the ROS 2 node.

  3. Super().init(name): calls the constructor of the superclass to initialize the node with the specified name.

  4. Self.image_queue: creates a queue to store image data, with a maximum size of 2.

  5. self.vllm_result: stores results received from the agent_process/result topic.

  6. self.running: a flag variable used to control the running state of the program.

  7. Timer_cb_group: creates a reentrant callback group for managing timer callbacks.

  8. Self.joints_pub: a publisher is created for sending robotic arm’s joint position information.

  9. self.tts_text_pub: a publisher is created for sending text data to the text-to-speech node.

  10. self.create_subscription: creates subscribers to receive image messages, VLLM results, and playback completion signals.

  11. self.awake_client and self.set_model_client: creates service clients for triggering the wake-up and model configuration services.

  12. self.timer: creates a timer that triggers the init_process function.

(3) get_node_state Method

    def get_node_state(self, request, response):
        return response

return response: returns a response object.

(4) init_process Method

    def init_process(self):
        self.timer.cancel()
        
        msg = SetModel.Request()
        msg.model_type = 'vllm'
        if os.environ['ASR_LANGUAGE'] == 'Chinese':
            msg.model = stepfun_vllm_model
            msg.api_key = stepfun_api_key
            msg.base_url = stepfun_base_url
        else:
            msg.model = vllm_model
            msg.api_key = vllm_api_key
            msg.base_url = vllm_base_url
        self.send_request(self.set_model_client, msg)

        msg = SetString.Request()
        msg.data = VLLM_PROMPT
        self.send_request(self.set_prompt_client, msg)

        set_servo_position(self.joints_pub, 1.0,
                           ((1, 500), (2, 645), (3, 135), (4, 80), (5, 500), (10, 220)))  # 设置机械臂初始位置
        speech.play_audio(start_audio_path)
        threading.Thread(target=self.process, daemon=True).start()
        self.create_service(Empty, '~/init_finish', self.get_node_state)
        self.get_logger().info('\033[1;32m%s\033[0m' % 'start')
  1. Self.timer.cancel(): stops the timer.

  2. SetModel.Request: creates a request message to configure the model.

  3. Self.send_request: sends the request to the corresponding service.

  4. Set_servo_position: sets the initial joint positions of the robotic arm.

  5. Speech.play_audio: plays an audio file.

  6. threading.Thread: creates a new thread to run the process function.

  7. Self.create_service: registers a service to notify that the initialization process is complete.

  8. Self.get_logger().info: logs informational messages to the console.

(5) send_request Method

    def send_request(self, client, msg):
        future = client.call_async(msg)
        while rclpy.ok():
            if future.done() and future.result():
                return future.result()
  1. Client.call_async(msg): makes an asynchronous service call.

  2. future.done() and future.result(): check if the service call is complete and retrieve the result.

(6) vllm_result_callback Method

    def vllm_result_callback(self, msg):
        self.vllm_result = msg.data

This callback receives results from the agent_process/result topic and stores them in self.vllm_result.

(7) process Method

    def process(self):
        # box = []
        while self.running:
            image = self.image_queue.get(block=True)
            if self.vllm_result:
                msg = String()
                # self.get_logger().info('vllm_result: %s' % self.vllm_result)
                msg.data = self.vllm_result
                self.tts_text_pub.publish(msg)
                # if self.vllm_result.startswith("```") and self.vllm_result.endswith("```"):
                    # self.vllm_result = self.vllm_result.strip("```").replace("json\n", "").strip()
                # self.vllm_result = json.loads(self.vllm_result)
                # box = self.vllm_result['xyxy']
                # if box:
                    # self.get_logger().info('box: %s' % str(box))
                    # if isinstance(box[0], float):
                        # box = [int(box[0] * 640), int(box[1] * 360), int(box[2] * 640), int(box[3] * 360)]
                    # else:
                        # self.client.data_process(box, 640, 360)
                    # box[0] = int(box[0] / 640 * display_size[0])
                    # box[1] = int(box[1] / 360 * display_size[1])
                    # box[2] = int(box[2] / 640 * display_size[0])
                    # box[3] = int(box[3] / 360 * display_size[1])
                    # self.get_logger().info('box: %s' % str(box))
                self.vllm_result = ''
                self.action_finish = True
            if self.play_audio_finish and self.action_finish:
                self.play_audio_finish = False
                self.action_finish = False
                # msg = SetInt32.Request()
                # msg.data = 1
                # self.send_request(self.set_mode_client, msg)
                msg = SetBool.Request()
  1. cv2.namedWindow and cv2.moveWindow: create and position a window for image display.

  2. Self.image_queue.get: retrieves image data from the queue.

  3. Self.vllm_result: if a VLLM result is available, publishes it to the text-to-speech node.

  4. Cv2.imshow: displays the image on the screen.

  5. cv2.waitKey: waits for a keyboard event.

  6. cv2.destroyAllWindows: closes all OpenCV display windows.

(8) Play_audio_finish_callback Method

    def play_audio_finish_callback(self, msg):
        self.play_audio_finish = msg.data

It sends a request to enable the wake-up feature once audio playback is complete.

(9) image_callback Method

    def image_callback(self, ros_image):
        cv_image = self.bridge.imgmsg_to_cv2(ros_image, "rgb8")
        rgb_image = np.array(cv_image, dtype=np.uint8)

        if self.image_queue.full():
            # If the queue is full, discard the oldest image (如果队列已满,丢弃最旧的图像)
            self.image_queue.get()
            # Put the image into the queue (将图像放入队列)
        self.image_queue.put(rgb_image)

It converts received ROS image messages to NumPy arrays and stores them in the queue.

(10) main Method

def main():
    node = VLLMWithCamera('vllm_with_camera')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. Create an instance of the VLLMWithCamera node.

  2. A multithreaded executor is used to handle the node’s tasks.

  3. Call executor.spin() to start processing ROS events.

  4. Upon shutdown, the node is properly destroyed using node.destroy_node().

25.2.5 Vision Tracking in Embodied AI Applications

The large model used in this lesson operates online, requiring a stable network connection for the main controller in use.

  • Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.”

To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” After that you can control the robot by voice—for example, “Follow the person wearing white clothes ahead.” The terminal prints the recognized speech, the Circular microphone Array announces the generated response, the camera detects the black line, and the robot halts as soon as an obstacle appears in front.

  • Preparation

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Obtain and Configure the Large Model API Key to obtain the large-model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Click to launch the command bar, enter the command, and press Enter to disable the auto-start service.

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to run the voice control feature.

    ros2 launch large_models_examples vllm_track.launch.py
    
  4. When the command line shows the output below and the device announces “Ready,” it means the microphone array has finished initializing and the YOLOv8 model has also been loaded. At this point you can say the wake word: “Hello, HiWonder.”

  5. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  6. When the terminal displays the next output as the reference image, it shows the recognized speech transcribed by the voice device.

  7. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  8. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  2. To exit the feature, press Ctrl+C in the terminal. If the feature does not exit immediately, press Ctrl+C multiple times.

  • Project Outcome

Once the feature is activated, natural language commands such as “Follow the person in white in front.” can be used. The robot will identify the person wearing white in the camera view and begin following them. It will automatically stop once a certain distance has been reached.

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_track.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch import LaunchDescription, LaunchService
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)
    
    slam_package_path = get_package_share_directory('slam')
    large_models_package_path = get_package_share_directory('large_models') 
    
    base_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(slam_package_path, 'launch/include/robot.launch.py')),
        launch_arguments={
            'sim': 'false',
            'master_name': os.environ['MASTER'],
            'robot_name': os.environ['HOST']
        }.items(),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )

    vllm_track_node = Node(
        package='large_models_examples',
        executable='vllm_track',
        output='screen',
    )

    # rqt
    calibrate_rqt_reconfigure_node = Node(
        package='rqt_reconfigure',
        executable='rqt_reconfigure',
        name='calibrate_rqt_reconfigure'
    )

    return [mode_arg,
            base_launch,
            large_models_launch,
            vllm_track_node,
            # calibrate_rqt_reconfigure_node,
            ]
  1. The launch_setup function includes the robot.launch.py file from the slam package, which is responsible for launching the robot’s chassis control and initializing related peripheral connections with corresponding parameters.

  2. It also includes the start.launch.py file from the large_models package, which launches the intelligent agent equipped with the capabilities of “seeing, listening, thinking, and speaking.”

  3. Additionally, it includes the vllm_track executable from the large_models_examples package, which serves as the main node for the visual tracking functionality.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

The source code for this program is located at:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_track.py

(1) Import the Necessary Libraries

import os
import cv2
import json
import time
import queue
import rclpy
import threading
import PIL.Image
import numpy as np
import sdk.fps as fps
import message_filters
from sdk import common
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
from std_msgs.msg import String, Float32, Bool
from std_srvs.srv import Trigger, SetBool, Empty
from rcl_interfaces.msg import SetParametersResult
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup

from speech import speech
from large_models.config import *
from large_models_msgs.srv import SetString, SetModel, SetInt32
from large_models_examples.track_anything import ObjectTracker
from servo_controller.bus_servo_control import set_servo_position
from servo_controller_msgs.msg import ServosPosition, ServoPosition
from large_models_examples.tracker import Tracker
  1. Image Processing

    cv2: provides core functions for image processing and display using OpenCV.

    PIL.Image and numpy: used for image data manipulation and numerical computations.

  2. ROS Communication and Synchronization

    rclpy: core library for creating ROS 2 nodes and handling communication.

    message_filters: ensures time synchronization between multi-sensor data streams, such as RGB and depth images.

    sensor_msgs, geometry_msgs, std_msgs, std_srvs, rcl_interfaces: core message and service types used for ROS data and service exchange.

    MultiThreadedExecutor and ReentrantCallbackGroup: enable concurrent callbacks and multithreaded scheduling to maintain system responsiveness and real-time performance.

(2) PROMPT String

    '''
else:
    PROMPT = '''
**Role
You are a smart car with advanced visual recognition capabilities. Your task is to analyze an image sent by the user, perform object detection, and follow the detected object. Finally, return the result strictly following the specified output format.

Step 1: Understand User Instructions
You will receive a sentence. From this sentence, extract the object name to be detected.
Note: Use English for the object value, do not include any objects not explicitly mentioned in the instruction.

Step 2: Understand the Image
You will also receive an image. Locate the target object in the image and return its coordinates as the top-left and bottom-right pixel positions in the form [xmin, ymin, xmax, ymax].
Note: If the object is not found, then "xyxy" should be an empty list: [], only detect and report objects mentioned in the user instruction.The coordinates (xmin, ymin, xmax, ymax) must be normalized to the range [0, 1]

**Important: Accurately understand the spatial position of the object. The "response" must reflect both the users instruction and the detection result.

**Output Format (strictly follow this format, do not output anything else.The coordinates (xmin, ymin, xmax, ymax) must be normalized to the range [0, 1])
{
    "object": "name", 
    "xyxy": [xmin, ymin, xmax, ymax],
    "response": "reflect both the user’s instruction and the detection result (5–30 characters)"
}

**Example
Input: track the person
Output:
{
    "object": "person",
    "xyxy": [0.1, 0.3, 0.4, 0.6],
    "response": "I have detected a person in a white T-shirt and will track him now."
}
    '''

It defines a prompt string (PROMPT) used to guide how user commands and image data are processed. It specifies the recognition task logic and expected output format.

(3) VLLMTrack Class

display_size = [int(640*6/4), int(360*6/4)]
class VLLMTrack(Node):
    def __init__(self, name):
        rclpy.init()
        super().__init__(name)
        self.fps = fps.FPS() # Frame rate counter(帧率统计器)
        self.image_queue = queue.Queue(maxsize=2)
        self.vllm_result = ''
        # self.vllm_result = '''json{"object":"red cube", "xyxy":[521, 508, 637, 683]}''' (self.vllm_result = '''json{"object":"红色方块", "xyxy":[521, 508, 637, 683]}''')
        self.set_above = False
        self.running = True
        self.data = []
        self.box = []
        self.stop = True
        self.start_track = False
        self.action_finish = False
        self.play_audio_finish = False
        self.track = ObjectTracker(use_mouse=True, automatic=True, log=self.get_logger())
        
        timer_cb_group = ReentrantCallbackGroup()
        self.client = speech.OpenAIAPI(api_key, base_url)
        self.joints_pub = self.create_publisher(ServosPosition, 'servo_controller', 1)
        self.mecanum_pub = self.create_publisher(Twist, '/controller/cmd_vel', 1)  # Chassis control (底盘控制)
        self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(String, 'agent_process/result', self.vllm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1)
        
        self.awake_client = self.create_client(SetBool, 'vocal_detect/enable_wakeup')
        self.awake_client.wait_for_service()
        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_mode_client = self.create_client(SetInt32, 'vocal_detect/set_mode')
        self.set_mode_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()

        image_sub = message_filters.Subscriber(self, Image, 'depth_cam/rgb/image_raw')
        depth_sub = message_filters.Subscriber(self, Image, 'depth_cam/depth/image_raw')

        # Synchronize timestamps, allowing an error margin of up to 0.03 seconds (同步时间戳, 时间允许有误差在0.03s)
        sync = message_filters.ApproximateTimeSynchronizer([depth_sub, image_sub], 3, 0.02)
        sync.registerCallback(self.multi_callback)

        # Define PID parameters (定义 PID 参数)
        # 0.07, 0, 0.001
        self.pid_params = {
            'kp1': 0.1, 'ki1': 0.0, 'kd1': 0.00,
            'kp2': 0.002, 'ki2': 0.0, 'kd2': 0.0,
        }
  1. Display_size: defines the size of the display window.

  2. rclpy.init(): initializes the ROS 2 system.

  3. super().init(name): calls the initializer of the parent class (Node) to create a ROS 2 node.

  4. fps.FPS(): creates an instance of a frame rate tracker.

  5. queue.Queue(maxsize=2): creates a queue with a maximum capacity of 2.

  6. self.vllm_result: stores the results processed by the model.

  7. self.running: a boolean flag used to control the runtime state of the program.

  8. self.data: an empty list used for storing intermediate data.

  9. self.start_track: a boolean flag used to control the start or stop of target tracking.

  10. self.track = ObjectTracker(): instantiates an object tracker.

  11. timer_cb_group: creates a reentrant callback group to allow concurrent callback execution.

  12. speech.OpenAIAPI(): instantiates a speech interaction client.

  13. self.joints_pub: creates a publisher for sending servo motor position commands.

  14. self.tts_text_pub: a publisher is created for sending text data to the text-to-speech node.

  15. self.create_subscription: subscribes to image data, speech playback completion signals, and image recognition results.

  16. self.create_client: creates service clients for tasks such as wake-up control, model configuration, and motion control.

  17. Each PID parameter in self.pid_params is declared as a ROS 2 dynamic parameter with a default value.

  18. Current values are retrieved using self.get_parameter() and used to update self.pid_params.

  19. self.track.update_pid(): passes the updated PID parameters to the object tracker to adjust its PID controller.

  20. self.pid_params: contains the PID controller parameters used in the target tracking control logic.

  21. self.add_on_set_parameters_callback: registers a callback function to handle parameter updates dynamically.

  22. self.timer: creates a timer used to trigger the initialization process.

(4) on_parameter_update Method

    def on_parameter_update(self, params):
        """Parameter update callback (参数更新回调)"""
        for param in params:
            if param.name in self.pid_params.keys():
                self.pid_params[param.name] = param.value
        # self.get_logger().info(f'PID parameters updated: {self.pid_params}')
        # Update PID parameters (更新 PID 参数)
        self.track.update_pid([self.pid_params['kp1'], self.pid_params['ki1'], self.pid_params['kd1']],
                      [self.pid_params['kp2'], self.pid_params['ki2'], self.pid_params['kd2']])
  1. self.pid_params: a dictionary storing PID parameters.

  2. self.track.update_pid: updates the PID parameters used by the target tracking module.

(5) create_update_callback Method

    def create_update_callback(self, param_name):
        """Generate dynamic update callback (生成动态更新回调)"""
        def update_param(msg):
            new_value = msg.data
            self.pid_params[param_name] = new_value
            self.set_parameters([Parameter(param_name, Parameter.Type.DOUBLE, new_value)])
            self.get_logger().info(f'Updated {param_name}: {new_value}')
            # Update PID parameters (更新 PID 参数)

It generates a dynamic update callback function for each PID parameter.

(6) get_node_state Method

    def get_node_state(self, request, response):
        return response

return response: returns a response object.

(7) init_process Method

    def init_process(self):
        self.timer.cancel()
        
        msg = SetModel.Request()
        msg.model_type = 'vllm'
        if language == 'Chinese':
            msg.model = stepfun_vllm_model
            msg.api_key = stepfun_api_key
            msg.base_url = stepfun_base_url
        else:
            msg.api_key = vllm_api_key
            msg.base_url = vllm_base_url
            msg.model = vllm_model
        self.send_request(self.set_model_client, msg)

        msg = SetString.Request()
        msg.data = PROMPT
        self.send_request(self.set_prompt_client, msg)
        
        set_servo_position(self.joints_pub, 1.5, ((10, 300), (5, 500), (4, 100), (3, 100), (2, 750), (1, 500)))
        self.mecanum_pub.publish(Twist())
        time.sleep(1.8)
        speech.play_audio(start_audio_path)
        threading.Thread(target=self.process, daemon=True).start()
        threading.Thread(target=self.display_thread, daemon=True).start()
        self.create_service(Empty, '~/init_finish', self.get_node_state)
        self.get_logger().info('\033[1;32m%s\033[0m' % 'start')
        self.get_logger().info('\033[1;32m%s\033[0m' % PROMPT)
  1. self.timer.cancel(): cancels the timer.

  2. SetModel.Request(): creates a request message to set the model.

  3. SetString.Request(): creates a request message to set the prompt string.

  4. set_pose_target: sets the robot’s initial position.

  5. self.send_request: Sends a service request.

  6. set_servo_position: sets the position of servo joints.

  7. Speech.play_audio: plays an audio file.

  8. threading.Thread: starts a new thread for image processing and tracking.

  9. Self.create_service: registers a service to notify that the initialization process is complete.

  10. self.get_logger().info: prints log messages.

(8) send_request Method

    def send_request(self, client, msg):
        future = client.call_async(msg)
        while rclpy.ok():
            if future.done() and future.result():
                return future.result()
  1. client.call_async(msg): makes an asynchronous service call.

  2. future.done() and future.result(): check if the service call is complete and retrieve the result.

(9) vllm_result_callback Method

    def vllm_result_callback(self, msg):
        self.vllm_result = msg.data

This callback receives results from the agent_process/result topic and stores them in self.vllm_result.

(10) Play_audio_finish_callback Method

    def play_audio_finish_callback(self, msg):
        self.play_audio_finish = msg.data

The play_audio_finish_callback method sends a wake-up signal after audio playback is completed.

(11) Process Method

    def process(self):
        box = ''
        while self.running:
            if self.vllm_result:
                try:
                    # self.get_logger().info('vllm_result: %s' % self.vllm_result)
                    if self.vllm_result.startswith("```") and self.vllm_result.endswith("```"):
                        self.vllm_result = self.vllm_result.strip("```").replace("json\n", "").strip()
                    self.vllm_result = json.loads(self.vllm_result)
                    response = self.vllm_result['response']
                    msg = String()
                    msg.data = response
                    self.tts_text_pub.publish(msg)
                    box = self.vllm_result['xyxy']
                    if box:
                        if language == 'Chinese':
                            box = self.client.data_process(box, 640, 480)
                            self.get_logger().info('box: %s' % str(box))
                        else:
                            box = [int(box[0] * 640), int(box[1] * 480), int(box[2] * 640), int(box[3] * 480)]
                        box = [box[0], box[1], box[2] - box[0], box[3] - box[1]]
                        box[0] = int(box[0] / 640 * display_size[0])
                        box[1] = int(box[1] / 360 * display_size[1])
                        box[2] = int(box[2] / 640 * display_size[0])
                        box[3] = int(box[3] / 360 * display_size[1])
                        self.get_logger().info('box: %s' % str(box))
                        self.box = box
                except (ValueError, TypeError):
                    self.box = []
                    msg = String()
                    msg.data = self.vllm_result
                    self.tts_text_pub.publish(msg)
                self.vllm_result = ''
                self.action_finish = True
            else:
                time.sleep(0.02)
            if self.play_audio_finish and self.action_finish:
                self.play_audio_finish = False
                self.action_finish = False
                msg = SetBool.Request()
                msg.data = True
                self.send_request(self.awake_client, msg)
                # msg = SetInt32.Request()
                # msg.data = 1
                # self.send_request(self.set_mode_client, msg)
                self.stop = False
  1. while self.running: ensures continuous data processing while the node is active.

  2. if self.vllm_result: checks whether a new VLLM response has been received.

  3. Json.loads(self.vllm_result): parses the received JSON-formatted string into a dictionary for easier data extraction.

  4. Extracting response and xyxy:

    response = self.vllm_result[‘response’]: retrieves the text message for TTS (Text-to-Speech) playback.

    box = self.vllm_result[‘xyxy’]: retrieves the bounding box coordinates returned from object detection.

  5. Processing Bounding Box Data: If a target is detected and the box is not empty, the method calls self.client.data_process to normalize the coordinates using a frame size of 640 by 480. It then converts the coordinates into the format [xmin, ymin, width, height], scales them based on the display window size, and stores the result in self.box for tracking.

  6. Sending TTS Response: A String message is created and the extracted response is sent to the TTS node for speech playback.

  7. Error Handling: If a ValueError or TypeError occurs during parsing or extraction, the target info is cleared, and the original vllm_result is sent to the TTS node as plain text.

  8. State Updates:

self.vllm_result is reset to an empty string to indicate the response has been processed.

self.action_finish is set to True to mark that the action has been completed.

  1. Loop Wait Mechanism: If no new vllm_result is received, the loop sleeps for 20 ms using time.sleep(0.02) to avoid high CPU usage.

  2. Subsequent Service Request: when both self.play_audio_finish and self.action_finish are True,

    related flags are reset.

A service request is sent via self.send_request(self.set_mode_client, msg) to set the mode to 2, allowing target tracking to begin.

self.stop is updated to False, lifting the pause on target tracking.

(12) track_thread

    def process(self):
        box = ''
        while self.running:
            if self.vllm_result:
                try:
                    # self.get_logger().info('vllm_result: %s' % self.vllm_result)
                    if self.vllm_result.startswith("```") and self.vllm_result.endswith("```"):
                        self.vllm_result = self.vllm_result.strip("```").replace("json\n", "").strip()
                    self.vllm_result = json.loads(self.vllm_result)
                    response = self.vllm_result['response']
                    msg = String()
                    msg.data = response
                    self.tts_text_pub.publish(msg)
                    box = self.vllm_result['xyxy']
                    if box:
                        if language == 'Chinese':
                            box = self.client.data_process(box, 640, 480)
                            self.get_logger().info('box: %s' % str(box))
                        else:
                            box = [int(box[0] * 640), int(box[1] * 480), int(box[2] * 640), int(box[3] * 480)]
                        box = [box[0], box[1], box[2] - box[0], box[3] - box[1]]
                        box[0] = int(box[0] / 640 * display_size[0])
                        box[1] = int(box[1] / 360 * display_size[1])
                        box[2] = int(box[2] / 640 * display_size[0])
                        box[3] = int(box[3] / 360 * display_size[1])
                        self.get_logger().info('box: %s' % str(box))
                        self.box = box
                except (ValueError, TypeError):
                    self.box = []
                    msg = String()
                    msg.data = self.vllm_result
                    self.tts_text_pub.publish(msg)
                self.vllm_result = ''
                self.action_finish = True
  1. while self.running: controls the thread’s runtime, and the thread continuously processes image data as long as self.running is True.

  2. image, depth_image = self.image_queue.get(block=True): fetches the latest RGB and depth images from the thread-safe queue to ensure synchronized data processing.

  3. if box: checks if a new detection bounding box (box) is available. If available, the method calls self.track.set_track_target(self.box, image) to set the current tracking target in the image, sets self.start_track = True, and clears self.box.

  4. if self.start_track: verifies if target tracking has been initiated. If True, it calls self.track.track(image, depth_image) to obtain the tracking result, which is then stored in self.data.

  5. Use of self.data: the first two values in the tracking result usually represent control commands, such as linear and angular velocity. The last element is the image annotated with the tracking results, which is used for display.

  6. Construct and Publish Twist Messages: a Twist message is constructed using control commands extracted from self.data and published via self.mecanum_pub.publish(twist) to control the movement of the robot base.

  7. self.fps.update and self.fps.show_fps: update the frame rate statistics and overlay the current FPS on the image for real-time performance monitoring.

  8. cv2.imshow: display the processed image in a window to provide a visual representation of the target tracking status.

  9. cv2.waitKey: wait for keyboard input. If the user presses the q or ESC key, the tracking thread is stopped and a stop-motion Twist message is sent to halt the robot.

  10. Window on Top Handling: The display window is repositioned using cv2.moveWindow, and the command os.system(“wmctrl -r image -b add,above”) sets the window to stay on top, ensuring it’s always visible.

  11. cv2.destroyAllWindows:

    after the tracking thread exits, all OpenCV-created windows are closed to complete the cleanup process.

(13) main Method

def main():
    node = VLLMTrack('vllm_track')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. Create a VLLMTrack node instance.

  2. Use a multithreaded executor to handle the node’s tasks.

  3. Call executor.spin() to start processing ROS events.

  4. Upon shutdown, the node is properly destroyed using node.destroy_node().

25.2.6 Smart Home Assistant in Embodied AI Applications

The large model used in this lesson operates online, requiring a stable network connection for the main controller in use.

  • Overview of Operation

Once the program starts, Circular Microphone Array will announce: “I’m Ready.”

To activate the voice device, speak the designated wake words: “hello hi wonder.” Upon successful activation, the voice device will respond with “I’m Here.” Once activated, you can control the robot by voice—issue a command like “Go to the front desk, check whether the main door is closed, then come back and tell me.” Upon receiving a command, the terminal displays the recognized speech content. The voice device then verbally responds with a generated answer and the robot simultaneously executes the corresponding action.

  • Preparation

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Obtain and Configure the Large Model API Key to obtain the large-model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

3. Navigation Map Construction

Before enabling this feature, a map must be created in advance. Please refer to 19 Mapping and Navigation / 19.1 Mapping Tutorial for detailed instructions on how to build the map.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to stop the auto-start service

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Open the command line terminal from the left side of the system interface. In the terminal window, enter the following command and press Enter to run the voice control feature.

    ros2 launch large_models_examples vllm_navigation.launch.py map:=map_01
    
  4. After opening the map in RViz, click the “2D Pose Estimate” icon to set the robot’s initial position.

  5. When the command line shows the output below and the device announces “Ready,” it means the microphone array has finished initializing and the YOLOv8 model has also been loaded. At this point you can say the wake word: “Hello, HiWonder.”

  6. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

  7. When the terminal displays the next output as the reference image, it shows the recognized speech transcribed by the voice device.

  8. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  9. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. Arrive at the “front-desk” waypoint, detect the door status.

  2. Then return to the start point and report—for example: “The door is open; please close it to keep everything secure and private.”

  3. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 5 by speaking the wake words again.

  4. To exit the feature, press Ctrl+C in the terminal. If the feature does not exit immediately, press Ctrl+C multiple times.

  • Project Outcome

Once the mode is active you can phrase the order any way you like, e.g. “Go to the front desk, check if the main door is closed, then come back and tell me.” The robot will navigate to the preset front-desk location, inspect the door, return to the origin, and give you the result.

  • Modifying Navigation Location

To change the navigation points, edit the file at the path below:

~/ros2_ws/src/large_models_examples/large_models_examples/vllm_navigation.py

  1. Follow the start/stop procedure to launch the program and open the RViz map display, then set a navigation goal: click “2D Goal Pose” and designate the target on the map.

  1. Return to the command terminal and check the target location parameter for the publication:

Locate the corresponding section of the code shown below, and fill in your target location parameters after the appropriate location name.

In the program, each location is defined as a target navigation point relative to the map’s origin, which corresponds to the robot’s starting position during the mapping process. Each navigation point includes five parameters:

x: position on the x-axis (meters)

y: position on the y-axis (meters)

roll: rotation around the x-axis (degrees)

pitch: rotation around the y-axis (degrees)

yaw: rotation around the z-axis (degrees)

For example, to define the position of “front desk,” the parameters can be set to [2.8, -2.7, 0.0, 0.0, 0.0], indicating that the robot will navigate to a location offset from the map origin by those values.

For example, given the quaternion: Quaternion(x=0.0, y=0.0, z=-0.5677173914973032, w=0.8232235197025761). After conversion to Euler angles (roll, pitch, yaw), the result is approximately: roll ≈ 0°, pitch ≈ 0°, yaw ≈ -69.3°

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_navigation.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch_ros.actions import PushRosNamespace
from launch import LaunchDescription, LaunchService
from launch.substitutions import LaunchConfiguration
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, GroupAction, OpaqueFunction, TimerAction, ExecuteProcess
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. LaunchDescription, LaunchService: used to define and start the Launch file.

  5. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  6. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    slam_package_path = get_package_share_directory('slam')
    navigation_package_path = get_package_share_directory('navigation')
    large_models_package_path = get_package_share_directory('large_models')
    
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)
    map_name = LaunchConfiguration('map', default='map_01').perform(context)
    robot_name = LaunchConfiguration('robot_name', default=os.environ['HOST'])
    master_name = LaunchConfiguration('master_name', default=os.environ['MASTER'])

    map_name_arg = DeclareLaunchArgument('map', default_value=map_name)
    master_name_arg = DeclareLaunchArgument('master_name', default_value=master_name)
    robot_name_arg = DeclareLaunchArgument('robot_name', default_value=robot_name)

    navigation_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(os.path.join(navigation_package_path, 'launch/navigation.launch.py')),
        launch_arguments={
            'sim': 'false',
            'map': map_name,
            'robot_name': robot_name,
            'master_name': master_name,
            'use_teb': 'true',
        }.items(),
    )

    navigation_controller_node = Node(
        package='large_models_examples',
        executable='navigation_controller',
        output='screen',
        parameters=[{'map_frame': 'map', 'nav_goal': '/nav_goal'}]
    )

    rviz_node = ExecuteProcess(
            cmd=['rviz2', 'rviz2', '-d', os.path.join(navigation_package_path, 'rviz/navigation_controller.rviz')],
            output='screen'
        )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )

    vllm_navigation_node = Node(
        package='large_models_examples',
        executable='vllm_navigation',
        output='screen',
    )

    return [
            mode_arg,
            map_name_arg, 
            master_name_arg,
            robot_name_arg,
            navigation_launch, 
            navigation_controller_node,
            rviz_node,
            large_models_launch,
            vllm_navigation_node
            ]
  1. This function is used to configure and initialize launch actions.

  2. mode = LaunchConfiguration(‘mode’, default=1) defines a launch argument named mode with a default value of 1.

  3. mode_arg = DeclareLaunchArgument(‘mode’, default_value=mode) declares the mode argument and includes it in the launch description.

  4. navigation_package_pathlarge_models_package_path: retrieves the shared directory path of the navigation and large_models package.

  5. navigation_launch: includes the robot.launch.py file and the bringup.launch.py file using IncludeLaunchDescription.

  6. large_models_launch: includes the start.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  7. vllm_navigation_node: defines a ROS 2 node from the large_models_examples package, executes the executable files from the vllm_navigation.py, and prints the node’s output to the screen.

  8. vllm_navigation_node: defines a ROS 2 node from the large_models_examples package, executes the executable files from the vllm_navigation.py, and prints the node’s output to the screen.

  9. navigation_controller_node: defines a ROS 2 node from the large_models_examples package, executes the executable files from the navigation_controller.py to enable navigation control.

  10. rviz_node: defines a ROS 2 node from the navigation package, executes the navigation_controller.rviz files, and enables displaying the navigation control.

  11. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/vllm_navigation.py

(1) Import the Necessary Libraries

import os
import re
import time
import json
import rclpy
import threading
from speech import speech
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger, SetBool, Empty

from large_models.config import *
from large_models_msgs.srv import SetModel, SetContent, SetString, SetInt32

from interfaces.srv import SetPose2D
from rclpy.executors import MultiThreadedExecutor
from servo_controller_msgs.msg import ServosPosition
from rclpy.callback_groups import ReentrantCallbackGroup
from servo_controller.action_group_controller import ActionGroupController
  1. json: used for handling data in JSON format.

  2. time: manages execution delays and time-related operations.

  3. rclpy: provides tools for creating and communicating between ROS 2 nodes.

  4. threading: enables multithreading for concurrent task processing.

  5. speech: handles modules related to large model voice interaction.

  6. std_msgs.msg:contains standard ROS message types.

  7. rclpy.callback_groups.ReentrantCallbackGroup: supports concurrent callback handling.

  8. rclpy.executors.MultiThreadedExecutor: multithreaded executor in ROS 2 for handling concurrent tasks.

  9. large_models_msgs.srv: custom service types for large models.

  10. Large_models.config: configuration file for large models.

(2) PROMPT String

    '''
else:
    position_dict = {"kitchen": [2.8, -2.8, 0.0, 0.0, 0.0], #xyzrpy, m,deg
                     "front desk": [2.8, -2.7, 0.0, 0.0, 0.0],
                     "bedroom": [0.0, 0.0, 0.0, 0.0, 0.0],
                     "zoo": [1.3, 0.37, 0.0, 0.0, 0.0],
                     "space base": [1.58, -0.74, 0.0, 0.0, -48.0],
                     "football field": [0.32, -0.65, 0.0, 0.0, -90.0],
                     "origin": [0.0, 0.0, 0.0, 0.0, 0.0],
                     "home": [0.1, 0.4, 0.0, 0.0, 0.0]
                     }

    LLM_PROMPT = '''
**Role
You are a smart navigation vehicle equipped with a camera and speaker. You can move to different places, analyze visual input, and respond by playing audio. Based on user input, you need to generate the corresponding JSON command.

**Requirements
- For any user input, look up corresponding functions from the Action Function Library, and generate the proper JSON output.
- For each action sequence, include a concise (520 characters) and witty, varied response to make the interaction lively and engaging.
- Output only the JSON result, no analysis or extra text.
- Output format:
{
  "action": ["xx", "xx"],
  "response": "xx"
}

**Special Notes
The "action" field contains an ordered list of function names to be executed in sequence. If no matching function is found, return: "action": [].
The "response" field should contain a carefully crafted, short, humorous, and varied message (520 characters).

**Action Function Library
Move to a specified place: move('kitchen')
Return to starting point: move('origin')
Analyze current view: vision('What do you see?')
Play audio response: play_audio()

**Example
Input: Go to the front desk to see if the door is closed, and then come back and tell me
Output:
{
  "action": ["move('front desk')", "vision('Is the door closed?')", "move("origin")", "play_audio()"],
  "response": "On it, reporting soon!"
}
    '''

(3) VLLMNavigation Class

class VLLMNavigation(Node):
    def __init__(self, name):
        rclpy.init()
        super().__init__(name)
        
        self.action = []
        self.response_text = ''
        self.llm_result = ''
        self.play_audio_finish = False
        # self.llm_result = '{\'action\':[\'move(\"前台\")\', \'vision(\"大门有没有关\")\', \'move(\"原点\")\', \'play_audio()\'], \'response\':\'马上!\'}'
        self.running = True
        self.play_delay = False
        self.reach_goal = False
        self.interrupt = False
        
        timer_cb_group = ReentrantCallbackGroup()
        self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)
        # self.create_subscription(Image, 'depth_cam/rgb/image_raw', self.image_callback, 1)
        self.create_subscription(String, 'agent_process/result', self.llm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(Bool, 'navigation_controller/reach_goal', self.reach_goal_callback, 1)
        self.awake_client = self.create_client(SetBool, 'vocal_detect/enable_wakeup')
        self.awake_client.wait_for_service()
        self.set_mode_client = self.create_client(SetInt32, 'vocal_detect/set_mode')
        self.set_mode_client.wait_for_service()
        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()
        self.set_vllm_content_client = self.create_client(SetContent, 'agent_process/set_vllm_content')
        self.set_vllm_content_client.wait_for_service()
        self.set_pose_client = self.create_client(SetPose2D, 'navigation_controller/set_pose')
        self.set_pose_client.wait_for_service()

(4) get_node_state Method

	    def get_node_state(self, request, response):
        return response

return response: returns a response object.

(5) init_process Method

	    def init_process(self):
        self.timer.cancel()
        
        msg = SetModel.Request()
        msg.model = llm_model
        msg.model_type = 'llm'
        msg.api_key = api_key 
        msg.base_url = base_url
        self.send_request(self.set_model_client, msg)

        msg = SetString.Request()
        msg.data = LLM_PROMPT
        self.send_request(self.set_prompt_client, msg)
        
        init_finish = self.create_client(Empty, 'navigation_controller/init_finish')
        init_finish.wait_for_service()
        speech.play_audio(start_audio_path)
        threading.Thread(target=self.process, daemon=True).start()
        self.create_service(Empty, '~/init_finish', self.get_node_state)
        self.get_logger().info('\033[1;32m%s\033[0m' % 'start')
        self.get_logger().info('\033[1;32m%s\033[0m' % LLM_PROMPT)
  1. self.timer.cancel(): cancels the timer.

  2. SetModel.Request(): creates a request message to set the LLM model.

  3. SetModel.Request: creates a request for SetModel service.

  4. msg.model_type = 'llm': specifies the model type as ‘llm’.

  5. self.send_request(self.set_model_client, msg): sends the model configuration request through the set_model_client.

  6. SetString.Request(): creates a request message to set the prompt string.

  7. self.send_request(self.set_prompt_client,msg): sends the model configuration request through the set_prompt_client.

  8. Empty.Request: request message used to clear the current conversation history.

  9. self.send_request(self.clear_chat_client, msg): sends the request through the clear_chat_client to reset chat context.

  10. self.create_client(Empty, 'object_transport/init_finish'): creates a service client for calling the object_transport/init_finish endpoint.

  11. init_finish.wait_for_service(): waits for the init_finish service to become available.

  12. self.send_request(self.enter_client, Trigger.Request()): sends a Trigger request through the enter_client to initiate the process.

  13. speech.play_audio: plays an audio cue.

  14. threading.Thread: launches the main logic in a new thread.

  15. Self.create_service: registers a service to notify that the initialization process is complete.

  16. self.get_logger().info: prints log messages.

(6) send_request Method

    def send_request(self, client, msg):
        future = client.call_async(msg)
        while rclpy.ok():
            if future.done() and future.result():
                return future.result()
  1. client.call_async(msg): makes an asynchronous service call.

  2. future.done() and future.result(): check if the service call is complete and retrieve the result.

(7) vllm_result_callback Method

	    def llm_result_callback(self, msg):
        self.llm_result = msg.data

This callback receives results from the agent_process/result topic and stores them in self.llm_result.

(8) wakeup_callback Method

	    def wakeup_callback(self, msg):
        self.get_logger().info('wakeup interrupt')
        self.interrupt = msg.data

This method is invoked when the user activates the system via voice to receive and process the wake-up signal from the vocal_detect/wakeup topic.

(9) move(self, position) Method

	    def move(self, position):
        self.get_logger().info('position: %s' % str(position))
        msg = SetPose2D.Request()
        if position not in position_dict:
            return False
        p = position_dict[position]
        msg.data.x = float(p[0])
        msg.data.y = float(p[1])
        msg.data.roll = p[2]
        msg.data.pitch = p[3]
        msg.data.yaw = p[4]
        self.send_request(self.set_pose_client, msg)
        return True
  1. Parameter receiving: receives a location name, such as “kitchen” and “front desk”.

  2. p = position_dict[position] Position Lookup: retrieves the corresponding coordinate and position data from the position_dict dictionary.

  3. Message Construction: creates a SetPose2D.Request() message and sets the values for x, y coordinates, and roll, pitch, yaw angles.

  4. self.send_request(self.set_pose_client, msg) Service Invocation: sends the request to the navigation controller using set_pose_client.

(10) play_audio(self) Method

	    def play_audio(self):
        msg = String()
        msg.data = self.response_text
        self.get_logger().info(f'{self.response_text}')
        while not self.play_audio_finish:
            time.sleep(0.1)
        self.play_audio_finish = False
        self.tts_text_pub.publish(msg)
        self.response_text = ''
        # time.sleep(20)

This method plays the text content stored in self.response_text.

  1. msg = String() Message Creation: initializes a String type message.

  2. msg.data = self.response_text Logging: logs the content to be played.

  3. while not self.play_audio_finish Waiting: enters a loop that waits for the previous audio playback to complete.

  4. self.play_audio_finish = False State Reset: resets the playback status with self.play_audio_finish = False.

  5. Self.tts_text_pub.publish(msg) Publishing: Publishes the message to the tts_text_pub topic to trigger text-to-speech conversion.

  6. self.response_text = ‘’: clears self.response_text. This method enables the robot to deliver voice feedback, allowing it to communicate with the user through speech.

(11) reach_goal_callback(self, msg) Method

	    def reach_goal_callback(self, msg):
        self.get_logger().info('reach goal')
        self.reach_goal = msg.data
  1. Parameter Receiving: receives a Boolean message indicating whether the robot has reached its destination.

  2. State Update: updates the internal navigation status by assigning the message data to self.reach_goal.

    This callback handles feedback from the navigation controller and informs the system when the robot completes a movement task.

(12) vision(self, query) Method

	    def vision(self, query):
        self.controller.run_action('horizontal')
        msg = SetContent.Request()
        if language == 'Chinese':
            msg.api_key = stepfun_api_key
            msg.base_url = stepfun_base_url
            msg.model = stepfun_vllm_model
        else:
            msg.api_key = vllm_api_key
            msg.base_url = vllm_base_url
            msg.model = vllm_model
        msg.prompt = VLLM_PROMPT
        msg.query = query
        self.get_logger().info('vision: %s' % query)
        res = self.send_request(self.set_vllm_content_client, msg)
        self.controller.run_action('init')
        return res.message

This method processes visual queries using the Vision-Language Large Model (VLLM).

  1. Parameter Receiving: accepts a query string such as “What do you see?”.

  2. Message Construction: creates a SetContent.Request() message containing the API key, base URL, model name, prompt string, and query content.

  3. Service Invocation: sends the request to the vision-language large model service via set_vllm_content_client.

  4. Response Return: returns the response message from the VLLM.

    This method equips the robot with visual perception capabilities, allowing it to understand and describe its visual surroundings.

(13) play_audio_finish_callback(self, msg) Method

	    def play_audio_finish_callback(self, msg):
        # msg = SetBool.Request()
        # msg.data = True
        # self.send_request(self.awake_client, msg)
        # msg = SetInt32.Request()
        # msg.data = 1
        # self.send_request(self.set_mode_client, msg)
        self.play_audio_finish = msg.data

This method handles the callback notification indicating the completion of audio playback.

Parameter Receiving: receives a Boolean message that indicates whether audio playback is complete.

State Update: updates the playback status by assigning the message data to self.play_audio_finish.

Wake-up Configuration: constructs a SetBool.Request() message to enable the wake-up function.

Mode Configuration: constructs a SetInt32.Request() message to set the interaction mode to 1.

This method handles post-playback processing and prepares the system for the next user command, supporting continuous interaction.

(14) process Method

    def process(self):
        first = True
        while self.running:
            if self.llm_result:
                self.interrupt = False
                msg = String()
                if 'action' in self.llm_result: # If a corresponding action is returned, extract and process it (如果有对应的行为返回那么就提取处理)
                    result = json.loads(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}')+1])
                    if 'response' in result:
                        msg.data = result['response']
                        self.tts_text_pub.publish(msg)
                    if 'action' in result:
                        action = result['action']
                        self.get_logger().info(f'vllm action: {action}')
                        for a in action:
                            if 'move' in a:
                                self.reach_goal = False
                                res = eval(f'self.{a}')
                                if res:
                                    while not self.reach_goal:
                                        if self.interrupt:
                                            self.get_logger().info('interrupt')
                                            break
                                        # self.get_logger().info('waiting for reach goal')
                                        time.sleep(0.01)
                            elif 'vision' in a:
                                res = eval(f'self.{a}')
                                self.response_text = res
                                self.get_logger().info(f'vllm response: {res}')
                            elif 'play_audio' in a:
                                eval(f'self.{a}')
                                while not self.play_audio_finish:
                                    time.sleep(1)
  1. Check LLM Result if self.llm_result:    check whether self.llm_result contains content. If not empty, it indicates that a new result from the LLM needs to be processed. Otherwise, the system enters a short sleep to wait for new results.

  2. Check for Action Commands if ‘action’ in self.llm_result:    If the returned result contains the “action” field, it means that in addition to the response text, there are action commands to execute.

  3. Extract and Parse JSON Data result = json.loads(self.llm_result[self.llm_result.find(‘{‘):self.llm_result.find(‘}’)+1]): use string search to locate the start and end of the JSON section, and parse it into a dictionary using json.loads to extract fields such as “response” and “action”.

  4. Handle Text Response and Execute Actions If the parsed result contains “response”, assign the content to a message and publish it to the tts_text_pub topic to trigger TTS playback. If “action” is present, iterate through the action list: a. Move command: If the action string contains “move”, first set self.reach_goal to False, call the corresponding move function using eval to execute dynamically, and then loop until self.reach_goal becomes True, which indicats that the robot has reached the goal. During this period, monitor self.interrupt to allow interruption. b. Vision detection: If “vision” is present, execute the corresponding vision function, assign the returned result to self.response_text, and log the result. c. Audio playback: If “play_audio” is present, directly call the corresponding audio playback function.

  5. Interrupt Check During each action execution, continuously monitor the self.interrupt flag. If an interrupt is triggered while waiting or executing an action, the current action is interrupted and the corresponding loop is exited in time.

  6. Mark as Complete and Switch Mode Regardless of whether actions are included, after processing the current LLM result, set self.action_finish to True and clear self.llm_result. Later in the loop, when both self.play_audio_finish and self.action_finish are True, reset these two flags and send a mode-switch request (via set_mode_client), entering the next state. Finally, when self.running becomes False, the loop exits and the ROS system is shut down.

(15) main Method

	def main():
    node = VLLMNavigation('vllm_navigation')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. Create an instance of the VLLMObjectTransport node.

  2. A multithreaded executor is used to handle the node’s tasks.

  3. Call executor.spin() to start processing ROS events.

  4. Upon shutdown, the node is properly destroyed using node.destroy_node().

25.2.7 Intelligent Transport in Embodied AI Applications

The large model used in this lesson operates online, requiring a stable network connection for the main controller in use.

  • Overview of Operation

Once the program starts, Circular Microphone Array(here after called the voice device.) will announce: “I’m Ready.”

To activate the voice device, speak the designated wake words: “hello hiwonder.” Upon successful activation, the voice device will respond with “I’m Here.” Once activated, you can control the robot by voice—issue a command like “Put the red block into the blue box.” Upon receiving a command, the terminal displays the recognized speech content. The voice device then verbally responds with a generated answer and the robot simultaneously executes the corresponding action.

  • Preparation

1. Version Confirmation

Before starting this feature, verify Version Confirmation that the correct microphone configuration is set in the system.

2. Configuring the Large Model API-KEY

Open a new command-line terminal and enter the following command to access the configuration file.

vim /home/ubuntu/ros2_ws/src/large_models/large_models/large_models/config.py

Refer to 25.1.1 Obtain and Configure the Large Model API Key to obtain the large-model API-KEY: paste the vision model (from the OpenRouter website) into the vllm_api_key parameter, and the LLM model (from the OpenAI website) into the llm_api_key parameter, as indicated by the red boxes in the figure below.

3. Grasp and Placement Calibration

Before starting the intelligent handling process, you need to adjust the grasping behavior to ensure accurate performance. If the robotic arm fails to pick up the colored blocks during the demo or operation, you can follow the steps below to perform grasp calibration. This allows you to adjust the pickup area using program commands.

  1. Open a new command-line terminal and run the following command to stop the auto-start service of the app:

    sudo systemctl stop start_app_node.service
    
  2. Enter the command to start grip-position calibration:

    ros2 launch large_models_examples automatic_pick.launch.py debug:=pick
    
  3. Wait until the program finishes loading and you hear the voice prompt “I’m Ready.”

  4. The robotic arm will perform a calibration grasping action and place the object at the gripper’s pickup location. Once the robotic arm returns to its original posture, use the mouse to left-click and drag a bounding box around the target object on the screen. Once the object is correctly identified, the system will automatically calibrate the pickup position for that specific target.

  5. The calibration process for placing objects is the same as for grasping. To begin placement calibration, run the following command:

    ros2 launch large_models_examples automatic_pick.launch.py debug:=place
    

4. Navigation Map Construction

Before enabling this feature, a map must be created in advance. Please refer to “19 Mapping and Navigation / 19.1 Mapping Tutorial” for detailed instructions on how to build the map.

  • Enabling and Disabling the Feature

Note

Command input is case-sensitive and space-sensitive.

The robot must be connected to the Internet, either in STA (LAN) mode or AP (direct connection) mode via Ethernet.

  1. Double-click to launch the command bar, enter the command, and press Enter to disable the auto-start service.

  2. Enter the command to disable the auto-start service of the mobile app.

    sudo systemctl stop start_app_node.service
    
  3. Click the command-line terminal on the left side of the system interface and enter the command in the launch bar. If no map exists yet, first build one by following “19. Mapping & Navigation Course / 19.1 Mapping Course.” After the map is created, re-compile the slam package with the command below.

    cd ~/ros2_ws && colcon build --event-handlers console_direct+ --cmake-args -DCMAKE_BUILD_TYPE=Release --symlink-install --packages-select slam
    
  4. Enter the following command and press Enter to launch the real-time detection feature.

    ros2 launch large_models_examples vllm_navigation_transport.launch.py map:=map_01
    
  5. After opening the map in RViz, click the “2D Pose Estimate” icon to set the robot’s initial position.

  6. When the command line shows the output below and the device announces “Ready,” it means the microphone array has finished initializing and the YOLOv8 model has also been loaded. At this point you can say the wake word: “Hello, HiWonder.”

  7. When the terminal displays the corresponding output shown in the figure and the voice device responds with “I’m here”, it indicates successful activation. The system will begin recording the user’s voice command.

    Speak a command such as “Put the red square in the blue box.” The voice device will capture the speech, and the large language model will process the command.

  8. Upon successful recognition by the speech recognition service of cloud-based large speech model, the parsed command will be displayed under the publish_asr_result output in the terminal.

  9. Upon receiving user input shown in the figure, the terminal will display output indicating that the cloud-based large language model has been successfully invoked. The model will interpret the command, generate a language response, and execute a corresponding action based on the meaning of the command.

Note

The response is automatically generated by the model. While the semantic content is accurate, the wording and structure may vary due to randomness in language generation.

  1. The robot first moves to the “red square,” picks it up.

  2. Then travels to the “blue box” and drops the square inside.

  3. When the terminal shows the output shown in the figure indicating the end of one interaction cycle, the system is ready for the next round. To initiate another interaction, repeat step 4 by speaking the wake words again.

  4. To exit the feature, press Ctrl+C in the terminal. If the feature does not exit immediately, press Ctrl+C multiple times.

  • Project Outcome

Once the mode is active you can phrase the command however you like—for example, “Put the red square into the blue box.” The robot will navigate to the red square, grasp it, continue to the blue-box waypoint, and place the square inside.

  • Modifying Navigation Location

To change the navigation points, edit the file at the path below:

~/ros2_ws/src/large_models_examples/large_models_examples/navigation_transport/vllm_navigation_transport.py

  1. Begin by launching the program and displaying the map in rviz. Then, click on “2D Goal Pose” in the rviz interface to set the desired navigation target on the map.

  1. Return to the command terminal and check the target location parameter for the publication:

Locate the corresponding section of the code shown below, and fill in your target location parameters after the appropriate location name.

In the program, each location is defined as a target navigation point relative to the map’s origin, which corresponds to the robot’s starting position during the mapping process. Each navigation point includes five parameters:

x: position on the x-axis (meters)

y: position on the y-axis (meters)

roll: rotation around the x-axis (degrees)

pitch: rotation around the y-axis (degrees)

yaw: rotation around the z-axis (degrees)

For example, to define the position of “green square,” the parameters can be set to [0.0, 0.45, 0.0, 0.0, 90.0], indicating that the robot will navigate to a location offset from the map origin by those values.

For example, given the quaternion: Quaternion(x=0.0, y=0.0, z=-0.5677173914973032, w=0.8232235197025761). After conversion to Euler angles (roll, pitch, yaw), the result is approximately: roll ≈ 0°, pitch ≈ 0°, yaw ≈ -69.3°

  • Program Brief Analysis

1. Launch File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/navigation_transport/vllm_navigation_transport.launch.py

(1) Import Libraries

import os
from ament_index_python.packages import get_package_share_directory

from launch_ros.actions import Node
from launch_ros.actions import PushRosNamespace
from launch import LaunchDescription, LaunchService
from launch.substitutions import LaunchConfiguration
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, GroupAction, OpaqueFunction, TimerAction, ExecuteProcess
  1. os: used for handling file paths and operating system-related functions.

  2. ament_index_python.packages.get_package_share_directory: retrieves the share directory path of ROS 2 package.

  3. launch_ros.actions.Node: used to define ROS 2 nodes.

  4. launch.substitutions.LaunchConfiguration: retrieves parameter values defined in the Launch file.

  5. LaunchDescription, LaunchService: used to define and start the Launch file.

  6. launch_description_sources PythonLaunchDescriptionSource: enables the inclusion of other Launch files.

  7. launch.actions.IncludeLaunchDescription, DeclareLaunchArgument, OpaqueFunction: used to define actions and arguments within the Launch file.

(2) Definition of the launch_setup Function

def launch_setup(context):
    slam_package_path = get_package_share_directory('slam')
    navigation_package_path = get_package_share_directory('large_models_examples')
    large_models_package_path = get_package_share_directory('large_models')
    
    mode = LaunchConfiguration('mode', default=1)
    mode_arg = DeclareLaunchArgument('mode', default_value=mode)
    map_name = LaunchConfiguration('map', default='map_01').perform(context)
    robot_name = LaunchConfiguration('robot_name', default=os.environ['HOST'])
    master_name = LaunchConfiguration('master_name', default=os.environ['MASTER'])

    map_name_arg = DeclareLaunchArgument('map', default_value=map_name)
    master_name_arg = DeclareLaunchArgument('master_name', default_value=master_name)
    robot_name_arg = DeclareLaunchArgument('robot_name', default_value=robot_name)

    navigation_controller_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(os.path.join(navigation_package_path, 'large_models_examples/navigation_transport/navigation_transport.launch.py')),
        launch_arguments={
            'map': map_name,
            'debug': 'false',
            'robot_name': robot_name,
            'master_name': master_name,
        }.items(),
    )

    large_models_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(large_models_package_path, 'launch/start.launch.py')),
        launch_arguments={'mode': mode}.items(),
    )

    vllm_navigation_transport_node = Node(
        package='large_models_examples',
        executable='vllm_navigation_transport',
        output='screen',
    )

    return [
            mode_arg,
            map_name_arg, 
            master_name_arg,
            robot_name_arg,
            navigation_controller_launch,
            large_models_launch,
            vllm_navigation_transport_node
            ]
  1. This function includes the navigation_transport.launch.py file from the large_models_examples package, which launches the navigation-to-goal functionality.

  2. Large_models_launch: includes the start.launch.py Launch file from the large_models package using IncludeLaunchDescription and passes the mode argument to it.

  3. vllm_navigation_transport_node: defines a ROS 2 node from the large_models package, executes the executable files from vllm_navigation_transport, and prints the node’s output to the screen.

  4. The function returns a list of all defined launch actions.

(3) Definition of the generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])
  1. This function is responsible for generating the complete launch description.

  2. The launch_setup function is incorporated using OpaqueFunction.

(4) Main Program Entry

if __name__ == '__main__':
    # Create a LaunchDescription object(创建一个LaunchDescription对象)
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()
  1. ld = generate_launch_description() generates the launch description object.

  2. ls = LaunchService() creates the launch service object.

  3. ls.include_launch_description(ld) adds the launch description to the service.

  4. ls.run() starts the service and execute all launch actions.

2. Python File Analysis

File Path:

/home/ubuntu/ros2_ws/src/large_models_examples/large_models_examples/navigation_transport/vllm_navigation_transport.py

(1) Import the Necessary Libraries

import re
import cv2
import time
import json
import rclpy
import queue
import threading
import numpy as np
from speech import speech
from rclpy.node import Node
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger, SetBool, Empty

from large_models.config import *
from large_models_msgs.srv import SetModel, SetContent, SetString, SetInt32

from interfaces.srv import SetPose2D, SetPoint, SetBox
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup
  1. cv2: Utilized for image processing and display using OpenCV.

  2. json: used for handling data in JSON format.

  3. time: manages execution delays and time-related operations.

  4. queue: handles image queues between threads.

  5. rclpy: provides tools for creating and communicating between ROS 2 nodes.

  6. threading: enables multithreading for concurrent task processing.

  7. numpy (np): supports matrix and vector operations.

  8. std_srvs.srv: contains standard ROS service types, used to define standard service.

  9. **std_msgs.msg:**contains standard ROS message types.

  10. rclpy: provides tools for creating and communicating between ROS 2 nodes.

  11. rclpy.node.Node: base class for ROS 2 nodes.

  12. rclpy.callback_groups.ReentrantCallbackGroup: supports concurrent callback handling.

  13. rclpy.executors.MultiThreadedExecutor: multithreaded executor in ROS 2 for handling concurrent tasks.

  14. rclpy.node: node class in ROS 2.

  15. speech: module related to large model voice interaction.

  16. large_models_msgs.srv: custom service types for large models.

  17. large_models.config: configuration file for large models.

(2) PROMPT String

    '''
else:
    position_dict = {
            "green cube": [0.0, 0.45, 0.0, 0.0, 90.0],
            "red box": [1.2, 0.55, 0.0, 0.0, 0.0],
            "red cube": [0.25, 0.35, 0.0, 0.0, 90.0],
            "green box": [1.2, -0.9, 0.0, 0.0, -62.0],
            "blue cube": [0.0, 0.5, 0.0, 0.0, -180.0],
            "blue box": [-0.2, -0.75, 0.0, 0.0, -90.0],
            "origin": [0.0, 0.0, 0.0, 0.0, 0.0]}

    LLM_PROMPT = '''
You are a language expert who precisely interprets user commands.

**Skills
- Treat each pick-and-place as one composite action.
- Strong logical reasoning; understand object descriptors.
- Accurately decompose which objects to pick and which to place.

**Requirements & Constraints
- Map the users request to the available functions and output those calls.
- Bundle multiple actions into a JSON array.
- Add a concise (1030 characters), witty, ever-changing feedback message for the sequence.
- Each transport has two phases: move to the pick location, then move to the place location.
- If no move target is specified, omit the move call.
- Valid locations: blue cube, blue box, red cube, red box, green cube, green box, origin.
- Only cubes with red, green, or blue faces count as "cubes"; ignore the ground.
- Dont include any extra text (e.g. task complete).
- Output only this JSON structure:
{"action":"xx", "response":"xx"}

**Available Functions
- pick('red cube')
- place('red box')
- move('origin')

**Example
Input: Put the red cube into its matching box
Output:
{"action": ["move('red cube')", "pick('red cube')", "move('red box')", "place('red box')"], "response": "Right away—on the move!"}
Input: Put the blue cube into the blue box
Output: {"action": ["move('blue cube')","pick('blue cube')", "move('blue box')", "place('blue box')"], "response": "OK, start the task"}
    '''

It defines three prompt strings (PROMPT) used to guide how user commands and image data are processed. It specifies the recognition task logic and expected output format.

(3) VLLMObjectTransport Class

	class VLLMNavigation(Node):
    def __init__(self, name):
        rclpy.init()
        super().__init__(name)
        
        self.action = []
        self.response_text = ''
        self.llm_result = ''
        self.action_finish = False
        self.transport_action_finish = False
        self.play_audio_finish = False
        # self.llm_result = '{\'action\':[\'move(\"前台\")\', \'vision(\"大门有没有关\")\', \'move(\"原点\")\', \'play_audio()\'], \'response\':\'马上!\'}'
        self.running = True
        self.reach_goal = False
        self.interrupt = False
        self.bridge = CvBridge()
        self.image_queue = queue.Queue(maxsize=2)
        timer_cb_group = ReentrantCallbackGroup()
        self.client = speech.OpenAIAPI(api_key, base_url)
        self.tts_text_pub = self.create_publisher(String, 'tts_node/tts_text', 1)
        self.create_subscription(Image, 'automatic_pick/image_result', self.image_callback, 1)
        self.create_subscription(String, 'agent_process/result', self.llm_result_callback, 1)
        self.create_subscription(Bool, 'vocal_detect/wakeup', self.wakeup_callback, 1)
        self.create_subscription(Bool, 'tts_node/play_finish', self.play_audio_finish_callback, 1, callback_group=timer_cb_group)
        self.create_subscription(Bool, 'navigation_controller/reach_goal', self.reach_goal_callback, 1)
        self.create_subscription(Bool, 'automatic_pick/action_finish', self.action_finish_callback, 1)
        self.awake_client = self.create_client(SetBool, 'vocal_detect/enable_wakeup')
        self.awake_client.wait_for_service()
        self.set_mode_client = self.create_client(SetInt32, 'vocal_detect/set_mode')
        self.set_mode_client.wait_for_service()
        self.set_model_client = self.create_client(SetModel, 'agent_process/set_model')
        self.set_model_client.wait_for_service()
        self.set_prompt_client = self.create_client(SetString, 'agent_process/set_prompt')
        self.set_prompt_client.wait_for_service()
        self.set_vllm_content_client = self.create_client(SetContent, 'agent_process/set_vllm_content')
        self.set_vllm_content_client.wait_for_service()
        self.set_pose_client = self.create_client(SetPose2D, 'navigation_controller/set_pose')
        self.set_pose_client.wait_for_service()
        # self.set_target_client = self.create_client(SetPoint, 'automatic_pick/set_target_color')
        # self.set_target_client.wait_for_service()
        self.set_box_client = self.create_client(SetBox, 'automatic_pick/set_box')
        self.set_box_client.wait_for_service()
        self.set_pick_client = self.create_client(Trigger, 'automatic_pick/pick')
        self.set_pick_client.wait_for_service()
        self.set_place_client = self.create_client(Trigger, 'automatic_pick/place')
        self.set_place_client.wait_for_service()
        self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group)
  1. rclpy.init(): initializes the ROS 2 system.

  2. super().init(name): calls the initializer of the parent class (Node) to create a ROS 2 node.

  3. self.llm_result:stores results returned from LLM.

  4. self.running: used to control the runtime state of the program.

  5. self.transport_finished:indicates whether the transport task has been completed.

  6. self.tts_text_pub: publishes TTS (Text-to-Speech) text to a ROS topic.

  7. self.create_subscription: subscribes to LLM results and the transport completion status.

  8. self.lock: threading lock used for synchronization in multi-threaded operations.

  9. self.client: speech API client used to communicate with the voice service.

  10. self.tts_text_pub: publishes TTS messages to a ROS topic.

  11. self.asr_pub: publishes speech recognition results to a ROS topic.

  12. self.create_subscription: subscribes to LLM result and transport status topics.

  13. self.create_client: creates multiple service clients for calling various functionalities.

  14. self.timer: creates a timer used to trigger the initialization process.

(4) get_node_state Method

	    def get_node_state(self, request, response):
        return response

return response: returns a response object.

(5) init_process Method

	    def init_process(self):
        self.timer.cancel()
        
        msg = SetModel.Request()
        msg.model = llm_model
        msg.model_type = 'llm'
        msg.api_key = api_key 
        msg.base_url = base_url
        self.send_request(self.set_model_client, msg)

        msg = SetString.Request()
        msg.data = LLM_PROMPT
        self.send_request(self.set_prompt_client, msg)
        
        init_finish = self.create_client(Empty, 'navigation_controller/init_finish')
        init_finish.wait_for_service()
        speech.play_audio(start_audio_path)
        threading.Thread(target=self.process, daemon=True).start()
        self.create_service(Empty, '~/init_finish', self.get_node_state)
        self.get_logger().info('\033[1;32m%s\033[0m' % 'start')
  1. self.timer.cancel(): cancels the timer.

  2. SetModel.Request(): creates a request message to set the LLM model.

  3. SetModel.Request: creates a request for SetModel service.

  4. msg.model_type = ‘llm’: specifies the model type as ‘llm’.

  5. self.send_request(self.set_model_client, msg): sends the model configuration request through the set_model_client.

  6. SetString.Request: creates a request message to set the prompt string.

  7. self.send_request(self.set_prompt_client,msg): sends the model configuration request through the set_prompt_client.

  8. Empty.Request: request message used to clear the current conversation history.

  9. self.send_request(self.clear_chat_client, msg): sends the request through the clear_chat_client to reset chat context.

  10. self.create_client(Empty, 'object_transport/init_finish'): creates a service client for calling the object_transport/init_finish endpoint.

  11. init_finish.wait_for_service(): waits for the init_finish service to become available.

  12. self.send_request(self.enter_client, Trigger.Request()): sends a Trigger request through the enter_client to initiate the process.

  13. speech.play_audio: plays an audio cue.

  14. threading.Thread: launches the main logic in a new thread.

  15. self.create_service: creates a service to signal the completion of initialization.

  16. self.get_logger().info: prints log messages.

(6) send_request Method

	    def send_request(self, client, msg):
        future = client.call_async(msg)
        while rclpy.ok():
            if future.done() and future.result():
                return future.result()
  1. client.call_async(msg): makes an asynchronous service call.

  2. future.done() and future.result(): check if the service call is complete and retrieve the result.

(7) vllm_result_callback Method

	    def llm_result_callback(self, msg):
        self.llm_result = msg.data

This callback receives results from the agent_process/result topic and stores them in self.vllm_result.

(8) get_object_position Method

	    def get_object_position(self, query, image):
        msg = SetContent.Request()
        if language == 'Chinese':
            msg.model = stepfun_vllm_model
            msg.api_key = stepfun_api_key
            msg.base_url = stepfun_base_url
        else:
            msg.api_key = vllm_api_key
            msg.base_url = vllm_base_url
            msg.model = vllm_model
        msg.prompt = VLLM_PROMPT
        msg.query = query
        msg.image = self.bridge.cv2_to_imgmsg(image, "bgr8")
         
        self.get_logger().info('vision: %s' % query)
        self.get_logger().info('send image')
        res = self.send_request(self.set_vllm_content_client, msg)
        vllm_result = res.message
        self.get_logger().info('vllm_result: %s' % vllm_result)
        if 'object' in vllm_result: 
            if vllm_result.startswith("```") and vllm_result.endswith("```"):
                vllm_result = vllm_result.strip("```").replace("json\n", "").strip()
            # self.get_logger().info('vllm_result: %s' % vllm_result)
            vllm_result = json.loads(vllm_result[vllm_result.find('{'):vllm_result.find('}')+1])
            box = vllm_result['xyxy']
            if box:
                h, w = image.shape[:2]
                if language == 'Chinese':
                    box = self.client.data_process(box, w, h)
                    self.get_logger().info('box: %s' % str(box))
                else:
                    box = [int(box[0] * w), int(box[1] * h), int(box[2] * w), int(box[3] * h)]
                cv2.rectangle(image, (box[0], box[1]), (box[2], box[3]), (255, 0, 0), 2, 1)
            return box
        else:
            msg = String()
            msg.data = vllm_result
            self.tts_text_pub.publish(vllm_result)
            return []
  1. query:a query request.

  2. image: the input image data.

  3. offset: offset of the placement position.

  4. SetContent.Request: request message used to set the LLM content.

  5. self.get_logger().info: prints log messages.

  6. self.send_request: sends the service request.

  7. res.message: extracts the message content from the service response.

  8. if 'object' in vllm_result: checks whether the result contains object-related information.

  9. if vllm_result.startswith(“```”) and vllm_result.endswith(“```”): checks the message format.

  10. vllm_result.strip(“```”).replace(“json\n”, “”).strip()**: cleans up the message content, removes the code block markers (```), strips the “json\n” prefix, trims leading and trailing whitespace, and extracts a pure JSON string.

  11. json.loads(message): parses the cleaned JSON string into a Python dictionary.

  12. vllm_result.message: extracts the message the response.

  13. box = self.client.data_process(box, w, h): processes the bounding box for the placement position.

  14. self.tts_text_pub.publish(msg): publishes the final message.

(9) pick Method

  1. Trigger Pick Preparation

self.send_request(self.set_pick_client, Trigger.Request()): sends a trigger request to the pick service,

which may cause the robotic arm to enter pick-ready status.

  1. Acquire Image

image = self.image_queue.get(block=True): retrieves the latest image from the image queue.

block=True: means the method will wait until an image is available before proceeding.

  1. Detect Target Object

box = self.get_object_position(query, image): calls the get_object_position method to detect the object

based on the query and input image, returning bounding box coordinates.

  1. Handle Detection Result

if box: checks whether the object was successfully detected.

If successful, the bounding box data is recorded for further processing.

  1. Create Bounding Box Request

msg = SetBox.Request(): creates a request to set the bounding box coordinates.

Set the bounding box coordinates:

msg.x_min = box[0]

msg.y_min = box[1]

msg.x_max = box[2]

msg.y_max = box[3]

  1. Send Bounding Box to Pick Node

self.log_info("Step 5: Sending bounding box to automatic_pick node"): logs the operation of sending the bounding box.

self.send_request(self.set_box_client, msg): sends the bounding box request to the pick node

so it knows where the object is located for grasping.

(10) place Method

The logic and structure are analogous to the pick method.

(12) process Method

    def process(self):
        while self.running:
            if self.llm_result:
                self.interrupt = False
                msg = String()
                if 'action' in self.llm_result: # If a corresponding action is returned, extract and process it (如果有对应的行为返回那么就提取处理)
                    result = eval(self.llm_result[self.llm_result.find('{'):self.llm_result.find('}')+1])
                    if 'response' in result:
                        msg.data = result['response']
                        self.tts_text_pub.publish(msg)
                    if 'action' in result:
                        action = result['action']
                        self.get_logger().info(f'vllm action: {action}')
                        for a in action:
                            if 'move' in a: 
                                self.reach_goal = False
                                res = eval(f'self.{a}')
                                if res:
                                    while not self.reach_goal:
                                        # if self.interrupt:
                                            # self.get_logger().info('interrupt')
                                            # break
                                        # self.get_logger().info('waiting for reach goal')
                                        time.sleep(0.01)
                                else:
                                    self.get_logger().info('cannot move to %s' % a)
                                    break
                            elif 'pick' in a or 'place' in a:
                                time.sleep(3.0)
                                eval(f'self.{a}')
                                self.transport_action_finish = False
                                while not self.transport_action_finish:
                                    # if self.interrupt:
                                        # self.get_logger().info('interrupt')
                                        # break
                                    time.sleep(0.01)
                            # if self.interrupt:
                                # self.get_logger().info('interrupt')
                                # break
                else: # If there is no corresponding action, only respond (没有对应的行为,只回答)
                    msg.data = self.llm_result
                    self.tts_text_pub.publish(msg)
                self.action_finish = True
                self.llm_result = ''
            else:
                time.sleep(0.01)
            if self.play_audio_finish and self.action_finish:
                self.play_audio_finish = False
                self.action_finish = False
                msg = SetBool.Request()
                msg.data = True
                self.send_request(self.awake_client, msg)
                # msg = SetInt32.Request()
                # msg.data = 2
                # self.send_request(self.set_mode_client, msg)
        rclpy.shutdown()
  1. while self.running: this loop keeps running as long as self.running is set to True.

  2. if self.vllm_result: checks whether self.vllm_result is empty, which is the result from vllm model.

  3. self.interrupt = False: resets the interrupt flag.

  4. if ‘action’ in self.llm_result: checks if the result contains an action command.

  5. result = eval(self.llm_result[…]): parses the JSON result.

  6. if ‘response’ in result: checks if the result includes a response message.

  7. msg.data = result[‘response’]: set response text.

  8. self.tts_text_pub.publish(msg): publishes the response text.

  9. if ‘action’ in result: checks whether the result contains an action list.

  10. action = result[‘action’]: retrieves the action list.

  11. if ‘move’ in a: checks whether it is a move action.

  12. self.reach_goal = False: resets the “goal reached” flag.

  13. eval(f'self.{a}'): executes the move command.

  14. while not self.reach_goal: waits until the target is reached.

  15. elif ‘pick’ in a or ‘place’ in a: checks whether it is a pick or place action.

  16. eval(f’self.{a}’): executes the pick or place command.

  17. self.transport_action_finish = False: resets the transport action completion flag.

  18. while not self.transport_action_finish: waits until the action is completed.

  19. Interrupt Handling: if self.interrupt checks whether the process has been interrupted.

  20. No action response handling else: # No corresponding action found, only respond. handling process when no action is found.

  21. Finish Handlling self.action_finish = True: marks the action as completed.

  22. Waiting Handling else: time.sleep(0.01): short sleep.

  23. Mode Switching if self.play_audio_finish and self.action_finish: checks if both audio playback and action execution are complete.

  24. Shutdown Handlingrclpy.shutdown(): shut down the ROS 2 system.

  25. for a in action: iterates over the action list and execute each action.

(13) main Method

	def main():
    node = VLLMNavigation('vllm_navigation')
    executor = MultiThreadedExecutor()
    executor.add_node(node)
    executor.spin()
    node.destroy_node()
  1. Create an instance of the VLLMObjectTransport node.

  2. A multithreaded executor is used to handle the node’s tasks.

  3. Call executor.spin() to start processing ROS events.

  4. Upon shutdown, the node is properly destroyed using node.destroy_node().