# 9. Robotic Arm Depth Camera Application ## 9.1 Depth Map Pseudo-Color Processing ### 9.1.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Next, process the image to generate the pseudo-color image. Finally, transmit both the color image and pseudo-color image. ### 9.1.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example get_depth_rgb_img.launch.py ``` 3) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 4) After completing the feature, you need to start the APP service. **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 5) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 9.1.3 Project Outcome Once the feature starts, the robotic arm will send the RGB color image and depth image to the terminal. The color will change corresponding to the distance indicated in the depth map. ### 9.1.4 Program Brief Analysis * **Launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) get_depth_rgb_img_node = Node( package='example', executable='get_depth_rgb_img', output='screen', ) return [depth_camera_launch, sdk_launch, get_depth_rgb_img_node, ] ``` Loads the **launch/armpi_ultra.launch.py** file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation. Loads the **launch/depth_camera.launch.py** file from the peripherals package to start the depth camera node, which provides RGB and depth image data. Defining the target node, it declares get_depth_rgb_img_node, and launches the get_depth_rgb_img executable from the example package. This node is used to acquire and process RGB and depth images from the depth camera, with logs output to the screen. (2) generate_launch_description Function {lineno-start=40} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=45} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. * **Python File Analysis** (1) Import the Necessary Libraries {lineno-start=5} ``` import os import cv2 import time import rclpy import queue import threading import numpy as np import sdk.fps as fps import mediapipe as mp import message_filters from rclpy.node import Node from cv_bridge import CvBridge from std_srvs.srv import SetBool from std_srvs.srv import Trigger from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from servo_controller.bus_servo_control import set_servo_position ``` **import sdk.fps as fps**: Imports the custom fps module, used for frame rate calculation. **import mediapipe as mp**: Imports the mediapipe library for image processing and human pose estimation. **import message_filters**: Imports the message_filters module for synchronized message subscriptions. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from the servo_controller_msgs.msg package, which is used to control the servo positions. **from rclpy.callback_groups import ReentrantCallbackGroup**: Imports ReentrantCallbackGroup from rclpy.callback_groups for handling reentrant callbacks in multithreaded environments. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from servo_controller.bus_servo_control module to set servo positions. (2) GetRgbAndDepthNode Class Initialization Function {lineno-start=26} ``` def __init__(self, name): rclpy.init() super().__init__(name) self.running = True self.fps = fps.FPS() self.image_queue = queue.Queue(maxsize=2) self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() # self.client = self.create_client(SetBool, '/depth_cam/set_ldp_enable') # self.client.wait_for_service() rgb_sub = message_filters.Subscriber(self, Image, '/depth_cam/rgb/image_raw') depth_sub = message_filters.Subscriber(self, Image, '/depth_cam/depth/image_raw') self.get_logger().info(f"1") info_sub = message_filters.Subscriber(self, CameraInfo, '/depth_cam/depth/camera_info') self.get_logger().info(f"2") # Synchronize timestamps, allowing 0.02s tolerance(同步时间戳,时间允许有误差在0.02s) sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub, info_sub], 3, 2) sync.registerCallback(self.multi_callback) # Execute feedback function (执行反馈函数) timer_cb_group = ReentrantCallbackGroup() self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) ``` Initializes the ROS node, sets the running state flag running, and create a frame rate counter fps. Creates a thread-safe image queue image_queue to buffer synchronized image data. Creates a servo control publisher joints_pub and wait for the robotic arm controller service to start. Uses message_filters to create subscribers for RGB images, depth images, and camera intrinsic parameters. Synchronizes their timestamps with ApproximateTimeSynchronizer, allowing 0.02s tolerance, and registers the multi_callback function to process synchronized data. Starts an initialization timer named timer for subsequent setup operations. (3) init_process Method {lineno-start=51} ``` def init_process(self): self.timer.cancel() msg = SetBool.Request() msg.data = False # self.send_request(self.client, msg) set_servo_position(self.joints_pub, 1.5, ((1, 500), (2, 500), (3, 330), (4, 900), (5, 700), (6, 500))) time.sleep(1.5) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` Cancels the initialization timer and resets the robotic arm servos to their initial positions, completed within 1.5 seconds. Starts the main logic thread via the main method, creates the ~/init_finish service to return the node readiness status, and outputs a startup success log. (4) get_node_state Function {lineno-start=65} ``` def get_node_state(self, request, response): response.success = True return response ``` Service callback function that returns the successful node initialization status response.success = True. (5) multi_callback Function {lineno-start=75} ``` def multi_callback(self, ros_rgb_image, ros_depth_image, depth_camera_info): if self.image_queue.full(): # If the queue is full, discard the oldest image(如果队列已满,丢弃最旧的图像) self.image_queue.get() # Put the image into the queue(将图像放入队列) self.image_queue.put((ros_rgb_image, ros_depth_image, depth_camera_info)) ``` Synchronized data callback function that stores the synchronized RGB image, depth image, and camera intrinsics into the image queue, discarding the oldest data if the queue is full, to ensure the main logic thread receives time-aligned multi-source data. (6) main Method {lineno-start=83} ``` def main(self): while self.running: try: ros_rgb_image, ros_depth_image, depth_camera_info = self.image_queue.get(block=True, timeout=1) except queue.Empty: if not self.running: break else: continue try: rgb_image = np.ndarray(shape=(ros_rgb_image.height, ros_rgb_image.width, 3), dtype=np.uint8, buffer=ros_rgb_image.data) #self.get_logger().info(f"rgb_image shape before crop: {rgb_image.shape}") #rgb_image = rgb_image[0:440,] #self.get_logger().info(f"rgb_image shape after crop: {rgb_image.shape}") depth_image = np.ndarray(shape=(ros_depth_image.height, ros_depth_image.width), dtype=np.uint16, buffer=ros_depth_image.data) #self.get_logger().info(f"depth_image shape: {depth_image.shape}") h, w = depth_image.shape[:2] depth = np.copy(depth_image).reshape((-1, )) depth[depth<=0] = 0 sim_depth_image = np.clip(depth_image, 0, 4000).astype(np.float64) sim_depth_image = sim_depth_image / 2000.0 * 255.0 # bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR) # Remove this line(移除这行代码) depth_color_map = cv2.applyColorMap(sim_depth_image.astype(np.uint8), cv2.COLORMAP_JET) if len(depth_color_map.shape) == 2: depth_color_map = cv2.cvtColor(depth_color_map, cv2.COLOR_GRAY2BGR) result_image = np.concatenate([rgb_image, depth_color_map], axis=1) cv2.imshow("depth", result_image) key = cv2.waitKey(1) if key != -1: self.running = False except Exception as e: self.get_logger().info('error: ' + str(e)) ``` Main logic thread that continuously processes image queue data, reads synchronized RGB and depth images, converting them into NumPy arrays. Processes the depth image, removes invalid values (≤ 0), maps the depth range of 0–4000 to 0–255, and applies the JET colormap to generate a pseudo-colored depth image. Concatenates the RGB image with the pseudo-colored depth image and displays them in an OpenCV depth window. Listens for keyboard input, terminating the program upon any key press. ## 9.2 Distance Ranging ### 9.2.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Next, process the image to obtain the pixel distance. Publish the processed color image and the pseudo-color image, and allow mouse clicks to read or clear the distance at the clicked point. ### 9.2.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example distance_measure.launch.py ``` 3) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 4) After completing the feature, you need to start the APP service. **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 5) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 9.2.3 Project Outcome Once the feature is activated, the robotic arm streams both the RGB image and the depth image to the screen. On the depth image, the nearest point and its distance are highlighted. By left-clicking with the mouse, you can mark a point and obtain its distance. A double left-click or a middle-click cancels the mark and restores the minimum-distance measurement. ### 9.2.4 Program Brief Analysis * **launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) distance_measure_node = Node( package='example', executable='distance_measure', output='screen', ) return [depth_camera_launch, sdk_launch, distance_measure_node, ] ``` Including external launch file: Loads the launch/armpi_ultra.launch.py file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation. Loads the launch/depth_camera.launch.py file from the peripherals package to start the depth camera node, which provides RGB image and depth data. Defining the target node: Declares distance_measure_node to launch the distance_measure executable from the example package. This node implements the distance measurement function such as calculating the distance between the target and the camera, with log output directed to the screen. (2) generate_launch_description Function {lineno-start=40} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=45} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. * **Python File Analysis** (1) Import the Necessary Libraries {lineno-start=5} ``` import os import cv2 import time import rclpy import queue import threading import numpy as np import sdk.fps as fps import mediapipe as mp import message_filters from rclpy.node import Node from cv_bridge import CvBridge from std_srvs.srv import SetBool from std_srvs.srv import Trigger from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from servo_controller.bus_servo_control import set_servo_position ``` **import sdk.fps as fps**: Imports the custom fps module, used for calculating and tracking frame rates. **import mediapipe as mp**: Imports the MediaPipe library for processing computer vision tasks, such as pose estimation. **import message_filters**: Imports the message_filters, which is used to synchronize multiple sensor messages, such as depth images and RGB images. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from the servo_controller_msgs.msg package, which is used to control the servo positions. **from rclpy.callback_groups import ReentrantCallbackGroup**: Imports ReentrantCallbackGroup from rclpy.callback_groups, which enables reentrant callback groups, allowing multiple threads to execute callbacks concurrently. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from servo_controller.bus_servo_control module to set servo target positions. (2) DistanceMeasureNode Class Initialization Function {lineno-start=25} ``` class DistanceMeasureNode(Node): def __init__(self, name): rclpy.init() super().__init__(name) self.running = True self.fps = fps.FPS() self.image_queue = queue.Queue(maxsize=2) self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) # Servo control(舵机控制) self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() # self.client = self.create_client(SetBool, '/depth_cam/set_ldp_enable') # self.client.wait_for_service() rgb_sub = message_filters.Subscriber(self, Image, '/depth_cam/rgb/image_raw') depth_sub = message_filters.Subscriber(self, Image, '/depth_cam/depth/image_raw') # Synchronize timestamps, allowing 0.02s tolerance(同步时间戳,时间允许有误差在0.02s) sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub], 4, 0.5) sync.registerCallback(self.multi_callback) # Register callback function (执行反馈函数) timer_cb_group = ReentrantCallbackGroup() self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) self.target_point = None self.last_event = 0 cv2.namedWindow("depth") cv2.setMouseCallback('depth', self.click_callback) threading.Thread(target=self.main, daemon=True).start() ``` Initializes the ROS node, sets the running state flag running, and create a frame rate counter fps. Creates a thread-safe image queue image_queue and a servo control publisher joints_pub, and waits for the robotic arm controller service to start. Uses message_filters to create subscribers for RGB images and depth images. Synchronizes their timestamps with ApproximateTimeSynchronizer, allowing 0.5s tolerance, and registers the multi_callback function to process synchronized data. Starts the initialization timer timer, sets up the mouse callback function for interactively selecting measurement points, and launches the main logic thread via main method. (3) init_process Function {lineno-start=55} ``` def init_process(self): self.timer.cancel() msg = SetBool.Request() msg.data = False #self.send_request(self.client, msg) #set_servo_position(self.joints_pub, 1.5, ((1, 500), (2, 500), (3, 330), (4, 900), (5, 700), (6, 500))) set_servo_position(self.joints_pub, 1.5, ((6, 500), (5, 600), (4, 785), (3, 110), (2, 500), (1, 210))) time.sleep(1) # threading.Thread(target=self.main, daemon=True).start() self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` Cancels the initialization timer and resets the robotic arm servos to their initial positions, completed within 1.5 seconds, with log output directed to the screen. (4) multi_callback Function {lineno-start=69} ``` def multi_callback(self, ros_rgb_image, ros_depth_image): if self.image_queue.full(): # If the queue is full, discard the oldest image(如果队列已满,丢弃最旧的图像) self.image_queue.get() # Put the image into the queue(将图像放入队列) self.image_queue.put((ros_rgb_image, ros_depth_image)) ``` Synchronized data callback function that stores the synchronized RGB image and depth image into the image queue, discarding the oldest data if the queue is full, to ensure the main logic thread receives time-aligned image pairs. (5) click_callback Function {lineno-start=77} ``` def click_callback(self, event, x, y, flags, params): if event == cv2.EVENT_RBUTTONDOWN or event == cv2.EVENT_MBUTTONDOWN or event == cv2.EVENT_LBUTTONDBLCLK: self.target_point = None if event == cv2.EVENT_LBUTTONDOWN and self.last_event != cv2.EVENT_LBUTTONDBLCLK: if x >= 640: self.target_point = (x - 640, y) else: self.target_point = (x, y) self.last_event = event ``` Mouse interaction callback: Left-click: Sets the target point for measurement. Clicking on either the RGB image or depth map is supported, with coordinates automatically mapped. Right-click / middle-click or double left-click: Clears the target point and reverts to automatic nearest-point measurement. Calls the client's asynchronous request method call_async, sends a message, and returns a future object representing the result. (6) main Method {lineno-start=93} ``` def main(self): while self.running: try: ros_rgb_image, ros_depth_image = self.image_queue.get(block=True, timeout=1) except queue.Empty: if not self.running: break else: self.get_logger().info('\033[1;31m%s\033[0m' % 'sdsdfsd') continue try: rgb_image = np.ndarray(shape=(ros_rgb_image.height, ros_rgb_image.width, 3), dtype=np.uint8, buffer=ros_rgb_image.data) depth_image = np.ndarray(shape=(ros_depth_image.height, ros_depth_image.width), dtype=np.uint16, buffer=ros_depth_image.data) h, w = depth_image.shape[:2] depth = np.copy(depth_image).reshape((-1, )) depth[depth<=0] = 55555 min_index = np.argmin(depth) min_y = min_index // w min_x = min_index - min_y * w if self.target_point is not None: min_x, min_y = self.target_point sim_depth_image = np.clip(depth_image, 0, 2000).astype(np.float64) / 2000 * 255 depth_color_map = cv2.applyColorMap(sim_depth_image.astype(np.uint8), cv2.COLORMAP_JET) txt = 'Dist: {}mm'.format(depth_image[min_y, min_x]) cv2.circle(depth_color_map, (int(min_x), int(min_y)), 8, (32, 32, 32), -1) cv2.circle(depth_color_map, (int(min_x), int(min_y)), 6, (255, 255, 255), -1) cv2.putText(depth_color_map, txt, (11, 380), cv2.FONT_HERSHEY_PLAIN, 2.0, (32, 32, 32), 6, cv2.LINE_AA) cv2.putText(depth_color_map, txt, (10, 380), cv2.FONT_HERSHEY_PLAIN, 2.0, (240, 240, 240), 2, cv2.LINE_AA) # Color space conversion(颜色空间转换) bgr_image = rgb_image # No conversion needed here(无需转换) # bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR) # Uncomment if conversion is required(如需转换可取消注释) # Ensure consistent dimensions(确保尺寸一致) depth_color_map = cv2.resize(depth_color_map, (rgb_image.shape[1], rgb_image.shape[0])) # Ensure depth_color_map is in BGR format(确保 depth_color_map 是 BGR 格式) if len(depth_color_map.shape) == 2: depth_color_map = cv2.cvtColor(depth_color_map, cv2.COLOR_GRAY2BGR) cv2.circle(bgr_image, (int(min_x), int(min_y)), 8, (32, 32, 32), -1) cv2.circle(bgr_image, (int(min_x), int(min_y)), 6, (255, 255, 255), -1) cv2.putText(bgr_image, txt, (11, h - 20), cv2.FONT_HERSHEY_PLAIN, 2.0, (32, 32, 32), 6, cv2.LINE_AA) cv2.putText(bgr_image, txt, (10, h - 20), cv2.FONT_HERSHEY_PLAIN, 2.0, (240, 240, 240), 2, cv2.LINE_AA) self.fps.update() # bgr_image = self.fps.show_fps(bgr_image) result_image = np.concatenate([bgr_image, depth_color_map], axis=1) result_image = self.fps.show_fps(result_image) cv2.imshow("depth", result_image) key = cv2.waitKey(1) except Exception as e: self.get_logger().info('error: ' + str(e)) rclpy.shutdown() ``` Main logic thread that continuously processes image queue data, reads synchronized RGB and depth images, converting them into NumPy arrays. Measurement logic: If no target point is manually selected, automatically detects the nearest point in the depth image with the minimum depth value. If a target is selected, uses the depth value of that point. Depth visualization: Maps the depth range of 0–2000 mm to a pseudocolor image JET colormap, draws a marker at the measurement point, and displays the distance text in mm. Image stitching: Horizontally concatenates the RGB image and the pseudocolor depth image, overlays FPS, and displays them in an OpenCV window named depth, with real-time interaction support. ## 9.3 Depth Map Conversion ### 9.3.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Next, process the image to integrate the RGB image and depth map into RGBD image. Then convert the RGBD image into point cloud data. Lastly, crop and transmit the point cloud data. ### 9.3.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example rgb_depth_to_pointcloud.launch.py ``` 3) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 4) After completing the feature, you need to start the APP service . **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 5) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 9.3.3 Project Outcome Once the feature is enabled, the robotic arm converts the cropped point cloud data into a depth image and feeds it into the visualization window. In the returned display, the depth image is shown. You can press and hold the left mouse button or use the scroll wheel to rotate and view the point cloud from different angles, and use the scroll wheel to zoom in for a closer look. ### 9.3.4 Program Brief Analysis * **launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) rgb_depth_to_pointcloud_node = Node( package='example', executable='rgb_depth_to_pointcloud', output='screen', ) return [depth_camera_launch, sdk_launch, rgb_depth_to_pointcloud_node, ] ``` Including external launch file: Loads the **launch/armpi_ultra.launch.py** file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation, providing hardware support for potential point cloud applications, robotic arm grasping for instance. Loads the **launch/depth_camera.launch.py** file from the peripherals package to start the depth camera node, which provides RGB image and depth data, which is the raw input for point cloud generation. Defining the target node, it declares rgb_depth_to_pointcloud_node and launches the rgb_depth_to_pointcloud executable from the example package, which converts synchronized RGB and depth data into 3D point clouds, with logs output to the screen. (2) generate_launch_description Function {lineno-start=40} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=45} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. * **Python File Analysis** (1) Import the Necessary Libraries {lineno-start=1} ``` import os import cv2 import time import rclpy import queue import signal import threading import numpy as np from sdk import pid import open3d as o3d import message_filters from rclpy.node import Node from std_srvs.srv import Trigger from std_srvs.srv import SetBool from geometry_msgs.msg import Twist from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from servo_controller.bus_servo_control import set_servo_position ``` **from sdk import pid**: Imports pid from the sdk module, used for PID controllers in control algorithms involving proportional, integral, and derivative components. **from rclpy.executors import MultiThreadedExecutor**: Imports MultiThreadedExecutor from rclpy.executors, which allows ROS 2 nodes and callbacks to run concurrently across multiple threads. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from the servo_controller_msgs.msg package, which is used to control the servo motor positions. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from servo_controller.bus_servo_control module to set servo motor positions. (2) TrackObjectNode Class Initialization Function {lineno-start=25} ``` class TrackObjectNode(Node): def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) signal.signal(signal.SIGINT, self.shutdown) self.scale = 4 self.proc_size = [int(640/self.scale), int(480/self.scale)] self.haved_add = False self.get_point = False self.display = True self.running = True self.pc_queue = queue.Queue(maxsize=1) self.target_cloud = o3d.geometry.PointCloud() # Point cloud to display(要显示的点云) self.t0 = time.time() self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 0) # Servo control(舵机控制) timer_cb_group = ReentrantCallbackGroup() self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() # self.client = self.create_client(SetBool, '/depth_cam/set_ldp_enable') # self.client.wait_for_service() camera_name = 'depth_cam' rgb_sub = message_filters.Subscriber(self, Image, '/%s/rgb/image_raw' % camera_name) depth_sub = message_filters.Subscriber(self, Image, '/%s/depth/image_raw' % camera_name) info_sub = message_filters.Subscriber(self, CameraInfo, '/%s/depth/camera_info' % camera_name) # Synchronize timestamps, allowing 0.02s tolerance(同步时间戳,时间允许有误差在0.02s) sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub, info_sub], 3, 2) sync.registerCallback(self.multi_callback) # Execute feedback function (执行反馈函数) self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) ``` Initializes the ROS node and configures core parameters and components, sets the scale factor scale=4 and processing size proc_size, used to reduce computation, defines a point cloud queue pc_queue and target point cloud object target_cloud. Creates a servo control publisher joints_pub and waits for the robotic arm controller service to start. Subscribes to RGB images, depth images, and camera intrinsic data using message_filters, and synchronizes their timestamps with ApproximateTimeSynchronizer, allowing a 2-second tolerance, registering multi_callback to process the synchronized data. Starts the initialization timer timer for subsequent setup operations and registers an interrupt signal handler via shutdown method to ensure program exit. (3) init_process Method {lineno-start=61} ``` def init_process(self): self.timer.cancel() set_servo_position(self.joints_pub, 1, ((6, 500), (5, 765), (4, 915), (3, 150), (2, 500), (1, 200))) msg = SetBool.Request() msg.data = False #self.send_request(self.client, msg) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` Cancels the initialization timer and resets the robotic arm servos to the specified positions, completed within 1.5 seconds. Starts the main logic thread via the main method, creates the ~/init_finish service to return the node readiness status, and outputs a startup success log. (4) get_node_state Function {lineno-start=74} ``` def get_node_state(self, request, response): response.success = True return response ``` Service callback function that returns the node's initialization success status response.success = True, used to notify other nodes that this node is ready. (5) multi_callback Function {lineno-start=84} ``` def multi_callback(self, ros_rgb_image, ros_depth_image, depth_camera_info): print("multi_callback called") # Confirm callback function is called(确认回调函数被调用) try: # Convert ROS format to numpy(ros格式转为numpy) rgb_image = np.ndarray(shape=(ros_rgb_image.height, ros_rgb_image.width, 3), dtype=np.uint8, buffer=ros_rgb_image.data) depth_image = np.ndarray(shape=(ros_depth_image.height, ros_depth_image.width), dtype=np.uint16, buffer=ros_depth_image.data) print(f"RGB Image Shape: {rgb_image.shape}, Depth Image Shape: {depth_image.shape}") # Print image dimensions(打印图像尺寸) print(f"RGB Image dtype: {rgb_image.dtype}, Depth Image dtype: {depth_image.dtype}") # Print image data types(打印图像数据类型) rgb_image = cv2.resize(rgb_image, tuple(self.proc_size), interpolation=cv2.INTER_NEAREST) depth_image = cv2.resize(depth_image, tuple(self.proc_size), interpolation=cv2.INTER_NEAREST) intrinsic = o3d.camera.PinholeCameraIntrinsic(int(depth_camera_info.width / self.scale), int(depth_camera_info.height / self.scale), float(depth_camera_info.k[0] / self.scale), float(depth_camera_info.k[4] / self.scale), float(depth_camera_info.k[2] / self.scale), float(depth_camera_info.k[5] / self.scale)) print(f"Intrinsic Matrix: {intrinsic.intrinsic_matrix}") # Print camera intrinsics(打印相机内参) o3d_image_rgb = o3d.geometry.Image(rgb_image) o3d_image_depth = o3d.geometry.Image(np.ascontiguousarray(depth_image)) # rgbd_function --> point_cloud rgbd_image = o3d.geometry.RGBDImage.create_from_color_and_depth(o3d_image_rgb, o3d_image_depth, convert_rgb_to_intensity=False) # High CPU usage(cpu占用大) pc = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd_image, intrinsic)#, extrinsic=extrinsic)ic) print(f"Point Cloud Size: {len(pc.points)}") # Add print statement(添加打印语句) # Remove the largest plane (ground), distance threshold 4mm, neighbor points, iterations(去除最大平面,即地面,距离阈4mm,邻点数,迭代次数) plane_model, inliers = pc.segment_plane(distance_threshold=0.05, ransac_n=10, num_iterations=50) # Keep inliers(保留内点) inlier_cloud = pc.select_by_index(inliers, invert=True) self.target_cloud.points = inlier_cloud.points self.target_cloud.colors = inlier_cloud.colors # Rotate 180 degrees for better viewing(转180度方便查看) self.target_cloud.transform(np.asarray([[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]])) try: self.pc_queue.put_nowait(self.target_cloud) print("Point cloud enqueued") # Confirm point cloud is enqueued(确认点云已入队) except queue.Full: pass except BaseException as e: print('callback error:', e) self.t0 = time.time() ``` Core logic for processing synchronized data and converting images to point clouds. Receives synchronized RGB images, depth images, and camera intrinsics, converts them to NumPy arrays, and resizes them according to the scale factor to reduce computation. Creates an Open3D camera intrinsic object PinholeCameraIntrinsic using the camera intrinsics, and converts the RGB and depth images into an Open3D RGBD image. Generates the point cloud create_from_rgbd_image, segments planes using RANSAC, mainly to remove the floor, and retains non-plane points as the target point cloud target_cloud. Transforms the point cloud coordinates by rotating 180 degrees around the X-axis for visualization, and stores the processed point cloud in the queue pc_queue for visualization by the main thread. (6) main Method {lineno-start=138} ``` def main(self): if self.display: # Create visualization window(创建可视化窗口) vis = o3d.visualization.Visualizer() vis.create_window(window_name='point cloud', width=640, height=400, visible=1) while self.running: if not self.haved_add: if self.display: try: point_cloud = self.pc_queue.get(block=True, timeout=2) print("Point cloud dequeued") # Confirm point cloud is dequeued(确认点云已出队) except queue.Empty: continue vis.add_geometry(point_cloud) self.haved_add = True if self.haved_add: try: point_cloud = self.pc_queue.get(block=True, timeout=2) print("Point cloud dequeued") # Confirm point cloud is dequeued(确认点云已出队) except queue.Empty: continue # Refresh (刷新) points = np.asarray(point_cloud.points) print(f"Points array shape: {points.shape}") # Confirm the shape of the points array(确认 points 数组的形状) if len(points) > 0: twist = Twist() min_index = np.argmax(points[:, 2]) min_point = points[min_index] if len(point_cloud.colors) < min_index: continue point_cloud.colors[min_index] = [255, 255, 0] if self.display: vis.update_geometry(point_cloud) # o3d.io.write_point_cloud("output.ply", point_cloud) # (可选)保存点云到文件 vis.poll_events() vis.update_renderer() else: time.sleep(0.01) # Destroy all displayed geometries (销毁所有显示的几何图形) vis.destroy_window() self.get_logger().info('\033[1;32m%s\033[0m' % 'shutdown') rclpy.shutdown() ``` Main logic thread responsible for point cloud visualization and target tracking: if display is enabled display=True, create an Open3D visualization window vis. Continuously fetch point clouds from the queue. For the first fetch, add the point cloud to the visualization window. For subsequent updates, find the point with the largest Z-axis value, usually the closest point, and mark it in yellow (\[255, 255, 0\]). Refreshes the visualization window in real time until running is False, then destroys the window and shut down ROS. ## 9.4 Height Detection and Gripping ### 9.4.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Then, perform image processing to determine the pixel coordinates of the tallest object within the field of view. Afterward, convert these pixel coordinates into the world coordinates of the robotic arm through a coordinate system transformation. Finally, conclude the process by planning the trajectory of the robotic arm for grasping and placing the large wooden block. ### 9.4.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example remove_too_high.launch.py ``` 3) After starting the feature, press **S** in the image display window to start the robotic arm's grasping action. Press **A** to stop the robotic arm from grasping. 4) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 5) After completing the feature, you need to start the APP service. **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 6) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 9.4.3 Project Outcome Once the feature begins, place a 30x30mm block and a 40x40mm block within the recognition area on the map. Subsequently, the robotic arm will assess their heights and pick up the larger of the two. ### 9.4.4 Program Brief Analysis * **launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) remove_too_high_node = Node( package='example', executable='remove_too_high', output='screen', ) return [depth_camera_launch, sdk_launch, remove_too_high_node, ] ``` Loads the launch/armpi_ultra.launch.py file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation. Loads the launch/depth_camera.launch.py file from the peripherals package to start the depth camera node, which provides RGB image and depth data. Declares remove_too_high_node and launches the remove_too_high executable from the example package, used to process point cloud data, likely filtering out overly high points to retain the valid region, with logs output to the screen. (2) generate_launch_description Function {lineno-start=40} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=45} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. ### 4.4.2 Python File Analysis (1) Import the Necessary Libraries {lineno-start=4} ``` import cv2 import time import rclpy import queue import threading import numpy as np import message_filters from rclpy.node import Node from std_srvs.srv import Trigger from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from ros_robot_controller_msgs.msg import BuzzerState from rclpy.callback_groups import ReentrantCallbackGroup from kinematics.kinematics_control import set_pose_target from kinematics_msgs.srv import GetRobotPose, SetRobotPose from servo_controller.bus_servo_control import set_servo_position from typing import Tuple, List from cv_bridge import CvBridge from sdk import common ``` **from sdk import common, fps**: Imports the common and fps modules from the sdk package, which provide utility functions and frame rate handling tools. **from sensor_msgs.msg import Image, CameraInfo**: Imports the Image and CameraInfo message types from the ROS2 sensor_msgs package, used for processing image data and camera information. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from servo_controller_msgs, used to send commands to the servos. **from ros_robot_controller_msgs.msg import BuzzerState**: Imports the BuzzerState message type from ros_robot_controller_msgs, used to control the robot buzzer state. **from kinematics.kinematics_control import set_pose_target**: Imports the set_pose_target function from the kinematics_control module in the kinematics package, used to set the robot's target pose. **from kinematics_msgs.srv import GetRobotPose, SetRobotPose**: Imports the GetRobotPose and SetRobotPose service types from kinematics_msgs, used to retrieve and set the robot's pose. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from servo_controller.bus_servo_control, used to set the servo positions. (2) depth_pixel_to_camera Function {lineno-start=25} ``` def depth_pixel_to_camera(pixel_coords, depth, intrinsics): fx, fy, cx, cy = intrinsics px, py = pixel_coords x = (px - cx) * depth / fx y = (py - cy) * depth / fy z = depth return np.array([x, y, z]) ``` Calculates the (x, y, z) coordinates in the camera frame using the camera intrinsics (fx, fy, cx, cy) and pixel depth values depth based on the perspective projection formula. (3) RemoveTooHighObjectNode Class Initialization Function {lineno-start=35} ``` def __init__(self, name): super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.bridge = CvBridge() # CvBridge instance (CvBridge 实例) self.endpoint = None self.moving = False self.running = True self.stamp = time.time() self.start_process = False self.image_queue = queue.Queue(maxsize=2) # Load hand-eye calibration matrix (加载手眼标定矩阵) try: camera_info_data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/peripherals/config/camera_info.yaml") self.hand2cam_tf_matrix = np.array(camera_info_data['hand2cam_tf_matrix']) self.get_logger().info('手眼变换矩阵加载成功.') except Exception as e: self.get_logger().warn(f'加载手眼变换矩阵失败,使用默认值: {e}') self.hand2cam_tf_matrix = np.array([ [0.0, 0.0, 1.0, -0.101], [-1.0, 0.0, 0.0, 0.01], [0.0, -1.0, 0.0, 0.05], [0.0, 0.0, 0.0, 1.0] ]) self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) self.buzzer_pub = self.create_publisher(BuzzerState, '/ros_robot_controller/set_buzzer', 1) cb_group = ReentrantCallbackGroup() self.create_service(Trigger, f'~/{self.get_name()}/start', self.start_srv_callback, callback_group=cb_group) self.create_service(Trigger, f'~/{self.get_name()}/stop', self.stop_srv_callback, callback_group=cb_group) self.get_current_pose_client = self.create_client(GetRobotPose, '/kinematics/get_current_pose') self.set_pose_target_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.get_logger().info('Waiting for services...') self.get_current_pose_client.wait_for_service() self.set_pose_target_client.wait_for_service() self.get_logger().info('Services are ready.') rgb_sub = message_filters.Subscriber(self, Image, '/depth_cam/rgb/image_raw') depth_sub = message_filters.Subscriber(self, Image, '/depth_cam/depth/image_raw') info_sub = message_filters.Subscriber(self, CameraInfo, '/depth_cam/depth/camera_info') sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub, info_sub], 5, 0.1) sync.registerCallback(self.multi_callback) self.timer = self.create_timer(0.1, self.init_process, callback_group=cb_group) ``` Sets runtime parameters, creates servo and buzzer publishers, starts/stops service servers, and initializes a robotic arm pose service client. Uses message_filters to synchronize subscriptions to RGB images, depth images, and camera intrinsics. Starts an initialization timer. (4) init_process Function {lineno-start=83} ``` def init_process(self): self.timer.cancel() set_servo_position(self.joints_pub, 1, ((6, 500), (5, 600), (4, 825), (3, 110), (2, 500), (1, 210))) time.sleep(2) threading.Thread(target=self.main_loop, daemon=True).start() self.get_logger().info(f'\033[1;32mNode {self.get_name()} is ready.\033[0m') self.get_logger().info(f"\033[1;33mPress 's' or call 'start' service to begin processing.\033[0m") ``` Cancels the initialization timer and moves the robotic arm back to its home position. Starts the main loop thread and outputs a ready log and instructions, pressing **S** to start/stop processing. (5) start_srv_callback and stop_srv_callback Function {lineno-start=91} ``` def start_srv_callback(self, request, response): self.get_logger().info('\033[1;32mStart service called. Starting processing.\033[0m') self.start_process = True response.success = True return response def stop_srv_callback(self, request, response): self.get_logger().info('\033[1;31mStop service called. Stopping processing.\033[0m') self.start_process = False self.moving = False set_servo_position(self.joints_pub, 1, ((6, 500), (5, 600), (4, 815), (3, 110), (2, 500), (1, 210))) response.success = True return response ``` RemoveTooHighObjectNode.init_process(self) cancels the initialization timer, resets the robotic arm to its home position, starts the main loop thread, and outputs a node-ready log along with operation instructions, pressing **S** to start or stop processing. RemoveTooHighObjectNode.start_srv_callback(self, request, response) sets start_process = True to start the object detection and robotic arm picking logic, and returns a success response. (6) send_request Function {lineno-start=105} ``` def send_request(self, client, msg): future = client.call_async(msg) rclpy.spin_until_future_complete(self, future) return future.result() ``` Asynchronously calls services like robot pose services and waits for the result using rclpy.spin_until_future_complete to ensure reliable service communication. (7) multi_callback Function {lineno-start=110} ``` def multi_callback(self, ros_rgb_image, ros_depth_image, depth_camera_info): if not self.image_queue.full(): self.image_queue.put((ros_rgb_image, ros_depth_image, depth_camera_info)) ``` Stores synchronized RGB images, depth images, and camera intrinsics into a thread-safe queue image_queue for the main loop to process. (8) get_endpoint Function {lineno-start=114} ``` def get_endpoint(self): req = GetRobotPose.Request() res = self.send_request(self.get_current_pose_client, req) if res is not None: endpoint = res.pose self.endpoint = common.xyz_quat_to_mat([endpoint.position.x, endpoint.position.y, endpoint.position.z], [endpoint.orientation.w, endpoint.orientation.x, endpoint.orientation.y, endpoint.orientation.z]) return self.endpoint return None ``` Calls the get_current_pose service to obtain the end effector's position and orientation, converting it into a 4×4 transformation matrix endpoint for subsequent coordinate transformations from camera frame to robotic arm frame. (9) get_plane_valuest Function {lineno-start=124} ``` def get_plane_values(self, depth_image: np.ndarray, plane: Tuple[float, float, float, float], intrinsic_matrix: List[float]) -> np.ndarray: a, b, c, d = plane fx, fy, cx, cy = intrinsic_matrix[0], intrinsic_matrix[4], intrinsic_matrix[2], intrinsic_matrix[5] H, W = depth_image.shape u, v = np.meshgrid(np.arange(W), np.arange(H)) z = depth_image.astype(np.float32) / 1000.0 valid_mask = (z > 0.1) & (z < 2.0) x = np.zeros_like(z) y = np.zeros_like(z) x[valid_mask] = (u[valid_mask] - cx) * z[valid_mask] / fx y[valid_mask] = (v[valid_mask] - cy) * z[valid_mask] / fy plane_distances = np.full_like(z, np.inf) normal_magnitude = np.sqrt(a**2 + b**2 + c**2) if normal_magnitude > 1e-6: numerator = a * x + b * y + c * z + d plane_distances[valid_mask] = numerator[valid_mask] / normal_magnitude return plane_distances ``` Calculates the vertical distance from each valid pixel in the depth image to a reference plane using the plane equation plane and camera intrinsics intrinsic_matrix. Generates a height map where positive values indicate points above the plane and negative values indicate points below the plane. (10) robust_plane_valuest Function {lineno-start=146} ``` def robust_plane_detection(self, depth_image: np.ndarray, intrinsic_matrix: List[float], sample_ratio: float = 0.2) -> Tuple[float, float, float, float]: fx, fy, cx, cy = intrinsic_matrix[0], intrinsic_matrix[4], intrinsic_matrix[2], intrinsic_matrix[5] H, W = depth_image.shape z = depth_image.astype(np.float32) / 1000.0 valid_mask = (z > 0.2) & (z < 2.0) valid_indices = np.where(valid_mask) if len(valid_indices[0]) < 1000: return (0, 0, 1, 0) num_points = len(valid_indices[0]) num_samples = min(num_points, max(int(num_points * sample_ratio), 1000)) sample_indices = np.random.choice(num_points, num_samples, replace=False) v_s, u_s = valid_indices[0][sample_indices], valid_indices[1][sample_indices] z_s = z[v_s, u_s] x_s = (u_s - cx) * z_s / fx y_s = (v_s - cy) * z_s / fy points = np.column_stack([x_s, y_s, z_s]) try: A_matrix = np.c_[points[:, :2], np.ones(len(points))] C, _, _, _ = np.linalg.lstsq(A_matrix, points[:, 2], rcond=None) a, b, c, d = C[0], C[1], -1.0, C[2] norm = np.sqrt(a**2 + b**2 + c**2) if norm == 0: return (0, 0, 1, 0) return (a / norm, b / norm, c / norm, d / norm) except np.linalg.LinAlgError: return (0, 0, 1, 0) ``` Samples valid points from the depth image, filtering out invalid depths, and fits a plane equation Ax + By + Cz + D = 0 using least squares, returning the normalized plane coefficients. (11) pick Function {lineno-start=178} ``` def pick(self, position, angle): self.moving = True angle = angle % 90 angle = angle - 90 if angle > 45 else (angle + 90 if angle < -45 else angle) set_servo_position(self.joints_pub, 0.5, ((1, 210),)) time.sleep(0.5) if position[0] > 0.21: offset_x = self.pick_offset[0] else: offset_x = self.pick_offset[1] if position[1] > 0: offset_y = self.pick_offset[2] else: offset_y = self.pick_offset[3] offset_z = self.pick_offset[-1] target_pos = [position[0] + offset_x, position[1] + offset_y, position[2] + offset_z + 0.05] res = self.send_request(self.set_pose_target_client, set_pose_target(target_pos, 80, [-180.0, 180.0], 1.5)) if res and res.pulse: set_servo_position(self.joints_pub, 1.5, ((6, res.pulse[0]), (5, res.pulse[1]), (4, res.pulse[2]), (3, res.pulse[3]))) else: self.get_logger().warn('Pick: Failed to get pose for approach') self.moving = False return time.sleep(1.5) final_angle = 500 + int(1000 * (angle + res.rpy[-1]) / 240) set_servo_position(self.joints_pub, 0.5, ((2, final_angle),)) time.sleep(0.5) target_pos[2] -= 0.065 res = self.send_request(self.set_pose_target_client, set_pose_target(target_pos, 80, [-180.0, 180.0], 1.5)) if res and res.pulse: set_servo_position(self.joints_pub, 1.5, ((6, res.pulse[0]), (5, res.pulse[1]), (4, res.pulse[2]), (3, res.pulse[3]))) time.sleep(1.5) set_servo_position(self.joints_pub, 1.0, ((1, 600),)) time.sleep(1.0) target_pos[2] += 0.08 res = self.send_request(self.set_pose_target_client, set_pose_target(target_pos, 80, [-180.0, 180.0], 1.0)) if res and res.pulse: set_servo_position(self.joints_pub, 1.0, ((6, res.pulse[0]), (5, res.pulse[1]), (4, res.pulse[2]), (3, res.pulse[3]))) time.sleep(1.0) set_servo_position(self.joints_pub, 1.0, ((6, 150), (5, 635), (4, 900), (3, 260), (2, 500))) time.sleep(1.5) set_servo_position(self.joints_pub, 1.0, ((1, 200),)) time.sleep(1.0) set_servo_position(self.joints_pub, 1.0, ((6, 500), (5, 600), (4, 825), (3, 110), (2, 500), (1, 210))) time.sleep(1.5) self.moving = False ``` Controls the robotic arm to complete the full pick-and-place workflow: open gripper → move above target → adjust orientation → descend to pick → lift → move to placement point → release → return to home, ensuring over-height objects are successfully removed. (12) main_loop Function {lineno-start=237} ``` def main_loop(self): while self.running: try: ros_rgb, ros_depth, cam_info = self.image_queue.get(block=True, timeout=1) except queue.Empty: continue try: # Safely convert using CvBridge (使用 CvBridge 安全转换) rgb_image = self.bridge.imgmsg_to_cv2(ros_rgb, desired_encoding='rgb8') depth_image = self.bridge.imgmsg_to_cv2(ros_depth, desired_encoding='passthrough') bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR) K = cam_info.k plane_coeffs = self.robust_plane_detection(depth_image, K) height_map = self.get_plane_values(depth_image, plane_coeffs, K) # Correct orientation (ensure positive height) (方向修正(保证高度为正)) MIN_OBJECT_HEIGHT_FOR_CHECK = 0.01 potential_object_heights = height_map[np.abs(height_map) > MIN_OBJECT_HEIGHT_FOR_CHECK] potential_object_heights = potential_object_heights[np.isfinite(potential_object_heights)] if potential_object_heights.size > 20 and np.median(potential_object_heights) < 0: height_map *= -1.0 MIN_OBJECT_HEIGHT = 0.015 object_mask = np.where((height_map > MIN_OBJECT_HEIGHT) & np.isfinite(height_map), 255, 0).astype(np.uint8) object_mask = cv2.erode(object_mask, np.ones((3, 3), np.uint8), iterations=1) object_mask = cv2.dilate(object_mask, np.ones((5, 5), np.uint8), iterations=1) contours, _ = cv2.findContours(object_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) display_height_map = height_map.copy() display_height_map[~np.isfinite(display_height_map)] = 0 display_height_map = np.clip(display_height_map, 0, 0.1) display_height_map = (display_height_map * 2550).astype(np.uint8) height_color_map = cv2.applyColorMap(display_height_map, cv2.COLORMAP_JET) tallest_object = None max_height = 0.0 if len(contours) > 0: for contour in contours: if cv2.contourArea(contour) < 400 or self.moving: continue mask = np.zeros(height_map.shape, dtype=np.uint8) cv2.drawContours(mask, [contour], -1, 255, -1) heights_in_contour = height_map[mask == 255] heights_in_contour = heights_in_contour[np.isfinite(heights_in_contour)] if heights_in_contour.size > 0: current_max_height = np.max(heights_in_contour) if current_max_height > max_height: max_height = current_max_height peak_coords = np.where((height_map == current_max_height) & (mask == 255)) peak_y, peak_x = peak_coords[0][0], peak_coords[1][0] tallest_object = {"height": current_max_height, "contour": contour, "peak_xy": (peak_x, peak_y)} if tallest_object and not self.moving: contour = tallest_object["contour"] cx, cy = tallest_object["peak_xy"] # Only draw contours and small dots, do not display height text (只绘制轮廓和小圆点,不显示高度文本) cv2.drawContours(bgr_image, [contour], -1, (0, 255, 0), 2) cv2.circle(bgr_image, (cx, cy), 5, (0, 0, 255), -1) PICK_HEIGHT_THRESHOLD = 0.0015 if tallest_object["height"] > PICK_HEIGHT_THRESHOLD and self.start_process: if time.time() - self.stamp > 1.0: self.stamp = time.time() depth_mm = depth_image[cy, cx] if depth_mm > 0: self.get_endpoint() if self.endpoint is not None: cam_coords = depth_pixel_to_camera((cx, cy), depth_mm / 1000.0, (K[0], K[4], K[2], K[5])) pose_end = np.matmul(self.hand2cam_tf_matrix, common.xyz_euler_to_mat(cam_coords, (0, 0, 0))) world_pose = np.matmul(self.endpoint, pose_end) pose_t, _ = common.mat_to_xyz_euler(world_pose) rect = cv2.minAreaRect(contour) threading.Thread(target=self.pick, args=(list(pose_t), rect[2]), daemon=True).start() else: self.stamp = time.time() result_image = np.concatenate([bgr_image, height_color_map], axis=1) cv2.imshow("Remove Too High Object", result_image) key = cv2.waitKey(1) & 0xFF if key == ord('s'): self.start_process = not self.start_process if self.start_process: self.get_logger().info("\033[1;32mProcessing started by keyboard.\033[0m") else: self.get_logger().info("\033[1;31mProcessing stopped by keyboard.\033[0m") elif key == ord('q'): self.running = False break except Exception as e: self.get_logger().error(f'Main loop error: {e}', throttle_duration_sec=1.0) cv2.destroyAllWindows() self.get_logger().info("Shutting down...") ``` Reads image data from the queue, computes the height map, detects over-height objects (height \> threshold), and triggers the pick thread to remove them. Visualizes results by combining RGB images with the pseudo-colored height map. Supports keyboard interaction, pressing **S** to toggle processing, press **Q** to quit. (13) main Method {lineno-start=342} ``` def main(args=None): rclpy.init(args=args) try: node = RemoveTooHighObjectNode('remove_too_high_object') executor = MultiThreadedExecutor() executor.add_node(node) executor.spin() except KeyboardInterrupt: pass finally: if 'node' in locals() and rclpy.ok(): node.destroy_node() rclpy.shutdown() ``` Initializes ROS2, creates an instance of RemoveTooHighObjectNode, and starts a multi-threaded executor for callback handling. Captures keyboard interrupts to clean up node resources and shut down ROS properly. ## 9.5 3D Space Gripping ### 9.5.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Next, process the image to obtain the coordinate of the target pixel so as to track the target. Afterward, convert these pixel coordinates into the world coordinates of the robotic arm through a coordinate system transformation. Lastly, plan the trajectory for the robotic arm enabling it to place the item to corresponding position. ### 9.5.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example track_and_grab.launch.py ``` 3) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 4) After completing the feature, you need to start the APP service. **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 5) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 5.3 Project Outcome Once the operation is started, the robotic arm raises the depth camera to face forward, detects colored blocks in the air, which is set to red by default, tracks the blocks, obtains their poses, picks them up, and places them at designated locations. > [!NOTE] > > If the item is placed within the minimal detection range, the camera cannot measure the item's distance. In this scenario, the measurement result will be set to 0, and the terminal will display 'TOO CLOSE !!!'. To resolve this issue, ensure the target block is positioned away from the camera for proper gripping. ### 5.4 Program Brief Analysis * **launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) color = LaunchConfiguration('color', default='red') color_arg = DeclareLaunchArgument('color', default_value=color) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) track_and_grab_node = Node( package='example', executable='track_and_grab', output='screen', parameters=[{'color': color}, {'start': start}] ) return [depth_camera_launch, sdk_launch, track_and_grab_node, ] ``` Loads the **launch/armpi_ultra.launch.py** file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation. Loads the **launch/depth_camera.launch.py** file from the peripherals package to start the depth camera node, which provides RGB image and depth data, which is the basic input for tracking and grasping. Declares track_and_grab_node and launches the track_and_grab executable from the example package to implement the color target tracking + robotic arm picking functionality. The color and start parameters are passed to the node via parameters, allowing dynamic configuration of the tracking color and start state. Logs are output to the screen. (2) generate_launch_description Function {lineno-start=44} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=49} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. * **Python File Analysis** (1) Import the Necessary Libraries {lineno-start=6} ``` import cv2 import math import time import rclpy import queue import signal import threading import numpy as np import message_filters from rclpy.node import Node from std_srvs.srv import SetBool from sdk import pid, common, fps from std_srvs.srv import Trigger from interfaces.srv import SetString from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from kinematics.kinematics_control import set_pose_target from kinematics_msgs.srv import GetRobotPose, SetRobotPose from servo_controller.bus_servo_control import set_servo_position ``` **from kinematics_msgs.srv import SetRobotPose**: Imports the SetBool service type for boolean-type service requests. **from sdk import pid, common, fps**: Imports PID control, common utilities, and frame rate calculation functions from the sdk module. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from the servo_controller_msgs package, which is used to control the servo positions. **from kinematics.kinematics_control import set_pose_target**: Imports the set_pose_target function from the kinematics module, used to set the robot's target pose. **from kinematics_msgs.srv import GetRobotPose, SetRobotPose**: Imports the GetRobotPose and SetRobotPose service types from kinematics_msgs module, used to retrieve and set the robot's pose. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from the servo_controller.bus_servo_control module to configure servo positions. (2) depth_pixel_to_camera Function {lineno-start=29} ``` def depth_pixel_to_camera(pixel_coords, depth, intrinsics): fx, fy, cx, cy = intrinsics px, py = pixel_coords x = (px - cx) * depth / fx y = (py - cy) * depth / fy z = depth return np.array([x, y, z]) ``` Converts pixel coordinates from a depth image into 3D points in the camera coordinate frame. (3) ColorTracker Class Initialization Function {lineno-start=38} ``` def __init__(self, target_color): self.target_color = target_color self.pid_yaw = pid.PID(20.5, 1.0, 1.2) self.pid_pitch = pid.PID(20.5, 1.0, 1.2) self.yaw = 500 self.pitch = 150 ``` Initializes the color tracker, sets the target color, initializes PID controllers for pitch and yaw adjustments, and sets the initial angles. (4) proc Function {lineno-start=45} ``` def proc(self, source_image, result_image, color_ranges): h, w = source_image.shape[:2] color = color_ranges['color_range_list'][self.target_color] img = cv2.resize(source_image, (int(w/2), int(h/2))) img_blur = cv2.GaussianBlur(img, (3, 3), 3) # Gaussian blur (高斯模糊) img_lab = cv2.cvtColor(img_blur, cv2.COLOR_BGR2LAB) # Convert to LAB color space (转换到 LAB 空间源COLOR_BGR2LAB) mask = cv2.inRange(img_lab, tuple(color['min']), tuple(color['max'])) # Binarization (二值化) # Smooth edges, remove small blocks, and merge adjacent blocks (平滑边缘,去除小块,合并靠近的块) eroded = cv2.erode(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))) dilated = cv2.dilate(eroded, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))) # Find the largest contour (找出最大轮廓) contours = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2] min_c = None for c in contours: if math.fabs(cv2.contourArea(c)) < 50: continue (center_x, center_y), radius = cv2.minEnclosingCircle(c) # Minimum enclosing circle (最小外接圆) if min_c is None: min_c = (c, center_x) elif center_x < min_c[1]: if center_x < min_c[1]: min_c = (c, center_x) # If there is a contour that meets the requirement (如果有符合要求的轮廓) if min_c is not None: (center_x, center_y), radius = cv2.minEnclosingCircle(min_c[0]) # Minimum enclosing circle (最小外接圆) # Draw a circle around the detected color block to track (圈出识别的的要追踪的色块) circle_color = common.range_rgb[self.target_color] if self.target_color in common.range_rgb else (0x55, 0x55, 0x55) cv2.circle(result_image, (int(center_x * 2), int(center_y * 2)), int(radius * 2), circle_color, 2) center_x = center_x * 2 center_x_1 = center_x / w if abs(center_x_1 - 0.5) > 0.02: # If the difference is within a small range, no movement is needed (相差范围小于一定值就不用再动了) self.pid_yaw.SetPoint = 0.5 # Our goal is to keep the color block at the center of the frame (整个画面的像素宽度的 1/2 位置) self.pid_yaw.update(center_x_1) self.yaw = min(max(self.yaw + self.pid_yaw.output, 0), 1000) else: self.pid_yaw.clear() # Reset PID controller if already at the center (如果已经到达中心了就复位一下 pid 控制器) center_y = center_y * 2 center_y_1 = center_y / h if abs(center_y_1 - 0.5) > 0.02: self.pid_pitch.SetPoint = 0.5 self.pid_pitch.update(center_y_1) self.pitch = min(max(self.pitch + self.pid_pitch.output, 100), 720) else: self.pid_pitch.clear() return (result_image, (self.pitch, self.yaw), (center_x, center_y), radius * 2) else: return (result_image, None, None, 0) ``` Processes input images, detects target color blocks in the LAB color space, calculates block center positions, adjusts pitch and yaw using PID control, and returns the processed image, angles, block center coordinates, and radius. (5) TrackAndGrabNode Class Initialization Function {lineno-start=100} ``` class TrackAndGrabNode(Node): def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.fps = fps.FPS() self.moving = False self.count = 0 self.start = False self.running = True self.last_pitch_yaw = (0, 0) self.enable_disp = 1 signal.signal(signal.SIGINT, self.shutdown) # Load LAB color range configuration file (加载LAB颜色范围配置文件) self.data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/app/config/lab_config.yaml") self.lab_data = self.data['/**']['ros__parameters'] try: camera_info_data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/peripherals/config/camera_info.yaml") self.hand2cam_tf_matrix = camera_info_data['hand2cam_tf_matrix'] self.get_logger().info('手眼变换矩阵加载成功.') except Exception as e: self.get_logger().error(f'加载手眼变换矩阵失败: {e}') # If loading fails, set a default value or shut down the node (如果加载失败,可以设置一个默认值或直接关闭节点) self.hand2cam_tf_matrix = np.identity(4).tolist() # Set as identity matrix as backup (设置为单位矩阵作为备用) self.running = False self.last_position = (0, 0, 0) self.stamp = time.time() self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) self.target_color = None self.get_current_pose_client = self.create_client(GetRobotPose, '/kinematics/get_current_pose') self.get_current_pose_client.wait_for_service() self.set_pose_target_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.set_pose_target_client.wait_for_service() self.create_service(Trigger, '~/start', self.start_srv_callback) self.create_service(Trigger, '~/stop', self.stop_srv_callback) self.create_service(SetString, '~/set_color', self.set_color_srv_callback) self.tracker = None self.image_queue = queue.Queue(maxsize=2) self.endpoint = None self.start_stamp = time.time() + 3 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() rgb_sub = message_filters.Subscriber(self, Image, '/depth_cam/rgb/image_raw') depth_sub = message_filters.Subscriber(self, Image, '/depth_cam/depth/image_raw') info_sub = message_filters.Subscriber(self, CameraInfo, '/depth_cam/depth/camera_info') # Synchronize timestamps, allowing 0.02s tolerance(同步时间戳,时间允许有误差在0.02s) sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub, info_sub], 3, 0.2) sync.registerCallback(self.multi_callback) # Execute feedback function (执行反馈函数) timer_cb_group = ReentrantCallbackGroup() self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) ``` Initializes the node, sets runtime parameters and ROS communication components, including publishers, subscribers, services, loads color configuration and hand-eye transformation matrix, synchronizes subscriptions to RGB images, depth images, and camera intrinsics, and starts the initialization timer. (6) init_process Function {lineno-start=165} ``` def init_process(self): self.timer.cancel() servo_positions = ((6, 500), (5, 600), (4, 825), (3, 110), (2, 500), (1, 300)) self.get_logger().info('\033[1;36mInit Servo Positions: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1) if self.get_parameter('start').value: self.target_color = self.get_parameter('color').value self.get_logger().info('\033[1;35mget_parameter color: %s\033[0m' % self.target_color) # Print the obtained color (打印获取的颜色) msg = SetString.Request() msg.data = self.target_color self.set_color_srv_callback(msg, SetString.Response()) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` Completes node initialization, sets the initial servo positions, starts the main loop thread, creates a service to indicate initialization completion, and outputs startup logs. (7) get_node_state Function {lineno-start=183} ``` def get_node_state(self, request, response): response.success = True return response ``` Service callback that returns the node's initialization status. (8) shutdown Function {lineno-start=187} ``` def shutdown(self, signum, frame): self.running = False ``` Handles interrupt signals, for example, **Ctrl + C**, sets the running flag to False, and stops the node. (9) set_color_srv_callback Function {lineno-start=190} ``` def set_color_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "set_color") self.target_color = request.data self.get_logger().info('\033[1;35mset_color_srv_callback color: %s\033[0m' % self.target_color) # Print the set color (打印设置的颜色) self.tracker = ColorTracker(self.target_color) self.get_logger().info('\033[1;32mset color: %s\033[0m' % self.target_color) self.start = True response.success = True response.message = "set_color" return response ``` Service callback that sets the target tracking color, initializes the color tracker, and returns a success response. (10) start_srv_callback and stop_srv_callback Function {lineno-start=201} ``` def start_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "start") self.start = True response.success = True response.message = "start" return response def stop_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "stop") self.start = False self.moving = False self.count = 0 self.last_pitch_yaw = (0, 0) self.last_position = (0, 0, 0) servo_positions = ((6, 500), (5, 600), (4, 825), (3, 110), (2, 500), (1, 210)) self.get_logger().info('\033[1;36mStop Servo Positions: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) response.success = True response.message = "stop" return response ``` start_srv_callback: Service callback that starts the tracking and gripping functionality, returning a success response. stop_srv_callback: Service callback that stops the functionality, resets the servos to their initial positions, and returns a success response. (11) send_request Function {lineno-start=222} ``` def send_request(self, client, msg): future = client.call_async(msg) while rclpy.ok(): if future.done() and future.result(): return future.result() ``` Sends a service request and blocks until a response is received, ensuring reliable service communication. (12) multi_callback Function {lineno-start=228} ``` def multi_callback(self, ros_rgb_image, ros_depth_image, depth_camera_info): if self.image_queue.full(): # If the queue is full, discard the oldest image(如果队列已满,丢弃最旧的图像) self.image_queue.get() # Put the image into the queue(将图像放入队列) self.image_queue.put((ros_rgb_image, ros_depth_image, depth_camera_info)) ``` Synchronizes and receives RGB images, depth images, and camera intrinsics, storing them in a thread-safe queue for main loop processing. (13) get_endpoint Function {lineno-start=235} ``` def get_endpoint(self): endpoint = self.send_request(self.get_current_pose_client, GetRobotPose.Request()).pose self.endpoint = common.xyz_quat_to_mat([endpoint.position.x, endpoint.position.y, endpoint.position.z], [endpoint.orientation.w, endpoint.orientation.x, endpoint.orientation.y, endpoint.orientation.z]) return self.endpoint ``` Retrieves the current pose of the robotic arm's end effector, converts it into a 4×4 transformation matrix, and facilitates coordinate transformations. (14) pick Function {lineno-start=241} ``` def pick(self, position): if position[2] < 0.2: yaw = 80 else: yaw = 30 msg = set_pose_target(position, yaw, [-180.0, 180.0], 1.0) res = self.send_request(self.set_pose_target_client, msg) if res.pulse: servo_data = res.pulse servo_positions = ((6, servo_data[0]),) self.get_logger().info('\033[1;36mPick Servo Positions 1: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1) servo_positions = ((6, servo_data[0]),(5, servo_data[1]), (4, servo_data[2]),(3, servo_data[3]), (2, servo_data[4])) self.get_logger().info('\033[1;36mPick Servo Positions 2: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1.5, servo_positions) time.sleep(1.5) servo_positions = ((1, 600),) self.get_logger().info('\033[1;36mPick Servo Positions 3: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 0.5, servo_positions) time.sleep(1) position[2] += 0.06 msg = set_pose_target(position, yaw, [-180.0, 180.0], 1.0) res = self.send_request(self.set_pose_target_client, msg) if res.pulse: servo_data = res.pulse servo_positions = ((6, servo_data[0]),(5, servo_data[1]), (4, servo_data[2]),(3, servo_data[3]), (2, servo_data[4])) self.get_logger().info('\033[1;36mPick Servo Positions 4: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1) servo_positions = ((6, 500), (5, 720), (4, 900), (3, 120), (2, 500), (1, 500)) self.get_logger().info('\033[1;36mPick Servo Positions 5: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1) servo_positions = ((6, 125), (5, 635), (4, 780), (3, 200), (2, 500)) self.get_logger().info('\033[1;36mPick Servo Positions 6: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1) servo_positions = ((6, 125), (5,550), (4, 780), (3, 150), (2, 500)) self.get_logger().info('\033[1;36mPick Servo Positions 7: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1.5, servo_positions) time.sleep(1.5) servo_positions = ((6, 125), (5, 550), (4, 780), (3, 170), (2, 500), (1, 200)) self.get_logger().info('\033[1;36mPick Servo Positions 8: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(1.5) servo_positions = ((6, 500), (5, 720), (4, 900), (3, 250), (2, 500), (1, 200)) self.get_logger().info('\033[1;36mPick Servo Positions 9: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 1, servo_positions) time.sleep(2) self.tracker.yaw = 500 self.tracker.pitch = 150 self.tracker.pid_yaw.clear() self.tracker.pid_pitch.clear() self.stamp = time.time() else: self.tracker.yaw = 500 self.tracker.pitch = 150 self.tracker.pid_yaw.clear() self.tracker.pid_pitch.clear() self.stamp = time.time() self.get_logger().info('\033[1;31m%s\033[0m' % "Out of gripping range") self.moving = False ``` Implements the robotic arm picking logic that moves servos to the target position, picks up the object, lifts it, places it at the desired location, and finally resets the arm. (15) main Method {lineno-start=309} ``` def main(self): while self.running: try: ros_rgb_image, ros_depth_image, depth_camera_info = self.image_queue.get(block=True, timeout=1) except queue.Empty: if not self.running: break else: continue try: rgb_image = np.ndarray(shape=(ros_rgb_image.height, ros_rgb_image.width, 3), dtype=np.uint8, buffer=ros_rgb_image.data) depth_image = np.ndarray(shape=(ros_depth_image.height, ros_depth_image.width), dtype=np.uint16, buffer=ros_depth_image.data) result_image = np.copy(rgb_image) h, w = depth_image.shape[:2] depth = np.copy(depth_image).reshape((-1, )) depth[depth<=0] = 55555 sim_depth_image = np.clip(depth_image, 0, 2000).astype(np.float64) sim_depth_image = sim_depth_image / 2000.0 * 255.0 bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR) depth_color_map = cv2.applyColorMap(sim_depth_image.astype(np.uint8), cv2.COLORMAP_JET) if self.tracker is not None and self.moving == False and time.time() > self.start_stamp and self.start: result_image, p_y, center, r = self.tracker.proc(rgb_image, result_image, self.lab_data) if p_y is not None: servo_positions = ((6, int(p_y[1])), (4, (1000-int(p_y[0])))) self.get_logger().info('\033[1;36mTracker Servo Positions: %s\033[0m' % str(servo_positions)) set_servo_position(self.joints_pub, 0.02, servo_positions) center_x, center_y = center if center_x > w: center_x = w if center_y > h: center_y = h if abs(self.last_pitch_yaw[0] - p_y[0]) < 3 and abs(self.last_pitch_yaw[1] - p_y[1]) < 3: if time.time() - self.stamp > 2: self.stamp = time.time() roi = [int(center_y) - 5, int(center_y) + 5, int(center_x) - 5, int(center_x) + 5] roi[0] = max(0, roi[0]) roi[1] = min(h, roi[1]) roi[2] = max(0, roi[2]) roi[3] = min(w, roi[3]) roi_distance = depth_image[roi[0]:roi[1], roi[2]:roi[3]] valid_depths = roi_distance[np.logical_and(roi_distance > 0, roi_distance < 10000)] if valid_depths.size == 0: self.get_logger().warn("No valid depth data in ROI. Skipping calculation.") dist = 0.20 # Or set a default value, e.g., dist = 0.15 (或设置一个默认值,如 dist = 0.15) else: dist = round(float(np.mean(valid_depths) / 1000.0), 3) self.get_logger().info("DISTANCE: " + str(dist)) # dist += 0.015 # Object radius compensation (物体半径补偿) # dist += 0.015 # Error compensation (误差补偿) K = depth_camera_info.k self.get_endpoint() position = depth_pixel_to_camera((center_x, center_y), dist, (K[0], K[4], K[2], K[5])) position[0] -= 0.01 # RGB camera and depth camera TF have 1cm offset (rgb相机和深度相机tf有1cm偏移) pose_end = np.matmul(self.hand2cam_tf_matrix, common.xyz_euler_to_mat(position, (0, 0, 0))) # Converted end-effector relative coordinates (转换的末端相对坐标) world_pose = np.matmul(self.endpoint, pose_end) # Convert to robotic arm world coordinates (转换到机械臂世界坐标) pose_t, pose_R = common.mat_to_xyz_euler(world_pose) self.stamp = time.time() self.moving = True threading.Thread(target=self.pick, args=(pose_t,)).start() else: self.stamp = time.time() dist = depth_image[int(center_y),int(center_x)] if dist < 200: txt = "TOO CLOSE !!!" else: txt = "Dist: {}mm".format(dist) cv2.circle(result_image, (int(center_x), int(center_y)), 5, (255, 255, 255), -1) cv2.circle(depth_color_map, (int(center_x), int(center_y)), 5, (255, 255, 255), -1) cv2.putText(depth_color_map, txt, (10, 400 - 20), cv2.FONT_HERSHEY_PLAIN, 2.0, (0, 0, 0), 10, cv2.LINE_AA) cv2.putText(depth_color_map, txt, (10, 400 - 20), cv2.FONT_HERSHEY_PLAIN, 2.0, (255, 255, 255), 2, cv2.LINE_AA) self.last_pitch_yaw = p_y else: self.stamp = time.time() if self.enable_disp: #result_image = cv2.cvtColor(result_image[40:440, ], cv2.COLOR_RGB2BGR) result_image = result_image result_image = np.concatenate([result_image, depth_color_map], axis=1) cv2.imshow("depth", result_image) key = cv2.waitKey(1) if key == ord('q') or key == 27: # Exit by pressing q or esc (按q或者esc退出) self.running = False except Exception as e: self.get_logger().info('error1: ' + str(e)) rclpy.shutdown() ``` Node entry point that initializes ROS, creates a node instance, starts a multi-threaded executor to handle callbacks, captures interrupts, and cleans up resources. ## 9.6 3D Shape Sorting ### 9.6.1 Program Flow First, initialize the node and obtain the RGB image and depth map topic messages. Next, perform image processing by analyzing the number of corner points in the detected object contours to determine the shape of objects within the field of view. Subsequently, acquire the pixel coordinates of the highest object within the field of view, perform coordinate system transformation on the obtained coordinates, converting them into the world coordinates of the robotic arm. Lastly, plan the trajectory for the robotic arm enabling it to place the item to corresponding position. ### 9.6.2 Operation Steps > [!NOTE] > > When entering commands, be sure to use correct case and spacing. You can use the Tab key to auto-complete keywords. 1) Click the terminal icon in the system desktop to open a command-line window. Enter the following command and press **Enter** to stop the auto-start service: ``` ~/.stop_ros.sh ``` 2) Enter the following command and press **Enter** to start the feature. ``` ros2 launch example shape_recognition.launch.py ``` 3) To stop the program, press **Ctrl + C** in the terminal window. If the program does not stop immediately, repeat this step until it terminates. 4) After completing the feature, you need to start the APP service . **Otherwise, future APP features may not function properly.** In the terminal, enter the following command and press **Enter** to start the app service. Wait a few seconds for it to initialize. ``` ros2 launch bringup bringup.launch.py ``` 5) Once it is successfully started, the robotic arm will return to its initial pose, and the buzzer will beep once. ### 9.6.3 Project Outcome Upon initiating the feature, the robotic arm will identify cuboids, cylinders, and spheres within the field of view. It will then proceed to grasp and categorize these objects, placing them in their respective positions. ### 9.6.4 Program Brief Analysis * **launch File Analysis** (1) launch_setup Function {lineno-start=9} ``` def launch_setup(context): compiled = os.environ['need_compile'] start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) display = LaunchConfiguration('display', default='true') display_arg = DeclareLaunchArgument('display', default_value=display) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) shape_recognition_node = Node( package='app', executable='shape_recognition', output='screen', parameters=[{ 'start': start, 'display': display}] ) return [start_arg, display_arg, depth_camera_launch, sdk_launch, shape_recognition_node, ] ``` Loads the **launch/armpi_ultra.launch.py** file from the SDK package to start the underlying control services of the robotic arm, such as servo drivers and kinematics computation. Loads the **launch/depth_camera.launch.py** file from the peripherals package to start the depth camera node, which provides RGB image and depth data, which is the basic input for shape recognition. Declares shape_recognition_node and launches the shape_recognition executable from the app package to implement the shape recognition functionality. The start and display parameters are passed to the node via parameters, allowing dynamic control of the start state and visualization toggle. Logs are output to the screen. (2) generate_launch_description Function {lineno-start=48} ``` def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file. (3) Main Function {lineno-start=53} ``` if __name__ == '__main__': # Create a LaunchDescription object(创建一个LaunchDescription对象) ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually. * **Python File Analysis** (1) Import the Necessary Libraries {lineno-start=6} ``` import os import cv2 import time import math import rclpy import queue import signal import threading import numpy as np import message_filters from rclpy.node import Node from sdk import common, fps from interfaces.srv import SetStringList from kinematics import kinematics_control from std_srvs.srv import Trigger, SetBool from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from ros_robot_controller_msgs.msg import BuzzerState from rclpy.callback_groups import ReentrantCallbackGroup from kinematics_msgs.srv import SetJointValue, SetRobotPose from kinematics.kinematics_control import set_joint_value_target from servo_controller.bus_servo_control import set_servo_position from servo_controller.action_group_controller import ActionGroupController from example.rgbd_function.include.position_change_detect import position_reorder ``` **from sdk import common, fps**: Imports the common module and frame rate fps module from the SDK, used for general utilities and performance analysis. **from kinematics import kinematics_control**: Imports the kinematics control module, used to control the robotic arm's movements. **from servo_controller_msgs.msg import ServosPosition**: Imports the ServosPosition message type from the servo controller, used to send servo position commands. **from ros_robot_controller_msgs.msg import BuzzerState**: Imports the BuzzerState message type from the robot controller, used to control the buzzer state. **from kinematics_msgs.srv import SetJointValue, SetRobotPose**: Imports the kinematics service interfaces, used to set joint values and the robot's pose. **from servo_controller.bus_servo_control import set_servo_position**: Imports the set_servo_position function from the servo controller module, used to directly set servo positions. **from servo_controller.action_group_controller import ActionGroupController**: Imports the action group controller, used for batch control of servo action groups. **from example.rgbd_function.include.position_change_detect import position_reorder**: Imports the position_reorder function, used to reorder detected positions. (2) depth_pixel_to_camera Function {lineno-start=33} ``` def depth_pixel_to_camera(pixel_coords, intrinsic_matrix): fx, fy, cx, cy = intrinsic_matrix[0], intrinsic_matrix[4], intrinsic_matrix[2], intrinsic_matrix[5] px, py, pz = pixel_coords x = (px - cx) * pz / fx y = (py - cy) * pz / fy z = pz return np.array([x, y, z]) ``` Converts pixel coordinates with depth values from a depth image into 3D coordinates in the camera frame. Requires pixel coordinates and the camera intrinsic matrix as inputs. (3) ObjectClassificationNode Class Initialization Function {lineno-start=60} ``` def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.fps = fps.FPS() self.moving = False self.count = 0 self.running = True self.start = False self.shapes = None # self.colors = None self.target_shapes = '' self.roi = [60, 380, 160, 560] self.endpoint = None self.last_position = 0, 0 self.last_object_info_list = [] signal.signal(signal.SIGINT, self.shutdown) self.language = os.environ['ASR_LANGUAGE'] self.image_queue = queue.Queue(maxsize=2) self.debug = self.get_parameter('debug').value self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) self.buzzer_pub = self.create_publisher(BuzzerState, '/ros_robot_controller/set_buzzer', 1) self.create_service(Trigger, '~/start', self.start_srv_callback) self.create_service(Trigger, '~/stop', self.stop_srv_callback) self.create_service(SetStringList, '~/set_shape', self.set_shape_srv_callback) rgb_sub = message_filters.Subscriber(self, Image, '/depth_cam/rgb/image_raw') depth_sub = message_filters.Subscriber(self, Image, '/depth_cam/depth/image_raw') info_sub = message_filters.Subscriber(self, CameraInfo, '/depth_cam/depth/camera_info') # Synchronize timestamps, allowing a time error of up to 0.03s (同步时间戳, 时间允许有误差在0.03s) sync = message_filters.ApproximateTimeSynchronizer([rgb_sub, depth_sub, info_sub], 3, 0.2) sync.registerCallback(self.multi_callback) self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() # self.client = self.create_client(SetBool, '/depth_cam/set_ldp_enable') # self.client.wait_for_service() timer_cb_group = ReentrantCallbackGroup() self.set_joint_value_target_client = self.create_client(SetJointValue, '/kinematics/set_joint_value_target', callback_group=timer_cb_group) self.set_joint_value_target_client.wait_for_service() self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.kinematics_client.wait_for_service() self.controller = ActionGroupController(self.create_publisher(ServosPosition, 'servo_controller', 1), '/home/ubuntu/arm_pc/ActionGroups') ``` Initializes the node, sets runtime parameters and ROS communication components, including publishers, subscribers, and services, synchronizes subscriptions to RGB images, depth images, and camera intrinsics, starts the initialization timer, and loads the robotic arm controller. (4) init_process Function {lineno-start=108} ``` def init_process(self): self.timer.cancel() msg = SetBool.Request() msg.data = False #self.send_request(self.client, msg) self.goto_default() msg = SetStringList.Request() msg.data = ['sphere', 'cuboid', 'cylinder'] self.set_shape_srv_callback(msg, SetStringList.Response()) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` Completes node initialization, moves the robotic arm to its default position, starts the main loop thread, creates a service indicating initialization completion, and outputs startup logs. (5) get_node_state Function {lineno-start=126} ``` def get_node_state(self, request, response): response.success = True return response ``` Service callback that returns the node's initialization status. (6) shutdown Function {lineno-start=130} ``` def shutdown(self, signum, frame): self.running = False ``` Handles interrupt signals, for example, **Ctrl + C**, sets the running flag to False, and stops the node. (7) send_request Function {lineno-start=133} ``` def send_request(self, client, msg): future = client.call_async(msg) while rclpy.ok(): if future.done() and future.result(): return future.result() ``` Sends a service request and blocks until a response is received, ensuring reliable service communication. (8) set_shape_srv_callback Function {lineno-start=139} ``` def set_shape_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "set_shape") # self.colors = None self.shapes = request.data self.start = True response.success = True response.message = "set_shape" return response ``` Service callback that sets the target shape for recognition, such as sphere and cuboid, starts the recognition process, and returns a success response. (9) start_srv_callback and stop_srv_callback Function {lineno-start=149} ``` def start_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "start") self.start = True response.success = True response.message = "start" return response def stop_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "stop") self.start = False # self.colors = None self.shapes = None self.moving = False self.count = 0 self.target_shapes = '' self.last_position = 0, 0 self.last_object_info_list = [] response.success = True response.message = "stop" return response ``` start_srv_callback: Service callback that starts shape recognition and robotic arm grasping, returning a success response. stop_srv_callback: Service callback that stops the functionality, resets the robotic arm state and recognition parameters, and returns a success response. (10) goto_default Function {lineno-start=171} ``` def goto_default(self): msg = set_joint_value_target([500, 600, 825, 110, 500]) endpoint = self.send_request(self.set_joint_value_target_client, msg) pose_t = endpoint.pose.position pose_r = endpoint.pose.orientation set_servo_position(self.joints_pub, 1, ((5, 600), (4, 780), (3, 110), (2, 500))) time.sleep(1) set_servo_position(self.joints_pub, 1, ((6, 500),)) time.sleep(1) set_servo_position(self.joints_pub, 0.5, ((1, 210),)) time.sleep(0.5) self.endpoint = common.xyz_quat_to_mat([pose_t.x, pose_t.y, pose_t.z], [pose_r.w, pose_r.x, pose_r.y, pose_r.z]) ``` Moves the robotic arm to its default home position and adjusts all servo angles. (11) move Function {lineno-start=188} ``` def move(self, obejct_info): shape, pose_t = obejct_info[:2] self.get_logger().info('\033[1;32m%s\033[0m' % pose_t) angle = obejct_info[-1] msg = BuzzerState() msg.freq = 1900 msg.on_time = 0.2 msg.off_time = 0.01 msg.repeat = 1 self.buzzer_pub.publish(msg) time.sleep(1) if 'sphere' in shape: offset_z = self.pick_offset[-1] elif 'cylinder' in shape: offset_z = 0.03 + self.pick_offset[-1] else: offset_z = 0.02 + self.pick_offset[-1] if pose_t[0] > 0.21: offset_x = self.pick_offset[0] else: offset_x = self.pick_offset[1] if pose_t[1] > 0: offset_y = self.pick_offset[2] else: offset_y = self.pick_offset[3] pose_t[0] += offset_x pose_t[1] += offset_y pose_t[2] += offset_z msg = kinematics_control.set_pose_target(pose_t, 85) res1 = self.send_request(self.kinematics_client, msg) if res1.pulse: servo_data = res1.pulse set_servo_position(self.joints_pub, 1.5, ((6, servo_data[0]), (5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3]), (2, servo_data[4]))) time.sleep(1.5) pose_t[2] -= 0.05 msg = kinematics_control.set_pose_target(pose_t, 85) res2 = self.send_request(self.kinematics_client, msg) if angle != 0: if 'sphere' in shape or ('cylinder' in shape and 'cylinder_horizontal_' not in shape): angle = 500 else: angle = angle % 180 angle = angle - 180 if angle > 90 else (angle + 180 if angle < -90 else angle) angle = 500 + int(1000 * (angle + res2.rpy[-1]) / 240) else: angle = 500 if res2.pulse: servo_data = res2.pulse set_servo_position(self.joints_pub, 0.5, ((2, angle),)) time.sleep(0.5) set_servo_position(self.joints_pub, 1, ((6, servo_data[0]), (5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3]), (2, angle))) time.sleep(1) set_servo_position(self.joints_pub, 0.6, ((1, 600),)) time.sleep(0.6) if res1.pulse: servo_data = res1.pulse set_servo_position(self.joints_pub, 1, ((6, servo_data[0]), (5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3]), (2, angle))) time.sleep(1) set_servo_position(self.joints_pub, 1, ((6, 500), (5, 600), (4, 780), (3, 110), (2, 500), (1, 210))) time.sleep(1) # if self.colors is None: self.get_logger().info('shape: %s' % shape.split("_")[0]) if "sphere" in shape: self.controller.run_action("target_1") if "cylinder" in shape: self.controller.run_action("target_2") if "cuboid" in shape: self.controller.run_action("target_3") set_servo_position(self.joints_pub, 1, ((6, 500), (5, 600), (4, 780), (3, 110), (2, 500), (1, 210))) time.sleep(1) self.moving = False ``` Implements the robotic arm picking logic: adjusts picking offsets based on object information, including shape, position, orientation, moves the servos to the target position to pick up the object, places it in the designated area, and finally resets the arm. (11) multi_callback Function ```python def multi_callback(self, ros_rgb_image, ros_depth_image, depth_camera_info): if self.image_queue.full(): # If the queue is full, discard the oldest image (如果队列已满,丢弃最旧的图像) self.image_queue.get() # Put the image into the queue (将图像放入队列) self.image_queue.put((ros_rgb_image, ros_depth_image, depth_camera_info)) ``` Synchronizes and receives RGB images, depth images, and camera intrinsics, storing them in a thread-safe queue for main loop processing. (12) cal_position Function {lineno-start=268} ``` def cal_position(self, x, y, depth, intrinsic_matrix): position = depth_pixel_to_camera([x, y, depth / 1000], intrinsic_matrix) pose_end = np.matmul(self.hand2cam_tf_matrix, common.xyz_euler_to_mat(position, (0, 0, 0))) world_pose = np.matmul(self.endpoint, pose_end) pose_t, pose_r = common.mat_to_xyz_euler(world_pose) return pose_t ``` Converts object coordinates from the camera coordinate system to the robotic arm's world coordinate system, using the hand–eye transformation matrix together with the arm's end-effector pose. (13) get_min_distance Function {lineno-start=276} ``` def get_min_distance(self, depth_image): ih, iw = depth_image.shape[:2] # Mask certain regions to reduce recognition conditions and improve reliability (屏蔽掉一些区域,降低识别条件,使识别跟可靠) depth_image[:, :self.roi[2]] = np.array([[1000, ] * self.roi[2]] * ih) depth_image[:, self.roi[3]:] = np.array([[1000, ] * (iw - self.roi[3])] * ih) depth_image[self.roi[1]:, :] = np.array([[1000, ] * iw] * (ih - self.roi[1])) depth_image[:self.roi[0], :] = np.array([[1000, ] * iw] * self.roi[0]) depth = np.copy(depth_image).reshape((-1,)) depth[depth <= 0] = 55555 # Distance of 0 may indicate dead zone, assign a large value (距离为0可能是进入死区,将距离赋一个大值) min_index = np.argmin(depth) # Pixel with minimum distance (距离最小的像素) min_y = min_index // iw min_x = min_index - min_y * iw min_dist = depth_image[min_y, min_x] # Get the distance to the nearest object to the camera (获取距离摄像头最近的物体的距离) return min_dist ``` Extracts the nearest object distance from the ROI region of the depth image while filtering out invalid depth values. (14) get_contours Function {lineno-start=293} ``` def get_contours(self, depth_image, min_dist): depth_image = np.where(depth_image > 210, 0, depth_image) depth_image = np.where(depth_image > min_dist + 20, 0, depth_image) # Set pixels with depth values greater than the minimum distance plus 15mm to 0 (将深度值大于最小距离15mm的像素置0) sim_depth_image_sort = np.clip(depth_image, 0, min_dist - 10).astype(np.float64) / (min_dist - 10) * 255 depth_gray = sim_depth_image_sort.astype(np.uint8) _, depth_bit = cv2.threshold(depth_gray, 1, 255, cv2.THRESH_BINARY) contours, hierarchy = cv2.findContours(depth_bit, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) return contours ``` Generates object contours based on the depth image and the nearest distance by applying thresholding and binarization. (15) shape_recognition Function {lineno-start=302} ``` def shape_recognition(self, rgb_image, depth_image, depth_color_map, intrinsic_matrix, min_dist): object_info_list = [] image_height, image_width = depth_image.shape[:2] self.get_logger().info('\033[1;32m%s\033[0m' % str(min_dist)) if min_dist <= 270: # The robotic arm's initial position is not level with the table. If greater than this value, it means below the table, possible detection error, skip recognition (机械臂初始位置并不是与桌面水平,大于这个值说明已经低于桌面了,可能检测有误,不进行识别) sphere_index = 0 cuboid_index = 0 cylinder_index = 0 cylinder_horizontal_index = 0 contours = self.get_contours(depth_image, min_dist) for obj in contours: area = cv2.contourArea(obj) if area < 300: continue perimeter = cv2.arcLength(obj, True) # Calculate contour perimeter (计算轮廓周长) approx = cv2.approxPolyDP(obj, 0.035 * perimeter, True) # Get contour corner points (获取轮廓角点坐标) CornerNum = len(approx) (cx, cy), r = cv2.minEnclosingCircle(obj) center, (width, height), angle = cv2.minAreaRect(obj) if angle < -45: angle += 89 if width > height and width / height > 1.5: angle = angle + 90 depth = depth_image[int(cy), int(cx)] position = self.cal_position(cx, cy, depth, intrinsic_matrix) x, y, w, h = cv2.boundingRect(approx) mask = np.full((image_height, image_width), 0, dtype=np.uint8) cv2.drawContours(mask, [obj], -1, (255), cv2.FILLED) # Calculate the standard deviation of pixel values within the contour area (计算轮廓区域内像素值的标准差) depth_image_mask = np.where(depth_image == 0, np.nan, depth_image) depth_std = np.nanstd(mask) self.get_logger().info('\033[1;32m%s\033[0m' % 'depth_std: ' + str(depth_std)) self.get_logger().info('\033[1;32m%s\033[0m' % 'CornerNum ' + str(CornerNum)) objType = None if depth_std > 50.0 and CornerNum > 4: sphere_index += 1 angle = 0 objType = 'sphere_' + str(sphere_index) elif depth_std > 35.0: cuboid_index += 1 objType = "cuboid_" + str(cuboid_index) elif depth_std > 40.0: if abs(width/height - 1) < 0.2: cuboid_index += 1 objType = "cuboid_" + str(cuboid_index) else: cylinder_horizontal_index += 1 objType = "cylinder_horizontal_" + str(cylinder_horizontal_index) if depth_std < 35.0 and CornerNum > 4: cylinder_index += 1 angle = 0 objType = "cylinder_" + str(cylinder_index) if objType is not None: object_info_list.append([objType, position, depth, [x, y, w, h, center, width, height], rgb_image[int(center[1]), int(center[0])], angle]) cv2.rectangle(depth_color_map, (x, y), (x + w, y + h), (255, 255, 255), 2) return object_info_list ``` Recognizes object shapes, such as sphere, cuboid, and cylinder, calculates object position and orientation, and returns a list containing detailed object information. Also marks the contours on the image. (16) main Function {lineno-start=366} ``` def main(self): count = 0 while self.running: try: ros_rgb_image, ros_depth_image, depth_camera_info = self.image_queue.get(block=True, timeout=1) except queue.Empty: if not self.running: break else: continue try: rgb_image = np.ndarray(shape=(ros_rgb_image.height, ros_rgb_image.width, 3), dtype=np.uint8, buffer=ros_rgb_image.data) depth_image = np.ndarray(shape=(ros_depth_image.height, ros_depth_image.width), dtype=np.uint16, buffer=ros_depth_image.data) depth_image = depth_image.copy() min_dist = self.get_min_distance(depth_image) if self.debug: count += 1 # self.get_logger().info(str(min_dist)) if count > 30: count = 0 msg = BuzzerState() msg.freq = 1900 msg.on_time = 0.2 msg.off_time = 0.01 msg.repeat = 1 self.buzzer_pub.publish(msg) self.debug = False else: #Limit pixel values to the range 0 to 350, and normalize depth image pixel values to the range 0 to 255 (像素值限制在0到350的范围内, 将深度图像的像素值限制和归一化到0到255的范围内) sim_depth_image = np.clip(depth_image, 0, 350).astype(np.float64) / 350 * 255 depth_color_map = cv2.applyColorMap(sim_depth_image.astype(np.uint8), cv2.COLORMAP_JET) if not self.moving: object_info_list = self.shape_recognition(rgb_image, depth_image, depth_color_map, depth_camera_info.k, min_dist) if self.start: reorder_object_info_list = object_info_list if object_info_list: if self.last_object_info_list: # Reorder based on the position of objects from the previous frame (对比上一次的物体的位置来重新排序) reorder_object_info_list = position_reorder(object_info_list, self.last_object_info_list, 20) if reorder_object_info_list: if not self.target_shapes: if self.shapes is not None: indices = [i for i, info in enumerate(reorder_object_info_list) if info[0].split('_')[0] in self.shapes] if indices: min_depth_index = min(indices, key=lambda i: reorder_object_info_list[i][2]) self.target_shapes = reorder_object_info_list[min_depth_index][0].split('_')[0] else: target_index = [i for i, info in enumerate(reorder_object_info_list) if info[0].split('_')[0] == self.target_shapes] if target_index: target_index = target_index[0] obejct_info = reorder_object_info_list[target_index] x, y, w, h, center, width, height = obejct_info[3] angle = obejct_info[-1] cv2.putText(depth_color_map, self.target_shapes, (x + w // 2, y + (h // 2) - 10), cv2.FONT_HERSHEY_COMPLEX, 1.0, (0, 0, 0), 2, cv2.LINE_AA) cv2.putText(depth_color_map, self.target_shapes, (x + w // 2, y + (h // 2) - 10), cv2.FONT_HERSHEY_COMPLEX, 1.0, (255, 255, 255), 1) cv2.drawContours(depth_color_map, [np.int0(cv2.boxPoints((center, (width, height), angle)))], -1, (0, 0, 255), 2, cv2.LINE_AA) position = obejct_info[1] e_distance = round(math.sqrt(pow(self.last_position[0] - position[0], 2)) + math.sqrt( pow(self.last_position[1] - position[1], 2)), 5) if e_distance <= 0.005: self.count += 1 else: self.count = 0 if self.count > 15: self.count = 0 self.target_shapes = None self.moving = True threading.Thread(target=self.move, args=(obejct_info,)).start() self.last_position = position else: self.target_shapes = None self.last_object_info_list = reorder_object_info_list self.fps.update() #bgr_image = cv2.cvtColor(rgb_image[40:440, ], cv2.COLOR_RGB2BGR) bgr_image = rgb_image cv2.rectangle(bgr_image, (self.roi[2], self.roi[0]), (self.roi[3], self.roi[1]), (0, 255, 255), 1) bgr_image = self.fps.show_fps(bgr_image) result_image = np.concatenate([bgr_image, depth_color_map], axis=1) cv2.imshow("depth", result_image) key = cv2.waitKey(1) if key == ord('q') or key == 27: # Exit by pressing q or esc (按q或者esc退出) self.running = False except Exception as e: self.get_logger().info(str(e)) rclpy.shutdown() ``` Implements the main loop that retrieves image data from the queue, calls the shape recognition function, and triggers the grasping thread once the target object is deemed stable based on whether the position variation below the threshold. It also handles visualization to merge RGB with depth pseudo-color images, and supports keyboard exit.