# 12. ROS2-OpenCV Vision Course ## 12.1 Positioning & Gripping ### 12.1.1 Camera Calling The camera functions as the eyes of the robot arm. The first step in implementing the gripping function is to capture the object through the camera. Subsequently, it will calculate the object's location for gripping and proper placement. * **Initiate Camera Control Node** :::{Note} The input command should be case sensitive, and complement the keywords using **Tab** key. ::: Start the robot arm, and access the robot system desktop using NoMachine. To get detailed instructions on remote control software connection, please refer to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (1) Click-on to open the command-line terminal, then execute the following command to disable the app auto-start service: ``` sudo systemctl stop start_app_node.service ``` (2) Execute the command below to launch the camera file. ``` ros2 launch example camera_topic_invoke.launch.py ``` (3) If you need to terminate the running program, press 'Ctrl+C'. If the program fails to stop, please retry. (4) To avoid the interference on the following app functions, please execute the command to initiate the app service. Once you hear a beep sound from the robot, the service has been restarted successfully. ``` sudo systemctl start start_app_node.service ``` * **Program Outcome** The live camera feed will pop up. * **Launch File Analysis** The launch file is saved in this path: [/home/ubuntu/ros2_ws/src/example/example/opencv/camera_topic_invoke.launch.py](../_static/source_code/opencv.zip) (1) The launch_setup function is used to configure and return the nodes and files to be launched. {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] camera_type = os.environ['CAMERA_TYPE'] if compiled == 'True': peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') else: peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) ``` ① `compiled = os.environ['need_compile']`: Retrieves the need_compile environment variable to determine whether to compile or use the source code. The value of compiled determines the ROS2 package path. ② `camera_type = os.environ['CAMERA_TYPE']`: Retrieves the CAMERA_TYPE environment variable, though this variable is not used in this code segment. ③ Based on the value of compiled, either the ROS2 package share directory (using get_package_share_directory) or a local path (e.g., /home/ubuntu/ros2_ws/src/peripherals) is selected. (2) Depth Camera Node Configuration {lineno-start=23} ```python color_detect_node = Node( package='example', executable='camera_topic_invoke', output='screen', ) ``` Create a Node instance to launch an executable named camera_topic_invoke, which is located in the example package. The output='screen' specifies that the output of this node will be displayed on the terminal (screen). (3) generate_launch_description Function {lineno-start=33} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① `generate_launch_description` function creates and returns a LaunchDescription object, which defines all the nodes and configurations for the launch process. ② `OpaqueFunction(function=launch_setup)` indicates that the launch_setup function is embedded as an action within the launch file. This means that the content of the launch_setup function will be executed at runtime, dynamically configuring the nodes and files to be launched. (4) Main Program {lineno-start=38} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① Call `generate_launch_description() `to create a LaunchDescription object. ② Create a` LaunchService` object to manage the launch process. ③ Add the `LaunchDescription` to the `LaunchService` and start the launch process by calling ls.run(). * **Python code Analysis** The source code file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/camera_topic_invoke.py](../_static/source_code/opencv.zip) The program implementation logic is as pictured: Following the program flowchart, the main function of the program involves processing the captured image and transmitting the final image. (1) Import Feature Pack {lineno-start=4} ```python import cv2 import rclpy import queue import threading from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import numpy as np import signal ``` ① `cv2`: OpenCV library for image processing and display. ② `rclpy`: The Python client library for ROS2, used for creating ROS2 nodes and interacting with other nodes. ③ `queue`: Python's queue module, used for passing data between multiple threads. Here, it stores and transmits image data. ④ `threading`: Used for concurrent processing of image display and message reception. ⑤ `rclpy.node.Node`: The base class for ROS2 nodes; all ROS2 nodes must inherit from this class. ⑥ `sensor_msgs.msg.Image`: A ROS2 message type used for handling image data. ⑦ `cv_bridge.CvBridge`: Used to convert ROS image messages (sensor_msgs/Image) to OpenCV format images. ⑧ `numpy`: Used for array processing; in OpenCV, images are typically represented as numpy arrays. ⑨ `signal`: Used for capturing signals (e.g, Ctrl+C) and performing custom cleanup actions. (2) Camera Class Initiate the camera class. {lineno-start=15} ```python def __init__(self, name): rclpy.init() super().__init__(name) self.name = name self.image = None self.running = True self.image_sub = None self.image_queue = queue.Queue(maxsize=2) # Create a CvBridge object to convert ROS Image to OpenCV format. 创建 CvBridge 对象,用于将 ROS Image 转为 OpenCV 格式 self.bridge = CvBridge() # Register Ctrl+C interrupt signal handler. 注册 Ctrl+C 中断信号处理器 signal.signal(signal.SIGINT, self.shutdown) # Image subscriber 图像订阅者 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) threading.Thread(target=self.image_processing, daemon=True).start() ``` ① `rclpy.init()`: Initializes the ROS2 client library. ② `super().__init__(name)`: Calls the parent class `Node` constructor to initialize the node. ③ `self.image_queue = queue.Queue(maxsize=2)`: Creates a queue with a maximum size of 2 to store received images. The size limit ensures that memory usage is controlled. When the queue is full, the oldest image will be discarded. ④ `self.bridge = CvBridge()`: Creates a `CvBridge` object, which is used to convert ROS2 image messages to OpenCV format images. ⑤ `signal.signal(signal.SIGINT, self.shutdown)`: Registers a signal handler so that the `shutdown` method is executed when the Ctrl+C signal is received, allowing the program to exit gracefully. ⑥ `self.image_sub = self.create_subscription()`: Creates an image subscriber to subscribe to the `/depth_cam/rgb/image_raw` topic, receiving RGB image messages from the depth camera. ⑦ The `/depth_cam/rgb/image_raw` topic sends messages of type `sensor_msgs/Image`. ⑧ The callback function `self.image_callback` handles the received messages. ⑨ The subscription queue size is set to 1, which means the message processing is fast, and ROS2 will buffer a maximum of 1 message. ⑩ `threading.Thread(target=self.image_processing, daemon=True).start()`: Creates a background thread for image processing. This thread calls `self.image_processing()`, a function responsible for retrieving images from the queue and displaying them. (3) image_processing Function {lineno-start=34} ```python def image_processing(self): while self.running: self.image = self.image_queue.get() if self.running and self.image is not None: # Display image 展示图像 cv2.imshow('image', self.image) cv2.waitKey(1) ``` This function continuously retrieves image data from the self.image_queue queue. The queue size is limited to 2, and when the queue is full, the oldest image will be discarded. It uses OpenCV's cv2.imshow() to display the image. cv2.waitKey(1) ensures that the OpenCV image window remains refreshed. (4) Shutdown Function {lineno-start=42} ```python def shutdown(self, signum, frame): self.get_logger().info("Shutting down...") self.running = False rclpy.shutdown() ``` This method is called when the SIGINT signal (e.g., Ctrl+C) is received, allowing the program to shut down gracefully. `self.running = False` sets the running flag to False, which terminates the `image_processing` loop. `rclpy.shutdown() `stops the ROS2 client and cleans up resources. (5) image_callback Function {lineno-start=42} ```python # Process ROS Image messages and convert to OpenCV format. 处理 ROS Image 消息并转化为 OpenCV 格式 def image_callback(self, ros_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 self.image = self.bridge.imgmsg_to_cv2(ros_image, "bgr8") if self.image_queue.full(): # If the queue is full, discard the oldest image. 如果队列已满,丢弃最旧的图像 self.image_queue.get() # Put the image into the queue. 将图像放入队列 self.image_queue.put(self.image) # if self.running and self.image is not None: # Display image 展示图像 # cv2.imshow('image', self.image) # cv2.waitKey(1) except Exception as e: self.get_logger().error(f"Error converting image: {e}") ``` `image_callback` is the callback function for image messages, used to process images received from the `/depth_cam/rgb/image_raw` topic. `self.bridge.imgmsg_to_cv2(ros_image, "bgr8") `converts the ROS image message into an OpenCV image (where "**bgr8**" represents an 8-bit BGR image format). If the queue is full, the oldest image is discarded. The new image is then placed into the `self.image_queue`. (6) main Function {lineno-start=68} ```python def main(): camera_node = Camera('camera_topic_invoke') rclpy.spin(camera_node) camera_node.destroy_node() rclpy.shutdown() ``` ① `camera_node = Camera('camera_topic_invoke')`: Create an instance of the Camera class. ② `rclpy.spin(camera_node)`: Keep the node running and wait for the callback function to be triggered. This method blocks until the node is shut down. ③ `camera_node.destroy_node()`: Destroy the ROS2 node. ④ `rclpy.shutdown()`: Shut down the ROS2 client and clean up resources. ### 12.1.2 Internal Parameter Calibration The camera may produce distorted images due to the concave nature of its lens. Camera calibration allows users to acquire internal and external distortion parameters. With these parameters, the distorted image can be corrected. Furthermore, these parameters enable the reconstruction of a 3D scene. The Gemini camera's parameters can be calibrated, eliminating the need for manual calibration. ### 12.1.3 Color Space Conversion JetArm adopts RGB color space. The introduction to the color space is as below: * **Color Space Introduction** The image we perceive is actually composed of pixels arranged by three color components: B (blue), G (green), and R (red) in each frame. A color model, also known as a color space, is a mathematical model that uses a set of values to describe colors. RGB is a commonly used color space type, but there are several others, including the GRAY color space (for grayscale images), Lab color space, XYZ color space, YCrCb color space, HSV color space, HLS color space, CIELab\* color space, CIELuv\* color space, Bayer color space, among others. Each color space excels in addressing specific problems, and to conveniently handle a particular issue, one may need to perform color space type conversion. Color space type conversion refers to transforming an image from one color space to another. For example, when using OpenCV to process images, one might perform conversions between the RGB color space and the Lab color space. During tasks like feature extraction or distance calculation, it is common to convert the image from the RGB color space to the grayscale color space. In some applications, it may be necessary to convert color space images to binary images. * **Common Color Space Type** (1) RGB Color Space RGB color space includes the following characteristics: ① It is a color space where colors are obtained through the linear combination of the Red (R), Green (G), and Blue (B) components. ② Object illumination affects the values of each channel in this color space, and the three color channels are correlated. Let's split the image into its R, G, and B components and observe them to gain a deeper understanding of the color space. From the image below, if you look at the blue channel, you can see that the blue and white parts of the Rubik's Cube in the second image appear similar under indoor lighting conditions, but there is a noticeable difference in the first image. This non-uniformity makes color-based segmentation in this color space very challenging. Additionally, there is an overall difference in values between the two images. Therefore, the RGB color space exhibits issues with uneven color value distribution and the blending of chromaticity and brightness. (2) Lab Color Space Similar to RGB space, Lab also has three channels. L: Lightness channel a: color channel a representing the colors ranging from green to magenta. b: color channel b representing the colors ranging from blue to yellow. The Lab color space is entirely different from the RGB color space. In the RGB color space, color information is divided into three channels, but these same three channels also include brightness information. On the other hand, in the Lab color space, the L channel is independent and contains only brightness information, while the other two channels encode color. L Component: Represents the brightness of pixels. A higher L value corresponds to higher brightness. a Component: Represents the range from red to green. b Component: Represents the range from yellow to blue. In OpenCV, the numerical range of R, G, B values in the RGB color space is \[0-255\]. In the Lab color space, the L value range is \[0-100\], where 0 represents black, and 100 represents white. The a and b values range from \[-128, 127\], and both a and b values of 0 represent gray. To further aid in understanding the comparison between RGB and Lab, here is an example using Photoshop software: ① Use the color picker to pick the color. ② The relationship between the Lab and RGB is as below: The Lab color space possesses the following characteristics: ① Perceptually uniform color space, closely approximating how we perceive colors. ② Device-independent (regardless of capture or display device). ③ Widely used in Adobe Photoshop. ④ Correlated with the RGB color space through complex transformation equations.When reading an image and converting it to the Lab color space in OpenCV, the resulting image is depicted in the figure below: (3)Ycrcb Color Space The Human Visual System (HVS) is less sensitive to color than it is to brightness. In the traditional RGB color space, the three primary colors (R, G, B) are given equal importance, but brightness information is overlooked. In the YCrCb color space, Y represents the brightness of the light source, while chrominance information is stored in Cr and Cb. Here, Cr represents the red component information, and Cb represents the blue component information. Luminance provides information about the degree of color brightness or darkness, and this information can be calculated through the weighted sum of intensity components in the illumination. In RGB light sources, the green component has the greatest impact, while the blue component has the smallest impact. For changes in illumination, similar observations can be made for intensity and color components in LAB. In comparison to LAB, the perceptual difference between red and orange in outdoor images is relatively small, while white undergoes changes across all three components. (4) HSV Color Space The HSV color space is a perceptually-based color model with three components: H (Hue), S (Saturation), and V (Value). Hue is associated with the dominant wavelengths in the blended spectrum, such as "**red, orange, yellow, green, cyan, blue, violet**", representing different hues. If considered from the perspective of wavelength, light of different wavelengths actually reflects differences in hue. Saturation refers to the relative purity or the amount of white light mixed with a color. Pure spectral colors are fully saturated, while colors like deep red (red plus white) and light purple (purple plus white) are desaturated. Saturation is inversely proportional to the amount of white light added. Value reflects the brightness of light perceived by the human eye, and this metric is related to the reflectivity of an object. Concerning color, the more white is added, the higher the brightness; the more black is added, the lower the brightness. One notable feature of HSV is its use of only one channel to describe color (H), making it very intuitive for specifying colors. However, HSV color is device-dependent. The H component is remarkably similar in both images, indicating that even with changes in lighting, color information remains consistent. The S component in both images is also highly alike, while the V component represents brightness and thus varies with changes in illumination. There is a significant disparity in the values of red between outdoor and indoor images. This is because the H value represents the starting angle of the color in degrees. Therefore, it may take values between \[300, 360\] and \[0, 60\]. (5) Gray Color Space The GRAY color space typically refers to grayscale images, where each pixel is a monochromatic image ranging from black to white, processed into a single-color image with 256 levels of grayscale. These 256 levels of grayscale are represented by numerical values in the range of \[0, 255\], where '0' denotes pure black, '255' represents white, and values between 0 and 255 represent various shades of gray (i.e., the intensity or brightness levels of the color), ranging from dark gray to light gray. * **Color Threshold Value** (1) Program Goal By subscribing to the camera image data, the program transfers the image's color space into LAB color space, then recognize the color using the specific color threshold. After that, match the converted image with the color threshold value, and output the binary image. The object in the target color will represent white on the live camera feed, and other colors will represent black. By subscribing to the camera image data, the program converts the image's color space to LAB color space, proceeds to identify the color using a specific color threshold, matches the converted image with the color threshold value, and generates a binary image as output. The object in the target color will appear as white in the live camera feed, while other colors will appear as black. (2) Operation Steps :::{Note} The input command should be case sensitive, and keywords can be complemented using Tab key. ::: ① Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) ② Double-click to open the command-line terminal, and execute the command, then hit Enter to terminate the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` ③ Execute the command and hit Enter to run the game program. ``` ros2 launch example color_threshold.launch.py ``` ④ In the image: ① lab_image (the LAB image converted from RGB); ② bgr_image (the image in the BGR color space); ③ result_image (the image obtained after applying color thresholding and binarization). ⑤ If you need to terminate the program, please use short-cut '**Ctrl+C**'. If the program fails to stop, please retry. ⑥ After running the previous program, you need to restart the app service by executing the command below. ``` sudo systemctl start start_app_node.service ``` (3) launch File Analysis Launch file is saved in this folder: [/home/ubuntu/ros2_ws/src/example/example/opencv/color_threshold.launch.py](../_static/source_code/opencv.zip) ① launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') else: peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) ``` * `os.environ['need_compile']`: Reads the environment variable need_compile, which is used to determine whether compilation is required. In ROS2, this is often used to control different execution paths or actions based on the environment settings (e.g., development or release environments). * If `need_compile == 'True'`, the ROS2 package path is used (via `get_package_share_directory`) to retrieve the locations of the peripherals and example packages. * If `need_compile` is not equal to 'True', a fixed path is used to point to the package locations, typically indicating a scenario where compilation is not needed, such as in a pre-built environment. ② Create a ROS2 Node {lineno-start=22} ```python color_threshold_node = Node( package='example', executable='color_threshold', output='screen', ) ``` * `package='example'`: Specifies the ROS2 package to which the node belongs. * `executable='color_threshold'`: Specifies the executable file name for the node. * `output='screen'`: Sets the output display method for the node, indicating that the node's log will be shown on the screen (standard output). ③ generate_launch_description Function {lineno-start=32} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` * `generate_launch_description()`: This is the standard function in ROS2 launch files, used to generate a LaunchDescription object. It defines all the operations to be executed during the launch process. * In this case, the `OpaqueFunction` is used to wrap the launch_setup function. This means that during the launch, the launch_setup function will dynamically generate the launch content based on the current environment configuration. ④ Main Program {lineno-start=37} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` This part of the code is responsible for directly running the launch file. If this file is executed as the main program, it will create a `LaunchDescription ` object (ld) and use `LaunchService ` to execute the launch description. `LaunchService ` is used to actually perform and start the operations described in the launch file, including nodes and other launch files. (4)Source Code Analysis The source code file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_threshold.py](../_static/source_code/opencv.zip) The program flow chart is as below: From the above diagram, the program's logical flow primarily involves processing the acquired image and displaying the feedback. ① Import Feature Pack {lineno-start=4} ```python import cv2 import yaml import rclpy import queue import threading import numpy as np from rclpy.node import Node import sdk.common as common from cv_bridge import CvBridge from sensor_msgs.msg import Image ``` * `cv2`: OpenCV library, used for image processing and display. * `rclpy`: ROS2 Python client library, providing APIs for creating and managing ROS2 nodes. * `queue`: The Python standard library's queue module, used for inter-thread communication. * `threading`: The Python standard library's threading module, used for multi-threaded processing. * `numpy`: NumPy library, used for numerical computation of image data, particularly in image processing. * `CvBridge`: A library used to convert ROS image messages (sensor_msgs/Image) into OpenCV image format (cv2). * `Image`: ROS2 message type used for transmitting image data. ② Camera Class {lineno-start=15} ```python class Camera(Node): def __init__(self, name): rclpy.init() super().__init__(name) self.name = name self.image = None self.running = True self.image_sub = None self.thread_started = False self.image_queue = queue.Queue(maxsize=2) self.data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/app/config/lab_config.yaml") self.lab_data = self.data['/**']['ros__parameters'] # Create a CvBridge object to convert ROS Image to OpenCV format. 创建 CvBridge 对象,用于将 ROS Image 转为 OpenCV 格式 self.bridge = CvBridge() self.target_color = "red" # Set target color 设置目标颜色 # Image subscriber 图像订阅者 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) ``` * `rclpy.init()`: Initializes the ROS2 system; this function must be called to start the ROS2 client. * `super().init(name)`: Initializes the parent class Node with the given node name name. * `self.image_queue = queue.Queue(maxsize=2)`: Creates a queue with a maximum size of 2 to store image data, ensuring that only the latest two images are kept. * `self.data = common.get_yaml_data(...)`: Reads data from the configuration file located at /home/ubuntu/ros2_ws/src/app/config/lab_config.yaml. This file should contain the target color range data. * `self.lab_data = self.data['/**']['ros__parameters']`: Extracts the parameters related to the color range from the configuration file. * `self.bridge = CvBridge()`: Creates a CvBridge object to convert ROS image messages into OpenCV image format. * `self.target_color = "red"`: Sets the target color to "**red**". * `self.image_sub = self.create_subscription(...)`: Subscribes to the /depth_cam/rgb/image_raw image topic with the callback function image_callback. ③ image_callback method {lineno-start=38} ```python # Process ROS Image message and convert it to OpenCV format. 处理 ROS Image 消息并转化为 OpenCV 格式 def image_callback(self, ros_image): # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 bgr_image = self.bridge.imgmsg_to_cv2(ros_image, "bgr8") if self.image_queue.full(): # If the queue is full, discard the oldest image. 如果队列已满,丢弃最旧的图像 self.image_queue.get() # Put the image into the queue. 将图像放入队列 self.image_queue.put(bgr_image) if not self.thread_started: # Start the thread only on the first call. 只在第一次调用时启动线程 threading.Thread(target=self.image_process, daemon=True).start() self.thread_started = True ``` * `bgr_image = self.bridge.imgmsg_to_cv2(ros_image, "bgr8")`: Converts the ROS image message to an OpenCV-compatible BGR format image. * `self.image_queue.put(bgr_image)`: Places the converted image into the queue. If the queue is full, the oldest image will be discarded, ensuring that only two frames are stored in the queue. * `if not self.thread_started`: If the image processing thread has not been started yet, it will start a new thread to process the image data, ensuring that image processing is initiated only once. ④ image_process Method {lineno-start=51} ```python def image_process(self): while self.running: bgr_image = self.image_queue.get(block=True, timeout=1) # Convert the image color space to LAB. 将图像的颜色空间转换成LAB lab_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2LAB) # Set color threshold and perform binarization, default is red. 设置颜色阈值并进行二值化,默认为红色 result_image = cv2.inRange(lab_image, np.array(self.lab_data['color_range_list'][self.target_color]['min']), np.array(self.lab_data['color_range_list'][self.target_color]['max'])) # Display the image. 展示图像 cv2.imshow('bgr_image', bgr_image) cv2.imshow('lab_image', lab_image) cv2.imshow('result_image', result_image) cv2.waitKey(1) ``` * `bgr_image = self.image_queue.get(block=True, timeout=1)`: Retrieves image data from the queue. If the queue is empty, the function will wait until an image becomes available. * `lab_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2LAB)`: Converts the BGR image to the LAB color space. The LAB color space is commonly used for color detection as it is more robust to lighting variations. * `result_image = cv2.inRange(lab_image, np.array(self.lab_data['color_range_list'][self.target_color]['min']), np.array(self.lab_data['color_range_list'][self.target_color]['max']))`: Binarizes the image based on the defined red color threshold, extracting pixels within the specified range. The output is a binary image, where white areas indicate regions that meet the conditions. * `cv2.imshow(...)`: Displays the original image, the LAB-converted image, and the color-filtered binary image. * `cv2.waitKey(1)`: Refreshes the display and waits for 1 millisecond before continuing. ⑤ main Function {lineno-start=70} ```python def main(): camera_node = Camera('color_threshold') rclpy.spin(camera_node) camera_node.destroy_node() rclpy.shutdown() ``` * `camera_node = Camera('color_threshold')`: Creates an instance of the Camera class and assigns it the name color_threshold. * `rclpy.spin(camera_node)`: Starts the ROS 2 client library's event loop, allowing it to wait for and process callback functions. * `camera_node.destroy_node()`: Shuts down the node and releases its resources. * `rclpy.shutdown()`: Terminates the ROS 2 client library and performs resource cleanup. * **Color Space Conversion** (1) Program Goal When subscribing to camera images and performing a color space conversion on the acquired images, we can observe images in different color spaces through feedback. (2) Operation Steps :::{Note} The input command is case-sensitive, and keywords can be auto-completed using the Tab key. ::: ① Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) ② Double-click to open the command-line terminal. Execute the following command below to disable the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` ③ Execute the command below and hit Enter to run the game program. ``` ros2 launch example color_space.launch.py ``` ④ The image shows the following: ① lab_image: The image converted from RGB to the LAB color space. ② rgb_image: The original image captured by the camera. ③ bgr_image: The image converted from RGB to the BGR color space. ⑤ If you need to terminate the running program, press short-cut '**Ctrl+C**'. If the program fails to run, please retry. ⑥ After using the previous function, it is necessary to restart the app service; otherwise, subsequent app operations may be affected. Execute the command below and press Enter to initiate the app service. ``` sudo systemctl start start_app_node.service ``` (3) Launch File Analysis The Launch file locates in: [/home/ubuntu/ros2_ws/src/example/example/opencv/color_space.launch.py](../_static/source_code/opencv.zip) ① launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] if compiled == 'True': peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') else: peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) ``` * `compiled = os.environ['need_compile']`: This retrieves the need_compile variable from the environment, which determines whether compilation is needed. During the build process, if need_compile is set to 'True', the program will use the ROS2 package share directory path; if it is set to 'False', it will use the local path. * `get_package_share_directory('peripherals') and get_package_share_directory('example')`: These functions are used to retrieve the share directories for the ROS2 packages 'peripherals' and 'example', respectively. ROS2 uses these directories to store resources in the package, such as configuration files, launch files, etc. * `depth_camera_launch`: This is an `IncludeLaunchDescription ` object that includes and launches the depth_camera.launch.py file from the 'peripherals' package. This means it will invoke all nodes and configurations defined within the depth_camera.launch.py file during startup. ② Create Node {lineno-start=22} ```python color_space_node = Node( package='example', executable='color_space', output='screen', ) ``` * `package='example'`: Specifies the ROS2 package to which the node belongs. *  `executable='color_space'`: Specifies the name of the executable file for the node. * `output='screen'`: Sets the output display mode for the node, meaning the node's log output will be displayed on the screen (standard output). ③ generate_launch_description Function {lineno-start=33} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` * `generate_launch_description()`: This is the standard function for ROS2 launch files, used to generate a LaunchDescription object. It defines all the actions that should be executed during startup. * Here, the `OpaqueFunction ` is used to wrap the `launch_setup ` function. This means that during startup, `launch_setup ` will dynamically generate the launch content based on the current environment configuration. (4) Source Code Analysis The source code file is stored in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_space.py](../_static/source_code/opencv.zip) The program flowchart is as below: From the above diagram, the program revolves around processing the acquired images and providing live camera feed. ① import Feature Pack {lineno-start=4} ```python import cv2 import rclpy from rclpy.node import Node from sensor_msgs.msg import Image from cv_bridge import CvBridge import numpy as np import signal ``` ② Camera Class {lineno-start=12} ```python class Camera(Node): def __init__(self, name): rclpy.init() super().__init__(name) self.name = name self.image = None self.running = True self.image_sub = None # Create a CvBridge object to convert ROS Image to OpenCV format. 创建 CvBridge 对象,用于将 ROS Image 转为 OpenCV 格式 self.bridge = CvBridge() # Image subscriber 图像订阅者 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) ``` * `__init__(self, name)`: Initializes the ROS node by calling `rclpy.init()` to initialize the ROS2 system. `super().__init__(name)` initializes the parent Node class. `self.running` is used to control whether the node continues to run. * `self.bridge = CvBridge()`: Creates a CvBridge object for converting between ROS image messages and OpenCV image formats. * `self.create_subscription(...)`: Creates a subscriber that listens to Image messages from the /depth_cam/rgb/image_raw topic. The callback function for processing the messages is image_callback, with a queue size set to 1. ③ Process the Signal and Terminate the Node {lineno-start=30} ```python def shutdown(self, signum, frame): self.get_logger().info("Shutting down...") self.running = False rclpy.shutdown() ``` shutdown is the signal handler function for program termination, ensuring that the ROS node shuts down safely. ④ Image Callback Function {lineno-start=35} ```python # Process ROS Image message and convert it to OpenCV format. 处理 ROS Image 消息并转化为 OpenCV 格式 def image_callback(self, ros_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 self.rgb_image = self.bridge.imgmsg_to_cv2(ros_image, "rgb8") self.bgr_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2BGR) # Convert the image color space to LAB. 将图像的颜色空间转换成LAB self.lab_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2LAB) if self.running and self.bgr_image is not None and self.lab_image is not None: # Display the image. 展示图像 cv2.imshow('rgb_image', self.rgb_image) cv2.imshow('bgr_image', self.bgr_image) cv2.imshow('lab_image', self.lab_image) cv2.waitKey(1) except Exception as e: self.get_logger().error(f"Error converting image: {e}") ``` * `self.bridge.imgmsg_to_cv2(ros_image, 'rgb8')`: Uses `CvBridge` to convert a ROS image message (`sensor_msgs/Image`) into an OpenCV image format (a numpy array). The image format is \`rgb8\`. * `cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2BGR)`: Converts the RGB image to BGR format, as OpenCV uses BGR by default. * `cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2LAB)`: Converts the RGB image to the LAB color space, which is commonly used for color analysis in image processing. * `cv2.imshow(...)`: Displays the RGB, BGR, and LAB images using OpenCV. * `cv2.waitKey(1)`: Ensures the OpenCV window is updated and processes keyboard events (with a wait time of 1 millisecond). * `Exception`: Catches and logs any errors that occur during the image conversion process. ⑤ Program Entry {lineno-start=57} ```python def main(): camera_node = Camera('color_space') rclpy.spin(camera_node) camera_node.destroy_node() rclpy.shutdown() ``` * `camera_node = Camera('color_space')`: Creates a ROS node named 'color_space'. * `rclpy.spin(camera_node)`: Enters the ROS2 event loop, where it waits for and processes incoming subscription messages. * `camera_node.destroy_node()`: Destroys the node after it is shut down. * `rclpy.shutdown()`: Shuts down the ROS2 system. * **Color Recognition** (1) Program Goal By subscribing to camera image data, the captured images are converted to the LAB color space. Following this conversion, color recognition can be performed using specific color thresholds. After matching the transformed image with the color thresholds, the program identifies the desired colors and prints the recognized color in the terminal. Additionally, it outputs the BGR image and the binary image. (2) Operation Steps :::{Note} The input command is case-sensitive, and keywords can be auto-completed using the Tab key. ::: ① Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) ② Double-click to open the command-line terminal. Execute the command below, and hit Enter to disable the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` ③ Execute the following command and hit Enter. ``` ros2 launch example color_recognition.launch.py ``` ④ The image shows the following: ① BGR: The image in the BGR color space. ② result_image: The binary image generated from the color recognition results. ⑤ If you need to terminate the running program, press short-cut '**Ctrl+C**'. If the program fails to run, please retry. ⑥ After using the previous function, it is necessary to restart the app service; otherwise, subsequent app operations may be affected. Execute the command below and press Enter to initiate the app service. ``` sudo systemctl start start_app_node.service ``` (3) Launch File Analysis The launch file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/color_recognition.launch.py](../_static/source_code/opencv.zip) ① launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='true') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') sdk_package_path = get_package_share_directory('sdk') else: peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) color_detect_node = Node( package='example', executable='color_recognition', output='screen', parameters=[{'enable_display': enable_display}] ) return [ sdk_launch, depth_camera_launch, enable_display_arg, color_detect_node, ] ``` * Get Environment Variable: `compiled = os.environ['need_compile']`: Retrieves the value of `need_compile` from the environment. This value determines whether the ROS2 package path is the compiled path or the source path. * Launch Configuration:` enable_display = LaunchConfiguration('enable_display', default='true')`: Defines a launch configuration `enable_display ` to control whether the display feature is enabled, with a default value of 'true'. * Declare Launch Argument: `enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display)`: Declares a launch argument `enable_display`that can be modified via command-line arguments at launch. * Select Path Based on compiled Environment Variable:`if compiled == 'True':`, the path is taken from the compiled ROS2 package; otherwise, the path is taken from the source directory. peripherals_package_path and example_package_path represent the paths for the `peripherals` and `example`packages, respectively. * Include Other Launch Files: `depth_camera_launch = IncludeLaunchDescription(...)`: Includes the depth_camera.launch.py launch file from the peripherals package. This means that depth camera-related nodes will be launched via this file. * Launch Color Detection Node: `color_detect_node = Node(...)`: Launches a node named color_detection, which belongs to the example package. The node's parameters include enable_display, which controls whether image display functionality is enabled. The node's output is shown in the terminal. * Return Launch Items: The function returns a list containing the depth camera launch file, launch arguments, and the color detection node. ② generate_launch_description Function: {lineno-start=43} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` * This function creates and returns a `LaunchDescription `object. `LaunchDescription `is a core component of ROS2 launch files, describing all the actions to be executed during startup. * `OpaqueFunction(function=launch_setup)`: Adds the launch_setup function as an opaque function (`OpaqueFunction`) to the launch description. During startup, the launch_setup function will be executed to generate the specific launch items. ③ Main Function: {lineno-start=48} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` * `ld = generate_launch_description()`: Calls the generate_launch_description function to create the launch description object. * `ls = LaunchService()`: Creates a LaunchService object to manage and execute the ROS2 launch file. * `ls.include_launch_description(ld)`: Includes the generated launch description ld into the launch service. * `ls.run()`: Starts and executes the ROS2 launch file, initiating the nodes and related actions. (4) Source Code Analysis The source code locates in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_recognition.py](../_static/source_code/opencv.zip) The program flowchart is as below: From the above diagram, the program revolves around processing the acquired images and providing visual feedback. ① Import Feature Pack {lineno-start=4} ```python import cv2 import time import math import rclpy import numpy as np import sdk.common as common from rclpy.node import Node from cv_bridge import CvBridge ``` * `cv2`: The OpenCV library, used for image processing tasks such as image transformation, color space conversion, binarization, erosion, and dilation. * `Node`: The base class for creating ROS2 nodes. * `math`: A library for mathematical computations, primarily used for calculating contour areas. * `numpy`: A library for numerical computations, particularly for array manipulation. * `rclpy`: The ROS2 Python API, used for interacting with the ROS2 system. * `cv_bridge`: A utility for converting between ROS image messages (sensor_msgs/Image) and OpenCV images. ② ColorDetection Class (Color Recognition Class) {lineno-start=18} ```python def __init__(self, name): super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.rgb_image = None self.bgr_image = None self.result_image = None self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.display = self.get_parameter('enable_display').value self.data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/app/config/lab_config.yaml") self.lab_data = self.data['/**']['ros__parameters'] self.joints_pub = self.create_publisher(ServosPosition, 'servo_controller', 1) # Wait for the service to start. 等待服务启动 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() joint_angle = [500, 520, 210, 50, 500, 200] set_servo_position(self.joints_pub, 1, ((2, joint_angle[1]), (3, joint_angle[2]), (4, joint_angle[3]), (5, joint_angle[4]), (10, joint_angle[5]))) time.sleep(1) set_servo_position(self.joints_pub, 1, ((1, joint_angle[0]), )) time.sleep(1) # Subscribe to image topic. 订阅图像话题 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) # Publish processed image for color recognition. 发布颜色识别处理图像 self.image_pub = self.create_publisher(Image, '/color_detection/result_image', 1) ``` * `super().__init__(...)`: Calls the parent class Node's initialization function to create a ROS2 node with the specified name. * `self.rgb_image, self.bgr_image, self.result_image`: Variables used to store images in different formats. * `self.bridge = CvBridge()`: Creates a CvBridge object for converting between ROS image messages and OpenCV images. * `self.display = self.get_parameter('enable_display').value`: Reads the enable_display parameter to determine whether to display the processing results (default is True). * `self.data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/app/config/lab_config.yaml")`: Retrieves threshold information stored in the lab_config.yaml configuration file. * `self.image_sub = self.create_subscription(...)`: Subscribes to the /depth_cam/rgb/image_raw topic to receive image messages from the camera. * `self.image_pub = self.create_publisher(...)`: Creates a publisher to send the color detection results to the /color_detection/result_image topic. ③ Image Callback Processing Function {lineno-start=47} ```python # Process ROS Image message and convert it to OpenCV image. 处理ROS Image消息,将其转换为OpenCV图像 def image_callback(self, ros_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 self.rgb_image = self.bridge.imgmsg_to_cv2(ros_image, "rgb8") # Convert the image color space to LAB. 将图像的颜色空间转换成LAB self.lab_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2LAB) # Convert RGB to BGR. RGB转BGR self.bgr_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2BGR) # Start color detection. 开始颜色检测 self.color_detection("red",self.bgr_image) # Display original image and detection results. 展示原图和检测结果 if self.result_image is not None: cv2.imshow('BGR', self.bgr_image) cv2.imshow('result_image', self.result_image) cv2.waitKey(1) except Exception as e: self.get_logger().error('Failed to convert image: {}'.format(e)) ``` * Convert ROS image messages to OpenCV images: Use `CvBridge` to convert ROS image messages to RGB images: `self.bridge.imgmsg_to_cv2(ros_image, "rgb8")`. * Convert RGB images to LAB color space: Use OpenCV to convert the RGB image to LAB color space: `self.lab_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2LAB)`. * Convert RGB images to BGR: Transform the RGB image into BGR format using OpenCV: `self.bgr_image = cv2.cvtColor(self.rgb_image, cv2.COLOR_RGB2BGR)`. * Perform color detection: Call `self.color_detection("red", self.bgr_image)` to detect a specific color. In this case, "**red**" is hardcoded, meaning it will always detect red areas. * Display images if color is detected: If a color region is detected, use `cv2.imshow` to display both the original BGR image and the resulting image. * Handle exceptions: If the image conversion fails, catch the exception and log the error message. ④ Corrosion and Dilation {lineno-start=73} ```python # Erosion and dilation operations. 腐蚀与膨胀操作 def erode_and_dilate(self, binary, kernel=3): element = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel, kernel)) eroded = cv2.erode(binary, element) # Erosion 腐蚀 dilated = cv2.dilate(eroded, element) # Dilation 膨胀 return dilated ``` * **Erosion**: Reduces the boundaries of objects, helping to eliminate small noise. * **Dilation:** Restores the boundaries of eroded objects, making them more distinct. ⑤ Color Recognition {lineno-start=80} ```python # Color recognition function 颜色识别函数 def color_detection(self, color, bgr_image): self.result_image = cv2.GaussianBlur(bgr_image, (3, 3), 3) self.result_image = cv2.cvtColor(self.result_image, cv2.COLOR_BGR2LAB) # Set color thresholds and generate a binary image. 设置颜色阈值并生成二值化图像 self.result_image = cv2.inRange(self.result_image, np.array(self.lab_data['color_range_list'][color]['min']), np.array(self.lab_data['color_range_list'][color]['max'])) self.result_image = self.erode_and_dilate(self.result_image) # Find all contours. 找出所有轮廓 contours = cv2.findContours(self.result_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2] if contours: c = max(contours, key=cv2.contourArea) area = math.fabs(cv2.contourArea(c)) if area >= 2000: if self.display: self.get_logger().info(f"Detected {color}") # Convert the binary image to BGR format for publishing. 将二值化的图像转换为BGR格式,便于发布 # result_image = cv2.cvtColor(self.result_image, cv2.COLOR_GRAY2BGR) # Publish the color BGR image. 发布彩色 BGR 图像 self.image_pub.publish(self.bridge.cv2_to_imgmsg(self.result_image, "mono8")) ``` * Preprocessing: Apply Gaussian blur using `cv2.GaussianBlur` to reduce noise, then convert the image from BGR to the LAB color space. * Color Extraction: Use `cv2.inRange` to extract regions of the specified color, generating a binary image. The color threshold is defined by `self.color_threshold[color]`. * Erosion and Dilation: Perform erosion and dilation operations on the binary image to clean and enhance the detected areas. * Contour Detection: Use `cv2.findContours` to identify contours in the image and calculate their areas. If the largest detected contour has an area greater than 2000, the target color is considered detected. * Result Publishing: Convert the processed binary image into a ROS image message and publish it to the `/color_detection/result_image` topic. ⑥ Main Function {lineno-start=106} ```python def main(): rclpy.init() node = ColorDetection('color_detection') rclpy.spin(node) node.destroy_node() rclpy.shutdown() ``` * `rclpy.init()`: Initialize ROS2 system * Create and launch the `ColorDetection `node. * `rclpy.spin(node)`: Enter the ROS2 event loop, and keep the node running. * camera `node.destroy_node()`: clear and terminate node resource. * `rclpy.shutdown()`: Close ROS2 system. ### 12.1.4 Pixel Coordinate Calculation * **Program Logic** When the robotic arm identifies a colored block on the live camera feed, it obtains the coordinates of the block's center point on the live camera feed. By using the coordinates of the center point, the corresponding pixel coordinates are calculated, facilitating subsequent object localization. * **Operation Steps** :::{Note} The input command should be case sensitive, and complement the keywords using **Tab** key. ::: (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2)Double-click to open the command-line terminal. Execute the command, and hit Enter to disable the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` (3) Execute the command below and hit Enter to run the game program. ``` ros2 launch example pixel_coordinate_calculation.launch.py ``` If you need to change the recognition color, you can change the color set at the end of the command to '**blue**' or '**green**'. (4) As shown in the picture below, the terminal ① prints the coordinates of the pixels (where 'x' represents the horizontal coordinate and 'y' represents the vertical coordinate). The window ② named "**RGB**" in the bottom-left corner displays the feedback image from the depth camera (an image transformed from RGB to BGR). The window ③ named "**color_detection**", displays the binarized black and white image generated by the "**color_detection**" function, highlighting the distinctive features of the subject. (5) If you need to terminate the running program, press short-cut '**Ctrl+C**'. If the program fails to run, please retry. (6) After using the previous function, it is necessary to restart the app service; otherwise, subsequent app operations may be affected. Execute the command below and press Enter to initiate the app service. ``` sudo systemctl start start_app_node.service ``` * **Launch File Analysis** Launch file is saved in this path: [/home/ubuntu/ros2_ws/src/example/example/opencv/pixel_coordinate_calculation.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='false') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': example_package_path = get_package_share_directory('example') else: example_package_path = '/home/ubuntu/ros2_ws/src/example' color_recognition_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(example_package_path, 'example/opencv/color_recognition.launch.py')), launch_arguments={ 'enable_display': enable_display }.items() ) pixel_coordinate_calculation_node = Node( package='example', executable='pixel_coordinate_calculation', output='screen', ) return [ color_recognition_launch, pixel_coordinate_calculation_node, ] ``` ① `compiled = os.environ['need_compile']` retrieves the value of the need_compile environment variable to determine whether resources should be loaded from precompiled packages or source paths. ② `If compiled == 'True'`, resources are loaded from the precompiled package path; otherwise, they are loaded from the source path. ③ `enable_display = LaunchConfiguration('enable_display', default='false')` defines a dynamic launch parameter named enable_display to control whether the display function is enabled. The default value is false. ④ `enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display)` declares the enable_display launch argument. This allows users to specify the value of the parameter via the command line during launch. ⑤ Based on the compiled variable, the resource path is determined. If need_compile is True, the precompiled package path is used; otherwise, the source path is chosen. ⑥ example_package_path specifies the final package path using get_package_share_directory('example'), which retrieves the shared directory of the example package. ⑦ `color_detect_launch = IncludeLaunchDescription(...)` includes and executes another launch file, color_detection.launch.py, located in the example/opencv path. The enable_display parameter is passed to the included launch file. ⑧ `pixel_coordinate_calculation_node = Node(...)` creates a ROS2 node named pixel_coordinate_calculation under the example package. The node runs the executable pixel_coordinate_calculation, with output displayed in the terminal. (2) generate_launch_description Function {lineno-start=38} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① The generate_launch_description function creates and returns a `LaunchDescription ` object. The `LaunchDescription`is a core component of ROS2 launch files, describing all the actions to be executed during the launch process. ② `OpaqueFunction (Dynamic Launch Generation)`: `OpaqueFunction(function = launch_setup)`uses OpaqueFunction to call the launch_setup function, dynamically generating the launch content. OpaqueFunction is a special launch description type that allows you to execute a function within the launch file and dynamically generate launch items based on the function's output. (3) Main Function {lineno-start=43} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` * **Source Code Analysis** The program file is saved in: [/home/ubuntu/ros2_ws/src/example/example/opencv/include/pixel_coordinate_calculation.py](../_static/source_code/opencv.zip) (1) Import Module {lineno-start=4} ```python import cv2 import rclpy from rclpy.node import Node import numpy as np from sensor_msgs.msg import Image from cv_bridge import CvBridge ``` ① `import cv2`: Import the OpenCV library for image processing. ② `import rclpy`: Import the ROS2 Python client library. ③ `from rclpy.node import Node`: Import the Node class from ROS2 to create and manage nodes. ④ `import numpy as np`: Import the NumPy library for array manipulation. ⑤ `from sensor_msgs.msg import Image`: Import the Image message type from ROS to receive image data. ⑥ `from cv_bridge import CvBridge`: Import CvBridge for converting between ROS image messages and OpenCV image formats. (2) PixelCoordinateCalculation Class ① Initialization Function: {lineno-start=12} ```python def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 # Subscribe to image topic. 订阅图像话题 self.image_sub = self.create_subscription(Image, '/color_detection/result_image', self.image_callback, 1) ``` ① `rclpy.init()`: Initialize the ROS2 client. ② `super().init(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True)`: Call the parent class constructor to initialize the node and allow undeclared parameters. ③ `self.bridge = CvBridge()`: Create a CvBridge instance for image format conversion. ④ `self.image_sub = self.create_subscription(Image, '/color_detection/result_image', self.image_callback, 1)`: Subscribe to the image topic /color_detection/result_image and specify the callback function image_callback with a queue size of 1. (3) image_callback Function: {lineno-start=19} ```python # Process ROS node data. 处理ROS节点数据 def image_callback(self, result_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 result_image = self.bridge.imgmsg_to_cv2(result_image, "mono8") # Convert the grayscale image to a BGR image. 将灰度图像转换为 BGR 图像 # result_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY) if result_image is not None : # Calculate the detected contours. 计算识别到的轮廓 contours = cv2.findContours(result_image, cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_NONE)[-2] # Find all contours. 找出所有轮廓 if contours : # Find the largest contour. 找出最大轮廓 c = max(contours, key = cv2.contourArea) # Decide whether to proceed based on contour size. 根据轮廓大小判断是否进行下一步处理 rect = cv2.minAreaRect(c) # Get the minimum bounding rectangle. 获取最小外接矩形 corners = np.intp(cv2.boxPoints(rect)) # Get the four corner points of the minimum bounding rectangle. 获取最小外接矩形的四个角点 x, y = rect[0][0],rect[0][1] # Print pixel coordinates. 打印像素坐标 self.get_logger().info(f"像素坐标为:x: {x} y: {y}") else: self.get_logger().info("请将需要识别颜色的物体放入摄像头的识别范围") except Exception as e: print(e) ``` ① `def image_callback(self, result_image)`:This is the image callback function, which processes the received image messages. ② `result_image = self.bridge.imgmsg_to_cv2(result_image, "mono8")`:Convert the ROS image message to an OpenCV grayscale image (mono8 format). ③ `contours = cv2.findContours(result_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]`:Find the external contours in the image. ④ `if contours`:If contours are found, perform the following steps. ⑤ `c = max(contours, key=cv2.contourArea)`:Find the largest contour based on the contour area. ⑥ `rect = cv2.minAreaRect(c)`:Calculate the minimum bounding rectangle for the largest contour. ⑦ `corners = np.intp(cv2.boxPoints(rect))`:Get the four corner points of the rectangle. ⑧ `x, y = rect[0][0], rect[0][1]`:Get the coordinates (x, y) of the center point of the rectangle. ⑨ `self.get_logger().info(f"Pixel coordinates: x: {x} y: {y}")`:Log and display the pixel coordinates of the center point. ⑩ `else`:If no contours are found, output a message. ⑪ `self.get_logger().info("Please place the object to be recognized within the camera's detection range.")`:Output a prompt to ensure the object is within the camera's detection range. (4) Main Function {lineno-start=51} ```python def main(): node = PixelCoordinateCalculation('pixel_coordinate_calculation') rclpy.spin(node) camera_node.destroy_node() rclpy.shutdown() ``` ① `node = PixelCoordinateCalculation('pixel_coordinate_calculation')`:Create a `PixelCoordinateCalculation` node with the name . ② `rclpy.spin(node)`:Enter the ROS2 event loop and keep the node running. ③ camera_node.destroy_node():Destroy the `camera_node` to release its resources. ④ `rclpy.shutdown()`:Shut down the ROS2 client and exit the program. ### 12.1.5 Object Pose Calculation * **Program Logic** When a color block is identified, the coordinates of the block's center point in the live camera feed can be obtained. The image's pose is then calculated and printed in the terminal. This information can be utilized for subsequent servo control on the robotic arm, adjusting the gripping angle of the gripper. Pose represents both position and orientation. Any rigid body in the spatial coordinate system (OXYZ) can be precisely and uniquely represented by its positional and orientational states. Position indicates the object's coordinates in three-dimensional space, typically represented by three coordinate values (x, y, z). These coordinates can be relative to a reference point or coordinate system, or they can be relative to the position of other objects. Orientation represents the object's direction or orientation in three-dimensional space, typically described using rotation matrices, Euler angles, quaternions, or other representation methods. Orientation describes the rotational relationship of the object relative to a reference direction or coordinate system and can be used to represent the object's orientation, angles, or rotation. * **Operation Steps** :::{Note} The input command should be case sensitive, and keywords can be complemented using Tab key. ::: (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal, and execute the command below, then hit Enter to terminate the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` (3) Execute the command below and hit Enter to run the game program. ``` ros2 launch example object_attitude_calculation.launch.py ``` (4) As shown in the image below, the terminal in the top-left corner (①) prints the object's yaw angle, which corresponds to the gripping angle of the robotic claw. The window in the bottom-left corner (②), labeled "**RGB**", displays the depth camera's returned image (converted from RGB to BGR). The window in the bottom-right corner (③), labeled "**result_image**", shows the binary black-and-white image generated by the color_detection function. (5) If you need to terminate the program, please use short-cut 'Ctrl+C'. If the program fails to stop, please retry. (6) If you wish to revisit the app-based gameplay in the future, enter the corresponding command and press Enter to start the APP service. Wait for the robotic arm to return to its initial position—when you hear a beep sound, the system is ready. ``` sudo systemctl start start_app_node.service ``` * **Launch File Analysis** Launch file locates in [/home/ubuntu/ros2_ws/src/example/example/opencv/object_attitude_calculation.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='false') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': example_package_path = get_package_share_directory('example') else: example_package_path = '/home/ubuntu/ros2_ws/src/example' color_recognition_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(example_package_path, 'example/opencv/color_recognition.launch.py')), launch_arguments={ 'enable_display': enable_display }.items() ) object_attitude_calculation_node = Node( package='example', executable='object_attitude_calculation', output='screen', ) return [ color_recognition_launch, object_attitude_calculation_node, ] ``` ① `compiled = os.environ['need_compile']`: Retrieve the value of the need_compile environment variable to determine whether to use a compiled version or source path for loading resources. ② `enable_display = LaunchConfiguration('enable_display', default='false')`: Declare a launch configuration parameter named enable_display with a default value of false. This parameter can be adjusted at runtime via command-line input (e.g., to control whether a graphical interface is shown). ③ `enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display)`: Declare the enable_display argument so it can be passed from the command line at launch time. ④ Deciding the Package Path Based on need_compile: Based on the value of the need_compile environment variable, determine the package path to use. `If need_compile is 'True'`, use `get_package_share_directory('example')` to fetch the ROS2 package's shared directory path. If it's not 'True', use the fixed path /home/ubuntu/ros2_ws/src/example. ⑤ `color_recognition_launch`: This section includes another launch file, color_recognition.launch.py, and passes the enable_display parameter to it using IncludeLaunchDescription. This allows the configuration to be dynamically applied to the included launch file. ⑥ `object_attitude_calculation_node`: This node starts the executable object_attitude_calculation from the example package and prints the output to the terminal. (2) generate_launch_description Function {lineno-start=9} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① `generate_launch_description`: This function returns a LaunchDescription object, which describes the contents of a ROS2 launch file. It defines all the operations that should be executed during the launch process. ② `OpaqueFunction(function = launch_setup)`: OpaqueFunction allows dynamic generation of content during the launch process. In this case, it is used to call the launch_setup function and pass the nodes and launch files generated by that function to the LaunchDescription. (3)Main Function {lineno-start=42} ```python if __name__ == '__main__': # 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① `ld = generate_launch_description()`: Call the generate_launch_description() function to create a launch description, which defines the actions to be executed during the ROS2 launch. ② `ls = LaunchService()`: Create a LaunchService object, which manages the execution of the launch file and its associated nodes. ③ `ls.include_launch_description(ld)`: Include the generated launch description (ld) into the LaunchService, so it can be executed. ④ `ls.run()`: Run the LaunchService to start the nodes and load the necessary files as described in the launch description. * **Source Code** In the process of determining the object's pose, it is essential to extract the key features of the target color. Therefore, we utilize the pre-written library, "**color_detection_base**" to obtain the specific binary image of the target color object. This binary image is then used for subsequent color object localization. The source code is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/object_attitude_calculation.py](../_static/source_code/opencv.zip) The program flowchart is as below: From the above diagram, the main logic flow of the program involves processing the acquired image and then recognizing the colors within the image. (1) Import Feature Pack {lineno-start=4} ```python import cv2 import rclpy from rclpy.node import Node import numpy as np from sensor_msgs.msg import Image from cv_bridge import CvBridge ``` ① `cv2`: The OpenCV library used for image processing. ② `rclpy`: The ROS2 Python client library, which enables interaction with the ROS2 system. ③`numpy`: Used for array and matrix manipulation. ④`sensor_msgs.msg.Image`: The ROS2 image message type, used for subscribing to image topics. ⑤ `cv_bridge`: A ROS2 package used to convert between ROS image messages and OpenCV image formats. (2) ObjectAttitudeCalculation Class Constructor {lineno-start=11} ```python class ObjectAttitudeCalculation(Node): def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.bridge = CvBridge() # 用于ROS Image消息与OpenCV图像之间的转换 # 订阅图像话题 self.image_sub = self.create_subscription(Image, '/color_detection/result_image', self.image_callback, 1) ``` ① `rclpy.init()`: Initializes the ROS2 system. This step must be called at the beginning of the program. ② `super().__init__(name)`: Initializes the parent class (\`rclpy.node.Node\`) with the node name set as \`name\`. ③ `self.bridge = CvBridge()`: Instantiates a \`CvBridge\` object for converting between ROS image messages and OpenCV image formats. ④ `self.image_sub = self.create_subscription(...)`: Subscribes to the \`/color_detection/result_image\` topic to receive image data. The callback function is \`image_callback\`, with a queue size of 1. (3) image_callback Callback Function {lineno-start=19} ```python # Process ROS node data. 处理ROS节点数据 def image_callback(self, result_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 result_image = self.bridge.imgmsg_to_cv2(result_image, "mono8") # Convert grayscale image to BGR image. 将灰度图像转换为 BGR 图像 # result_image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2GRAY) if result_image is not None : # Calculate the detected contours. 计算识别到的轮廓 contours = cv2.findContours(result_image, cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_NONE)[-2] # Find all contours. 找出所有轮廓 if contours : # Find the largest contour. 找出最大轮廓 c = max(contours, key = cv2.contourArea) # Decide whether to proceed based on contour size. 根据轮廓大小判断是否进行下一步处理 rect = cv2.minAreaRect(c) # Get the minimum bounding rectangle. 获取最小外接矩形 yaw = int(round(rect[2])) # Rectangular Angle 矩形角度 # Print the posture of the object. 打印物体姿态 self.get_logger().info(f" 物体姿态:,yaw: {yaw}") else: self.get_logger().info("请将需要识别颜色的物体放入摄像头的识别范围") except Exception as e: print(e) ``` ① `self.bridge.imgmsg_to_cv2(result_image, "mono8")`: Converts the received ROS image message into an OpenCV image. \`"**mono8**"\` indicates a grayscale image. ② `cv2.findContours(...)`: Uses OpenCV to find contours in the image. \`RETR_EXTERNAL\` ensures that only external contours are detected, while \`CHAIN_APPROX_NONE\` prevents contour point compression. ③ `max(contours, key=cv2.contourArea)`: Selects the largest contour from the found contours, assuming the largest contour corresponds to the target object. ④ `cv2.minAreaRect(c)`: Computes the minimum enclosing rectangle for the largest contour, returning the center, dimensions (width and height), and the angle (rotation) relative to the horizontal axis. ⑤ `self.get_logger().info(...)`: Logs the object's attitude (rotation angle) using the ROS logging system. (4)main Function {lineno-start=50} ```python def main(): node = ObjectAttitudeCalculation('object_attitude_calculation') rclpy.spin(node) camera_node.destroy_node() rclpy.shutdown() ``` ① `node = ObjectAttitudeCalculation('object_attitude_calculation')`: Creates an instance of the \`ObjectAttitudeCalculation\` node, with the node name set as \`object_attitude_calculation\`. ② `rclpy.spin(node)`: Enters the ROS2 event loop, waiting to process the incoming messages. ③ `camera_node.destroy_node()`: There is a small error here. It should be \`node.destroy_node()\`, not \`camera_node.destroy_node()\`, because the node object is \`node\`. ④ `rclpy.shutdown()`: Shuts down the ROS2 system when the program ends. ### 12.1.6 Perspective Transformation * **Perspective Transformation Description** Perspective transformation refers to the process of projecting an image onto a new viewing plane using the conditions of collinearity among the perspective center, image points, and target points. The goal of perspective transformation is to convert objects that appear as lines in reality but may be skewed in an image into straight lines through the transformation process. * **Perspective Transformation Function** Perspective transformation is commonly employed in research areas such as vision navigation for mobile robots. In scenarios where the camera is inclined relative to the ground rather than directly facing downward, resulting in an oblique angle, there is a need to rectify the image into a form with a straight, orthogonal projection. This correction is achieved through the application of perspective transformation. * **Perspective Transformation Principle** The process of converting a three-dimensional object or entity from a spatial coordinate system into a two-dimensional image representation is known as projection transformation. Perspective transformation is an operation that modifies the size and shape of objects, resulting in a three-dimensional effect when applied to a planar figure. For instance, in the case of a rectangle, shear transformation only displaces two vertices on the same edge, both moving in the same direction, while keeping the other two vertices stationary. Conversely, perspective transformation may require moving all vertices of the rectangle, with the two vertices on the same edge moving in opposite directions. Simply put, perspective transformation involves transitioning from a two-dimensional image to a three-dimensional representation and then back to a two-dimensional image. Based on the schematic principles of perspective transformation, we can formulate the following equations: The coordinates (X, Y, Z) represent a point in the original image plane, and the corresponding transformed image plane coordinates are (x, y). Since we are dealing with a two-dimensional image, we can set Z=1. Dividing the transformed image coordinates by Z is a way to reduce the image from three dimensions to two dimensions. Thus, we can obtain the following equations: Usually, we set a33=1 (to simplify obtaining X', Y', ensuring that the denominator on the left side of equation 3 is 1). Expanding the above formula, we obtain the situation for a single point: Equation 3 involves a total of 8 unknowns. To solve for these unknowns, it is necessary to formulate eight sets of equations. This entails manually selecting four points on both the source image and the target image. On the source image, four coordinates are selected: A (x0, y0), (x1, y1), (x2, y2), (x3, y3). Simultaneously, four coordinates are chosen on the target image: B (X'0, Y'0), (X'1, Y'1), (X'2, Y'2), (X'3, Y'3). By substituting into Equation 3, we can obtain Equation 4 as follows: ### 12.1.7 Coordinate System Conversion * **Program Goal** When we identify a color block from the live camera feed, we can obtain the pixel coordinates of the color block's center on the live camera feed. It is necessary to convert the obtained pixel coordinates into the actual coordinates of the color block. * **Operation Steps** :::{Note} The input command should be case sensitive, and keywords can be complemented using Tab key. ::: (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal, and execute the command below, then hit Enter to terminate the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` (3) Execute the following command to run the game program ``` ros2 launch example coordinate_system_transformation.launch.py ``` (4) In the screen, the left side (①) shows the BGR image (the image converted from RGB to BGR), while the right side (②) displays the color detection image (the processed image after color recognition). The terminal outputs the pixel coordinates of the color block's center and the corresponding real-world coordinates after conversion. (5) If you want to terminate the running program, please press '**Ctrl+C**'. If the game cannot be stopped, please retry. (6) If you wish to revisit the app-based gameplay in the future, enter the corresponding command and press Enter to start the APP service. Wait for the robotic arm to return to its initial position—when you hear a beep sound, the system is ready. ``` sudo systemctl start start_app_node.service ``` * **Launch File Analysis** The source code of the launch file is saved in: [/home/ubuntu/ros2_ws/src/example/example/opencv/coordinate_system_transformation.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='false') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': example_package_path = get_package_share_directory('example') else: example_package_path = '/home/ubuntu/ros2_ws/src/example' color_recognition_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(example_package_path, 'example/opencv/color_recognition.launch.py')), launch_arguments={ 'enable_display': enable_display }.items() ) coordinate_system_transformation_node = Node( package='example', executable='coordinate_system_transformation', output='screen', ) return [color_recognition_launch, coordinate_system_transformation_node, ] ``` ① `compiled = os.environ['need_compile']`: Retrieves the value of the environment variable `need_compile` to determine whether compilation is required. This dynamically decides the subsequent paths and configurations based on the environment. ② `enable_display = LaunchConfiguration('enable_display', default='false')`: Defines a launch configuration parameter `enable_display` with a default value of `false`. This parameter controls whether the display feature is enabled, commonly used to toggle the visualization interface. ③ `enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display)`: Declares the `enable_display` parameter, allowing its value to be specified at runtime via the command line. The user can pass \`true\` or \`false\` to control the enabling of the display feature. ④ `example_package_path = get_package_share_directory('example')`: Uses ROS2's `ament_index` to retrieve the shared directory path of the `example` package. If the \`need_compile\` environment variable is `True`, the ROS2 package path is used; otherwise, the local path \`/home/ubuntu/ros2_ws/src/example\` is used. ⑤ `color_recognition_launch`: Includes another launch file, `color_recognition.launch.py`, using `IncludeLaunchDescription`. This file is located in the `example/opencv` directory. The `launch_arguments` pass the `enable_display` parameter to it. ⑥ `coordinate_system_transformation_node`: Defines a ROS2 node named `coordinate_system_transformation` that belongs to the `example` package. The executable for this node is `coordinate_system_transformation`, and its output is directed to the screen (`output='screen'`). (2) generate_launch_description Function {lineno-start=35} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` This function returns a LaunchDescription object, which is the core component for defining all launch configurations. It calls the launch_setup function through an OpaqueFunction. (3)Main Function {lineno-start=40} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① `ld = generate_launch_description()` : Create a LaunchDescription object by calling the generate_launch_description() function to obtain the description of the launch file. ② `ls = LaunchService() `: Create a LaunchService object, which is responsible for launching the ROS2 nodes. ③ `ls.include_launch_description(ld)` : Include the previously created LaunchDescription object into the LaunchService. ④ `ls.run() `: Run the launch service. After calling the run() method, ROS2 will begin the node startup process, executing all nodes and configurations defined in the LaunchDescription. * **Source Code Analysis** The source code is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/coordinate_system_transformation.py](../_static/source_code/opencv.zip) The program flow chart is as below: From the above diagram, it can be inferred that the main logical flow of the program involves processing images through color recognition. This process entails extracting the central coordinates of object pixels, converting these coordinates, and obtaining the actual position of the object, which is then printed. (1) Import Function Package Import the required module using import statement. {lineno-start=4} ```python import os import cv2 import yaml import time import rclpy import numpy as np from sdk import common from rclpy.node import Node from cv_bridge import CvBridge from sensor_msgs.msg import Image as RosImage, CameraInfo ``` ① `os` is provides operating system interaction functions ② `cv2` is employed for OpenCV image processing. ③ `yaml` is used for parsing and generating YAML configuration files, commonly used for reading or writing structured data. ④ `rclpy`: The ROS2 Python client library, which enables interaction with the ROS2 system. ⑤ `numpy` is used for array operations. ⑥ `sdk` is custom module providing common utilities, such as plane transformations and coordinate system conversions (imported from /sdk/common). ⑦ `rclpy.node` is the base class for ROS2 nodes; all ROS2 nodes must inherit from this. ⑧ `cv_bridge` is a bridge between ROS2 and OpenCV, used to convert ROS image messages to OpenCV image format. ⑨ `sensor_msgs.msg` is imports ROS image message type (`Image` aliased as `RosImage`) and camera info type (`CameraInfo`) for image subscription and publishing.. (2) Initiate coordinate_transformation Class (Coordinate Conversion Class) {lineno-start=17} ```python def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.K = None self.white_area_center = None self.camera_type = os.environ['CAMERA_TYPE'] self.config_file = 'transform.yaml' self.calibration_file = 'calibration.yaml' self.config_path = "/home/ubuntu/ros2_ws/src/app/config/" # Subscribe to image topic. 订阅图像话题 self.image_sub = self.create_subscription(RosImage, '/color_detection/result_image', self.image_callback, 1) self.camera_info_sub = self.create_subscription(CameraInfo, '/depth_cam/rgb/camera_info', self.camera_info_callback, 1) ``` ① `rclpy.init()`: Initializes the ROS2 client library. This line of code is called once in every ROS2 node. ② `super().__init__(name, ...)`: Initialize parent Node class with parameter handling options ③ `self.bridge = CvBridge()`: Initializes the CvBridge object, which allows conversion between ROS messages and OpenCV images. ④ `self.K = None`: Initializes the camera intrinsic matrix K. This will later be obtained from the CameraInfo message. ⑤`self.white_area_center = None`:Stores detected white region coordinates ⑥ `self.camera_type = os.environ['CAMERA_TYPE']`:Get camera model from environment variables ⑦ `self.config_file = 'transform.yaml'`:Transformation configuration file ⑧ `self.calibration_file = 'calibration.yaml'`:Camera calibration parameters file ⑨ `self.config_path = "..."`:Path to configuration files ⑩ `self.image_sub`: Subscribes to the` /color_detection/result_image` topic to receive the processed color recognition image ⑪ `self.camera_info_sub`: Subscribes to the `/depth_cam/depth/camera_info` topic to obtain the camera's intrinsic parameters (such as focal length and principal point). (3) camera_info_callback Function {lineno-start=48} ```python def camera_info_callback(self, msg): self.K = np.matrix(msg.k).reshape(1, -1, 3) ``` self.K = np.matrix(msg.k).reshape(1, -1, 3):When the camera information is received, the camera's intrinsic matrix (msg.k) is converted into a NumPy matrix and stored in self.K. This matrix is used to convert pixel coordinates into 3D coordinates in the camera coordinate system. (4) image_callback Function {lineno-start=51} ```python # Process ROS node data. 处理ROS节点数据 def image_callback(self, result_image): try: # Convert ROS Image message to OpenCV image. 将 ROS Image 消息转换为 OpenCV 图像 result_image = self.bridge.imgmsg_to_cv2(result_image, "mono8") if result_image is not None : # Calculate the detected contours. 计算识别到的轮廓 contours = cv2.findContours(result_image, cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_NONE)[-2] # Find all contours. 找出所有轮廓 if contours : # Find the largest contour. 找出最大轮廓 c = max(contours, key = cv2.contourArea) # Decide whether to proceed based on contour size. 根据轮廓大小判断是否进行下一步处理 rect = cv2.minAreaRect(c) # Get the minimum bounding rectangle. 获取最小外接矩形 corners = np.int0(cv2.boxPoints(rect)) # Get the four corner points of the minimum bounding rectangle. 获取最小外接矩形的四个角点 x, y = rect[0][0],rect[0][1] if self.camera_type == 'USB_CAM': x, y = distortion_inverse_map.undistorted_to_distorted_pixel(target[2][0], target[2][1], self.intrinsic, self.distortion) self.get_yaml() projection_matrix = np.row_stack((np.column_stack((self.extristric[1],self.extristric[0])), np.array([[0, 0, 0, 1]]))) world_pose = common.pixels_to_world([[x,y]], self.K, projection_matrix)[0] world_pose[0] = -world_pose[0] world_pose[1] = -world_pose[1] position = self.white_area_center[:3, 3] + world_pose position[2] = 0.03 config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['pixel']['offset']) scale = tuple(config_data['pixel']['scale']) for i in range(3): position[i] = position[i] * scale[i] position[i] = position[i] + offset[i] # Print pixel coordinates and actual coordinates. 打印像素坐标、实际坐标 self.get_logger().info(f"像素坐标为: x: {x}, y: {y}") self.get_logger().info(f"实际坐标为: {position}") time.sleep(1) else: self.get_logger().info("未检测到所需识别的颜色,请将色块放置到相机视野内。") except Exception as e: print(e) ``` ① `result_image = self.bridge.imgmsg_to_cv2(result_image, "mono8")`:Convert ROS Image message to OpenCV image. ② `contours = cv2.findContours(result_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]`:Calculate the detected contours,find all contours. ③ ` c = max(contours, key = cv2.contourArea)`:Find the largest contour. ④ `rect = cv2.minAreaRect(c)`:Get the minimum bounding rectangle. ⑤ `corners = np.int0(cv2.boxPoints(rect))`:Get the four corner points of the minimum bounding rectangle. ⑥ ` x, y = rect[0][0], rect[0][1]`:Get center coordinates of the rectangle ⑦ `x, y = distortion_inverse_map.undistorted_to_distorted_pixel(`:Handle distortion for USB camera ⑧ `projection_matrix = np.row_stack((`:Create projection matrix. ⑨ ` world_pose = common.pixels_to_world([[x,y]], self.K, projection_matrix)[0]`:Convert pixel coordinates to world coordinates ⑩ `Calculate final position`:Invert X axis ⑪ `world_pose[1] = -world_pose[1]`:Invert Y axis ⑫ `position = self.white_area_center[:3, 3] + world_pose`:Calculate final position ⑬ `config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file))`: Load offset and scale from config (5) Main Function {lineno-start=97} ```python def main(): node = CoordinateTransformation('coordinate_transformation') rclpy.spin(node) camera_node.destroy_node() rclpy.shutdown() ``` ① `node = CoordinateTransformation('coordinate_transformation')`: Creates a CoordinateTransformation node with the specified name. ② `rclpy.spin(node)`: Puts the node into the ROS2 event loop, where it waits for messages to be received and processed. ③ `camera_node.destroy_node()`: Destroys the node (this occurs when the node is closed). ④ `rclpy.shutdown()`: Shuts down the ROS2 client. ### 12.1.8 Trajectory Planning * **Program Logic** When we want the robotic arm to grasp an object according to our requirements, we need to use the coordinates of the object. Based on these coordinates, the servos on the robotic arm rotate to specific angles to facilitate the grasping of the object. (1) Rotate the robotic arm's pan-tilt to the target direction. (2) Move the robotic arm to a position directly above the target. (3) Control the end effector on the robotic arm to approach the target object. * **Operation Steps** :::{Note} The input command should be case sensitive, and keywords can be complemented using Tab key. ::: (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal, and execute the command below, then hit Enter to terminate the app auto-start service. ``` sudo systemctl stop jetarm_bringup.service ``` (3) Execute the command below to start the trajectory planning Demo. ``` ros2 launch example path_planning.launch.py ``` The pan-tilt of the robot arm will rotate to align the gripper with the target destination. Subsequently, the robot arm will move to the target location and finally return to its initial pose. (4) If you wish to revisit the app-based gameplay in the future, enter the corresponding command and press Enter to start the APP service. Wait for the robotic arm to return to its initial position—when you hear a beep sound, the system is ready. ``` sudo systemctl start jetarm_bringup.service ``` * **Change Robot Arm's Gripping Position** The source code is located in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/path_panning.py](../_static/source_code/opencv.zip) If you need to change the gripping position of the robotic arm, you can achieve this by modifying the parameters within `self.movement_0` and `self.movement_1` functions. The parameters in the parentheses, from left to right, represent the x, y, and z coordinates of the target position in meters, as well as the angle of the gripper when grasping the object (ranging from -90° to 90°). Once we provide the gripping position for the robotic arm, it will calculate whether it can reach that position. If reachable, it will move to the target location; otherwise, the robotic arm will remain stationary. {lineno-start=85} ```python # Run the path planning program. 运行路径规划程序 self.get_logger().info("云台运动") self.move_0(0.2, -0.1, 0.05, 90) time.sleep(2) self.get_logger().info("动作1") self.move_1(0.2, -0.1, 0.05, 90) time.sleep(2) self.get_logger().info("动作2") self.move_1(0.2, -0.1, 0.005, 90) time.sleep(3) self.start = False # Stop the loop. 停止循环 self.get_logger().info("停止运动") ``` * **Launch File Analysis** The launch file is saved in: [/home/ubuntu/ros2_ws/src/example/example/opencv/path_planning.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='false') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') example_package_path = get_package_share_directory('example') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' example_package_path = '/home/ubuntu/ros2_ws/src/example' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) path_planning_node = Node( package='example', executable='path_planning', output='screen', ) return [ sdk_launch, path_planning_node, ] ``` ① Environment Variable Check: Use `os.environ['need_compile']` to retrieve the value of the need_compile environment variable and check if the system has been compiled. If the value is True, the installed ROS2 SDK and example package paths are used. If the value is False, the specified source code path is used. ② Declare Parameters: enable_display: This launch parameter controls whether the display functionality is enabled. Its default value is false. Defined using `LaunchConfiguration`, it can be modified at launch time based on requirements. ③ `DeclareLaunchArgument`: Declares a launch argument, allowing the user to specify its value at runtime when launching the system. ④ Include Other Launch Files: Use `IncludeLaunchDescription` to include the `jetarm_sdk.launch.py` file from the SDK package. This file starts functionalities related to the SDK. PythonLaunchDescriptionSource provides the file path for the Python source file located in the SDK package. ⑤ Launch Path Planning Node: Define and launch a node named path_planning from the example package. This node will output to the terminal (output='screen'). ⑥ Return a List of SDK Launch Files and Path Planning Node: Return a list that includes both the SDK launch files and the path planning node for subsequent launch operations. (2) generate_launch_description Function {lineno-start=36} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① This function returns a `LaunchDescription ` object, which contains the launch_setup function defined using `OpaqueFunction`. ② `OpaqueFunction`: This is a special function that allows the dynamic generation of launch file content at runtime. (3) Main Function {lineno-start=41} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` This is the entry point of the program. It creates and executes a `LaunchDescription `object, which starts the ROS2 LaunchService. The `LaunchService` is responsible for running the content defined in the launch file, including executing the specified nodes and launch configurations. * **Source Code Analysis** The source code is saved in: [/home/ubuntu/ros2_ws/src/example/example/opencv/include/path_planning.py](../_static/source_code/opencv.zip) ① Start the servo control node and kinematics node. ② Input the specified coordinates into the inverse kinematics to determine if the target point is reachable. **If not reachable**: End the program. **If reachable**: Use the servo control node with the servo parameters obtained from the inverse kinematics solution to control the robotic arm to move to the specified coordinates, and then end the program. (1) Import Function Package {lineno-start=1} ```python import time import os import math import rclpy from rclpy.node import Node from rclpy.executors import MultiThreadedExecutor import numpy as np from math import radians from std_srvs.srv import Trigger from ros_robot_controller_msgs.msg import BuzzerState from servo_controller import bus_servo_control from servo_controller_msgs.msg import ServosPosition, ServoPosition from kinematics_msgs.srv import SetRobotPose, SetJointValue from kinematics.kinematics_control import set_pose_target ``` ① `time`: Provides time-related functionality, such as delaying execution (time.sleep()). Commonly used in robot control to set wait times between operations. ② `os`: Allows interaction with the operating system. ③ `math`: Provides common mathematical functions, such as trigonometric functions, logarithms, etc. ④ `rclpy`: The ROS2 Python client library, responsible for creating ROS nodes, communication, and connecting to the ROS2 framework. ⑤ `rclpy.node.Node`: Used to create ROS nodes, which are the basic unit of communication in ROS. ⑥ `rclpy.executors.MultiThreadedExecutor`: A multi-threaded executor that allows multiple threads to simultaneously handle multiple nodes, suitable for concurrent tasks. ⑦ `std_srvs.srv.Trigger`: A standard ROS service used to trigger actions or events, typically without inputs, only a trigger request. ⑧ `ros_robot_controller_msgs.msg.BuzzerState`: A custom message type used to control or monitor the state of a buzzer. ⑨ `servo_controller_msgs.msg.ServosPosition, ServoPosition`: Message types used to control the positions of robotic arm servos. ServosPosition represents the positions of multiple servos, while ServoPosition represents the state of a single servo. ⑩ `kinematics_msgs.srv.SetRobotPose, SetJointValue`: Services related to robot kinematics. ⑪ `SetRobotPose`: Used to set the overall position and orientation (pose) of the robot. ⑫ `SetJointValue`: Used to set the positions of individual joints in a robotic arm or other robot. ⑬ `servo_controller.bus_servo_control`: A custom module for controlling robotic arm servos, providing functionality for setting servo positions. ⑭ `kinematics.kinematics_control`: A custom module responsible for calculating robot kinematics. The set_pose_target() function is used to set the target position of the robot. (2) Initialization Function of the PathPlanning Class {lineno-start=18} ```python def __init__(self, name): rclpy.init() super().__init__(name) self.start = True self.servos_list = [500, 610, 70, 140, 500, 200] self.servos_pub = self.create_publisher(ServosPosition, 'servo_controller', 1)# Servo control 舵机控制 # Wait for the service to start. 等待服务启动 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() self.client = self.create_client(Trigger, '/kinematics/init_finish') self.client.wait_for_service() self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.kinematics_client.wait_for_service() bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((1, self.servos_list[0]), (2, self.servos_list[1]), (3, self.servos_list[2]), (4, self.servos_list[3]), (5, self.servos_list[4]), (10, self.servos_list[5]))) # 设置机械臂初始位置 self.run() ``` ① `rclpy.init()`: Initializes the ROS2 client library. This is a required step in every ROS2 node. ② `self.servos_list`: Stores the initial positions of the robot arm's servos. ③ `self.servos_pub`: Creates a publisher for sending servo control messages (ServosPosition). ④ `Service Clients for Controller and Kinematics`: Creates service clients related to the controller and kinematics. The wait_for_service() function ensures that the services are running before proceeding with further operations. ⑤ `Set Initial Servo Positions`: Uses bus_servo_control.set_servo_position() to set the initial positions of the servos based on the values in self.servos_list. (3) send_request Method {lineno-start=38} ```python def send_request(self, client, msg): future = client.call_async(msg) while rclpy.ok(): if future.done(): try: response = future.result() return response except Exception as e: self.get_logger().error('Service call failed: {}'.format(e)) return None rclpy.spin_once(self) # Allow nodes to handle callback functions. 允许节点处理回调函数 ``` This is a general method used to asynchronously call ROS services and wait for the service response. If the service call is successful, the response is returned; if it fails, an error message is displayed. (4) move_0 and move_1 Methods {lineno-start=51} ```python def move_0(self, x, y, z, pitch, t=1000): # Whether the robotic arm can be planned to move to the specified coordinates. 是否可以规划机械臂运动到设置的坐标 msg = set_pose_target([x, y, z], pitch, [-90.0, 90.0], 1.0) res = self.send_request(self.kinematics_client, msg) if res.pulse:# Reachable 可以达到 servo_data = res.pulse self.get_logger().info(f"舵机角度: {list(res.pulse)}") # Drive the servos 驱动舵机 bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((1, servo_data[0]),)) time.sleep(1.2) else: self.start = False self.get_logger().info("无法运行到此位置") def move_1(self, x, y, z, pitch, t=1000): # Whether the robotic arm can be planned to move to the specified coordinates. 是否可以规划机械臂运动到设置的坐标 msg = set_pose_target([x, y, z], pitch, [-90.0, 90.0], 1.0) res = self.send_request(self.kinematics_client, msg) if res.pulse: # Reachable 可以达到 servo_data = res.pulse self.get_logger().info(f"舵机角度: {list(res.pulse)}") # Drive the servos 驱动舵机 bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((1, servo_data[0]), (2, servo_data[1]), (3, servo_data[2]), (4, servo_data[3]), (5, servo_data[4]))) else: self.start = False self.get_logger().info("无法运行到此位置") ``` ① The `move_0` and `move_1` methods use set_pose_target() to set the target position, including coordinates and orientation angles. Then, the send_request function is called to send a request to the kinematics service to check if the robotic arm can reach the target position. ② If the target position is reachable, the servo angles are returned, and the servo positions are updated. If the target position cannot be reached, an error log is output, and further operations are stopped. (5) Run Method {lineno-start=83} ```python def run(self): while self.start: # Run the path planning program. 运行路径规划程序 self.get_logger().info("云台运动") self.move_0(0.2, -0.1, 0.05, 90) time.sleep(2) self.get_logger().info("动作1") self.move_1(0.2, -0.1, 0.05, 90) time.sleep(2) self.get_logger().info("动作2") self.move_1(0.2, -0.1, 0.005, 90) time.sleep(3) self.start = False # Stop the loop. 停止循环 self.get_logger().info("停止运动") ``` In the run method, the program sequentially executes multiple actions (move_0 and move_1) to simulate the robotic arm's movement path.A delay (`time.sleep()`) is added between each action to allow the robotic arm sufficient time to complete the current `movement.Finally`, `self.start` = False stops the loop, ending the path planning process.After all movements are completed, the servos return to their initial positions. (6) Main Function {lineno-start=102} ```py def main(): node = PathPlanning('path_planning') rclpy.spin(node) node.destroy_node() rclpy.shutdown() ``` ① In the main function, create and start the `PathPlanning `node, then enter the ROS 2 execution loop (`rclpy.spin()`). ② Once completed, destroy the node and shut down the ROS 2 client. ### 12.1.9 Position & Picking * **Program Goal** The robotic arm can calculate the center coordinates of a recognized object and use inverse kinematics to grasp the object. Here, we take the example of a red block. * **Operation Steps** :::{Note} The input command should be case sensitive and keywords can be complemented using Tab key. ::: ① Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) ② Double-click to open the command-line terminal, and execute the command below, then hit Enter to terminate the app auto-start service. ``` sudo systemctl stop start_app_node.service ``` ③ Run the command below to initiate the positioning and gripping demo. ``` ros2 launch example positioning_clamp.launch.py ``` It can be observed that the robotic arm will recognize and calculate the coordinates of the colored block, proceed to grasp the block, and the terminal will print both the pixel coordinates and the real-world coordinates of the block. By default, the system recognizes red color. If you wish to grasp other colors, refer to the following section [13.1.11 Change Default Recognizable Colors](#anchor_13_1_11) If color recognition is inaccurate, adjustments can be made by referring to [1. Getting Ready->1.5 Color Threshold Setting](1.Getting_Ready.md#color-threshold-setting) ④ If you wish to revisit the app-based gameplay in the future, enter the corresponding command and press Enter to start the APP service. Wait for the robotic arm to return to its initial position—when you hear a beep sound, the system is ready. ``` sudo systemctl start start_app_node.service ``` * **Launch File Analysis** The source code of the launch file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/positioning_clamp.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] enable_display = LaunchConfiguration('enable_display', default='false') enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display) if compiled == 'True': example_package_path = get_package_share_directory('example') else: example_package_path = '/home/ubuntu/ros2_ws/src/example' color_recognition_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(example_package_path, 'example/opencv/color_recognition.launch.py')), launch_arguments={ 'enable_display': enable_display }.items() ) positioning_clamp_node = Node( package='example', executable='positioning_clamp', output='screen', ) return [ color_recognition_launch, positioning_clamp_node, ] ``` ① `compiled` Parameter: Determines whether to use the installed package path or the source code path. ② `enable_display` Parameter: A dynamic configuration option to control whether the display function is enabled. ③ `DeclareLaunchArgument`: Declares the parameter so it can be passed from the command line. ④ If compiled, the `get_package_share_directory` function is used to retrieve the installation path. ⑤ If not compiled, the source code path is used instead. ⑥ Loads the `color_recognition.launch.py` sub-launch file and dynamically passes the enable_display parameter. ⑦ Loads the `jetarm_sdk.launch.py` sub-launch file without passing additional parameters. ⑧ Launches a node named `positioning_clamp` from the example package, with output displayed in the terminal. ⑨ Returns a list containing all elements to be launched. (2) generate_launch_description Function {lineno-start=38} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① Invoke `OpaqueFunction`: Delays the execution of the launch_setup function until runtime. ② `generate_launch_description`: The entry point for a ROS2 launch file, returning a `LaunchDescription` object. (3) Main Function {lineno-start=43} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① Generate Launch Description: Calls generate_launch_description to create the launch description. ② Create `LaunchService` Object: Initializes a LaunchService instance and loads the launch description. ③ Execute the Launch File: Invokes the `run()` method to execute the entire launch file. * **Source Code Analysis** The source code locates in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/positioning_clamp.py](../_static/source_code/opencv.zip) From the above diagram, the program's logical flow primarily involves processing images through color recognition, extracting the pixel center coordinates of objects, converting these coordinates to obtain the actual object coordinates, and then sending the actual coordinates to the servo control function for grasping. (1) Import Feature Pack Import the the required module using import statement. {lineno-start=4} ```python import os import cv2 import yaml import time import math import rclpy import threading import numpy as np from sdk import common from math import radians from rclpy.node import Node from cv_bridge import CvBridge from std_srvs.srv import Trigger from servo_controller import bus_servo_control from rclpy.executors import MultiThreadedExecutor from ros_robot_controller_msgs.msg import BuzzerState from sensor_msgs.msg import Image as RosImage, CameraInfo from kinematics.kinematics_control import set_pose_target from kinematics_msgs.srv import SetRobotPose, SetJointValue from servo_controller_msgs.msg import ServosPosition, ServoPosition ``` ① `os`: Provides functions for file path operations and system-level tasks. ② `cv2`: The OpenCV library for image processing and computer vision tasks. ③ `yaml`: Parses configuration files in YAML format. ④ `time`: Offers time management and delay functionalities. ⑤ `math`: Provides mathematical functions for calculations. ⑥ `rclpy`: The Python client library for ROS2, used to create ROS2 nodes. ⑦ `threading`: Supports multi-threaded operations. ⑧ `numpy`: A numerical computation library for arrays and matrix calculations. ⑨ `sdk.common`: A custom utility library. ⑩ `radians`: Converts angles from degrees to radians. ⑪ `Node`: The base class for ROS2 nodes, used to develop ROS2 applications. ⑫ `CvBridge`: Facilitates conversions between ROS Image messages and OpenCV images. ⑬ `Trigger`: A standard ROS2 service type used to trigger specific operations. ⑭ `bus_servo_control`: A utility library for controlling servo motors. ⑮ `MultiThreadedExecutor`: A ROS2 multithreaded executor for executing multiple tasks concurrently. ⑯ `BuzzerState`: A ROS message type representing the state of a buzzer. ⑰ `RosImage`: A ROS image message type that represents images captured by a camera. ⑱ `CameraInfo`: A ROS message type containing camera calibration and intrinsic parameters. ⑲ `set_pose_target`: A control function for setting the target pose of a robotic arm. ⑳ `SetRobotPose` and `SetJointValue`: ROS service types for setting the pose and joint angles of a robotic arm, respectively. ㉑ `ServosPosition` and `ServoPosition`: ROS message types for publishing the target positions and states of servos. (2) Initiate positioning_clamp Class {lineno-start=27} ```python def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) ``` ① Inheritance: The `PositioningClamp` class inherits from the ROS2 Node class. ② ROS2 Initialization: The `rclpy.init()` function initializes ROS2 communication. ③ Dynamic Parameter Declaration: The settings `allow_undeclared_parameters` and `automatically_declare_parameters_from_overrides` enable dynamic declaration and configuration of parameters. (3) Key Member Variables {lineno-start=29} ```python super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.K = None self.count = 0 self.pick_pitch = 80 self.result_image = None ``` ① `CvBridge`: Converts between ROS image messages and OpenCV image formats. ② `self.K`: The camera intrinsic matrix. ③ `self.count`: : A counter used to evaluate target stability. ④ `self.pick_pitch`: Sets the pitch angle of the robotic arm during object grasping. ⑤ `self.result_image`: Stores the subscribed visual processing results. (4) Publish and Subscribe {lineno-start=43} ```python self.servos_pub = self.create_publisher(ServosPosition, 'servo_controller', 1)# Servo control 舵机控制 # Subscribe to image topic. 订阅图像话题 self.image_sub = self.create_subscription(RosImage, '/color_detection/result_image', self.image_callback, 1) self.camera_info_sub = self.create_subscription(CameraInfo, '/depth_cam/rgb/camera_info', self.camera_info_callback, 1) ``` ① Publisher: Control the target position of the robot arm servo. ② Subscriber: `/color_detection/result_image`: vision processing result `/depth_cam/depth/camera_info`: Intrinsic parameter of the depth camera (5) Wait for Service {lineno-start=50} ```python # Wait for the service to start. 等待服务启动 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() self.client = self.create_client(Trigger, '/kinematics/init_finish') self.client.wait_for_service() self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.kinematics_client.wait_for_service() ``` Trigger Services: Wait for the initialization of the controller and kinematics services to complete before proceeding. (6) Load Configuration File {lineno-start=61} ```python with open(self.config_path + self.config_file, 'r') as f: config = yaml.safe_load(f) # Convert to numpy array. 转换为 numpy 数组 extristric = np.array(config['extristric']) corners = np.array(config['corners']).reshape(-1, 3) self.white_area_center = np.array(config['white_area_pose_world']) tvec = extristric[:1] # Take the first row. 取第一行 rmat = extristric[1:] # Take the last three rows. 取后面三行 tvec, rmat = common.extristric_plane_shift(np.array(tvec).reshape((3, 1)), np.array(rmat), 0.03) self.extristric = tvec, rmat ``` ① Configuration File Path: Specifies the path for reading pose transformation and other configuration data. ② Load YAML File: Reads the extrinsic matrix (`extrinsic`) and the world coordinates of the white area center (`white_area_pose_world`). ③ `tvec`: Translation vector. ④ `rmat`: Rotation matrix. ⑤ Adjust Plane Height: Aligns the coordinate system with the robotic arm's workspace plane by calling `common.extristric_plane_shift`. (7) camera_info_callback {lineno-start=75} ```python def camera_info_callback(self, msg): self.K = np.matrix(msg.k).reshape(1, -1, 3) ``` Process Visual Data: Convert ROS image messages into OpenCV format as a single-channel grayscale image. (8) start_sortting Function {lineno-start=91} ```python def start_sorting(self, pose_t, pose_R): self.get_logger().info("开始搬运堆叠...") msg = set_pose_target(pose_t, self.pick_pitch, [-90.0, 90.0], 1.0) res = self.send_request(self.kinematics_client, msg) if res.pulse : # Reachable 可以达到 servo_data = res.pulse self.get_logger().info(f"舵机角度: {list(res.pulse)}") # Drive the servos 驱动舵机 bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((1, servo_data[0]), (2, servo_data[1]), (3, servo_data[2]), (4, servo_data[3]), (5, servo_data[4]))) time.sleep(2) bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((10, 550),)) time.sleep(2) bus_servo_control.set_servo_position(self.servos_pub, 1.0, ((1, 500), (2, 520), (3, 210), (4, 50), (5, 550) )) # Set the initial position of the robotic arm. 设置机械臂初始位置 time.sleep(2) else: self.start = False self.get_logger().info("无法运行到此位置") ``` ① Set Target Pose: Call the kinematics service to obtain the servo angles. ② Servo Control: Use `bus_servo_control.set_servo_position` to send commands that control the servos to move to the target positions. (9) Run Function {lineno-start=114} ```python def run(self): while True: try: if self.result_image is not None : # Calculate the detected contours. 计算识别到的轮廓 contours = cv2.findContours(self.result_image, cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_NONE)[-2] # Find all contours. 找出所有轮廓 if contours : # Find the largest contour. 找出最大轮廓 c = max(contours, key = cv2.contourArea) # Decide whether to proceed based on contour size. 根据轮廓大小判断是否进行下一步处理 rect = cv2.minAreaRect(c) # Get the minimum bounding rectangle. 获取最小外接矩形 corners = np.intp(cv2.boxPoints(rect)) # Get the four corner points of the minimum bounding rectangle. 获取最小外接矩形的四个角点 x, y, yaw = rect[0][0],rect[0][1],rect[2] if self.camera_type == 'USB_CAM': x, y = distortion_inverse_map.undistorted_to_distorted_pixel(target[2][0], target[2][1], self.intrinsic, self.distortion) self.get_yaml() projection_matrix = np.row_stack((np.column_stack((self.extristric[1],self.extristric[0])), np.array([[0, 0, 0, 1]]))) world_pose = common.pixels_to_world([[x,y]], self.K, projection_matrix)[0] world_pose[0] = -world_pose[0] world_pose[1] = -world_pose[1] position = self.white_area_center[:3, 3] + world_pose world_pose[2] = 0.03 config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['pixel']['offset']) scale = tuple(config_data['pixel']['scale']) for i in range(3): position[i] = position[i] * scale[i] position[i] = position[i] + offset[i] position[2] = 0.015 ``` ① Use OpenCV methods to find contours in the image. ② Extract the contour with the largest area, assumed to be the target object. ③ Calculate the minimum bounding rectangle through the contour to obtain the object's pixel position and angle. ④ Determine the camera type as USB_CAM and perform correction on the detected target. ⑤ Using the camera's intrinsic and extrinsic parameters, convert pixel coordinates to world coordinates. ⑥ Apply pixel offset and scaling; `common.get_yaml_data`: Load another YAML configuration file to obtain pixel offset and scaling parameters. {lineno-start=145} ```python # If previous_pose is None, it means this is the first calculation. 如果previous_pose为None,说明是第一次计算 if self.previous_pose is None: self.previous_pose = position continue # Calculate the difference between the current position and the previous position. 计算当前位置与上次位置的差异 position_difference = np.linalg.norm(np.array(position) - np.array(self.previous_pose)) # Check whether the position change is small enough. 判断位置变化是否足够小 if position_difference < 0.01: # The threshold can be adjusted as needed. 可以根据需要调整阈值 self.count += 1 # If the position change is small, increment the counter. 如果位置变化小,计数器+1 else: self.count = 0 # If the position change is large, reset the counter. 如果位置变化较大,计数器重置 self.previous_pose = position if self.count >= 60: # If the counter reaches the threshold, consider the position stable. 如果计数器达到阈值,则认为位置已稳定 config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['kinematics']['offset']) scale = tuple(config_data['kinematics']['scale']) for i in range(3): position[i] = position[i] * scale[i] position[i] = position[i] + offset[i] # Print pixel coordinates and real-world coordinates. 打印像素坐标和实际坐标 self.get_logger().info(f"像素坐标为: x: {x}, y: {y}") self.get_logger().info(f"实际坐标为: {position}") self.start_sorting(position,yaw) self.count = 0 break ``` ⑦ Calculate the positional difference between the current object location and the last detected position. ⑧ If the change is smaller than a specified threshold, increment the counter; otherwise, reset the counter. ⑨ When the detected position remains stable for a specified number of consecutive counts: - Apply pixel offset and scaling adjustments to the target position - Invoke the robotic arm control function `start_sorting()` to execute the grasping operation ⑩ If no valid contour is detected, prompt the user to place the object within the camera's field of view. (10) Main Function {lineno-start=180} ```python def main(): node = PositioningClamp('positioning_clamp') executor = MultiThreadedExecutor() executor.add_node(node) executor.spin() node.destroy_node() ``` Create a node and use a multi-threaded executor to handle tasks. ### 12.1.10 Color Sorting The robotic arm performs gripping based on the position of the color block, and situates the block in different locations according to its color. * **Program Logic** To begin, subscribe to the topic messages published by the color recognition node to acquire information about the identified colors. Next, upon matching the target color, retrieve the central position of the target image. Finally, employ inverse kinematics to calculate the angles necessary to align the center of the screen with the center of the target image. Publish the corresponding topic message, control servo rotation, and ensure the robotic arm follows the movement of the target. The source code for this program is located at: [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_sorting_node.py](../_static/source_code/opencv.zip) * **Operation Steps** :::{Note} The input command should be case sensitive and keywords can be complemented using Tab key. ::: (1) Place the robot on the designated position on the map and calibrate the position of the robot arm according to the tutorials saved in[1. Getting Ready->1.8 Position Adjustment for Object Gripping and Placing](1.Getting_Ready.md#position-adjustment-for-object-gripping-and-placing) (2) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in[1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (3) Double-click to open the command-line terminal. (4) Run the following command to terminate the auto-start service. ``` sudo systemctl stop start_app_node.service ``` (5) Enter the following command: ``` ros2 launch example color_sorting_node.launch.py ``` (6) After the game starts, you will see the following prompt in the terminal, and the camera feed will automatically pop up. (7) To close this mode, press 'Ctrl+C' in the terminal. If the closure fails, please try again. (8) If you wish to experience the mobile app features again, enter the command and press Enter to start the app service. Wait for the robotic arm to return to its initial position, and you will hear a 'beep' from the buzzer. ``` sudo systemctl start start_app_node.service ``` * **Program Outcome** Once the game is initiated, place red, green, and blue blocks within the recognition area of the robot arm. When the robot arm identifies the target color block, it will grip it and place it in the corresponding location in the sorting area. If color recognition is inaccurate, refer to [1. Getting Ready->1.5 Color Threshold Setting](1.Getting_Ready.md#color-threshold-setting) for adjustments. * **Launch File Analysis** The Launch file is stored in: [/home/ubuntu/ros2_ws/src/example/example/opencv/color_sorting_node.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) display = LaunchConfiguration('display', default='true') display_arg = DeclareLaunchArgument('display', default_value=display) broadcast = LaunchConfiguration('broadcast', default='false') broadcast_arg = DeclareLaunchArgument('broadcast', default_value=broadcast) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) color_sorting_node = Node( package='app', executable='object_sorting', output='screen', parameters=[{ 'start': start, 'display': display, 'broadcast': broadcast}] ) return [start_arg, display_arg, broadcast_arg, sdk_launch, depth_camera_launch, color_sorting_node, ] ``` ① Environment Variable Check: Read `os.environ['need_compile']` to determine if the path should be loaded from the compiled package. ② Dynamic Parameter Declaration: Declare launch parameters start, debug, and broadcast, which control startup, debugging mode, and broadcasting functionality, respectively. ③ Dynamic Path Selection: If the environment variable `need_compile` is `True`, use the system installation path; otherwise, use the workspace source path. ④ Parameter Passing: Pass parameters to child launch files using `launch_arguments`. ⑤ Node Configuration: Set the package name to example, executable file to `color_sorting`, and output to the screen. ⑥ Parameter Files: Load two YAML configuration files and pass dynamic parameters. ⑦ Return Launch Items: Return all declared parameters, included launch files, and defined nodes. (2) generate_launch_description Function {lineno-start=50} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Generate the launch description by encapsulating the launch items returned by launch_setup into a LaunchDescription. (3) Main Function {lineno-start=55} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① Call `generate_launch_description` to generate the description object. ② Create an instance of `LaunchService`. ③ Pass the description object to the service. ④ Start the service to execute the entire configuration. * **Source Code Analysis** The source code is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_sorting_node.py](../_static/source_code/app.zip) The program flowchart is as pictured. From the diagram above, the logical flow of the program primarily involves image processing using color recognition. After identifying the target object, the program calls the servo control function based on the coordinates of the target object to initiate gripping. Once gripped, the object is then placed in the designated area. After completing the placement, the robotic arm returns to its initial pose, initiating a new round of detection. (1) Import Feature Pack Import the required the minodule using import statement. {lineno-start=4} ```python import os import cv2 import yaml import time import math import copy import queue import threading import numpy as np import rclpy from rclpy.node import Node from cv_bridge import CvBridge from std_srvs.srv import Trigger, SetBool from sensor_msgs.msg import Image, CameraInfo from rclpy.executors import MultiThreadedExecutor from rclpy.callback_groups import ReentrantCallbackGroup from sdk import common, fps from app.common import Heart from dt_apriltags import Detector from interfaces.srv import SetStringBool from kinematics_msgs.srv import SetRobotPose, SetJointValue from servo_controller_msgs.msg import ServosPosition from servo_controller.bus_servo_control import set_servo_position from kinematics.kinematics_control import set_pose_target, set_joint_value_target from app.utils import calculate_grasp_yaw, position_change_detect, pick_and_place, image_process, distortion_inverse_map ``` ① `os`: Handles system paths, environment variables, etc. ② `cv2`: Used for image processing and display with OpenCV. ③ `yaml`: Parses and loads YAML configuration files. ④ `math`: Provides support for mathematical operations, such as trigonometric functions. ⑤ `time`: Controls execution delays and time management. ⑥ `queue`: Manages image queues between threads. ⑦ `rclpy`: Used for creating ROS2 nodes and communication. ⑧ `signal`: Captures terminal signals (e.g., SIGINT) for safe program exit. ⑨ `threading`: Implements multithreading for parallel task processing. ⑩ `numpy (np)`: Handles matrix and vector operations. ⑪ `sdk.common`: Provides common utility functions for coordinate transformation and configuration. ⑫ `std_srvs.srv.Trigger`: Defines request and response types for triggering services. ⑬ `kinematics_msgs.srv.SetRobotPose`: Defines service types for setting robot arm target positions. ⑭ `sensor_msgs.msg.Image`: Receives image messages from the camera. ⑮ `sensor_msgs.msg.CameraInfo`: Receives camera intrinsic parameter messages. ⑯ `interfaces.msg.ColorsInfo`: Custom message type for receiving color information. ⑰ `interfaces.msg.ColorDetect`: Defines message type for color detection parameters. ⑱ `servo_controller_msgs.msg.ServosPosition`: Custom message type for controlling servo motors. ⑲ `servo_controller.bus_servo_control.set_servo_position` : Function for setting servo angles directly. ⑳ `rclpy.callback_groups.ReentrantCallbackGroup`: Supports concurrent callback execution in ROS 2. ㉑`kinematics.kinematics_control.set_pose_target`: Utility function for setting robot arm target positions. ㉒ `interfaces.srv.SetColorDetectParam`: Defines service type for setting color detection parameters. ㉓`dt_apriltags.Detector` – AprilTag detector module. ㉔ `interfaces.srv.SetCircleROI`: Defines service type for setting circular detection region. ㉕ `servo_controller.bus_servo_control.set_servo_position`: Utility function for directly controlling servo motor positions. (2) ObjectSortingNode Class ① place_position Dictionary The place_position dictionary maps different object colors and tag names to their corresponding target positions in 3D space. * Keys: Represent the names of colors (e.g., "**red**", "**green**", "**blue**") or tag identifiers (e.g., "**tag_1**"). * Values: Define the target position for placement as a tuple of 3D coordinates (x, y, z). This mapping allows the system to determine where each object should be placed based on its detected color or tag. ② Initialization Function This function initializes the ROS2 node: {lineno-start=40} ```python def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) ``` * `rclpy.init()`: Initializes the ROS2 client library, enabling the program to communicate with the ROS2 system. * `super().__init__(name, ...)`: Calls the constructor of the parent class to initialize the ROS2 node with the given name. It also enables advanced parameter settings:l Allows undeclared parameters.l Automatically declares parameters provided through overrides. (3) Initialization of Member Variables: {lineno-start=43} ```python proto_path = '/home/ubuntu/ros2_ws/src/app/app/hed_model/deploy.prototxt' model_path = '/home/ubuntu/ros2_ws/src/app/app/hed_model/hed_pretrained_bsds.caffemodel' self.image_process = image_process.GetObjectSurface(proto_path, model_path) self.at_detector = Detector(searchpath=['apriltags'], families='tag36h11', nthreads=4, quad_decimate=1.0, quad_sigma=0.0, refine_edges=1, decode_sharpening=0.25, debug=0) self.lock = threading.RLock() self.fps = fps.FPS() # frame rate counter 帧率统计器 self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.image_queue = queue.Queue(maxsize=2) self.config_file = 'transform.yaml' self.calibration_file = 'calibration.yaml' self.config_path = "/home/ubuntu/ros2_ws/src/app/config/" self.data = common.get_yaml_data(os.path.join(self.config_path, "lab_config.yaml")) self.lab_data = self.data['/**']['ros__parameters'] self.camera_type = os.environ['CAMERA_TYPE'] self.tag_size = 0.025 self.min_area = 500 self.max_area = 7000 self.target_labels = { "red": False, "green": False, "blue": False, "tag1": False, "tag2": False, "tag3": False, } self.running = True ``` ①`proto_path`, `model_path`: Paths to the deep learning model files. ② `self.image_process`: Image processing object used to extract the target surface. ③ `self.at_detector`: Instance of the AprilTag detector. ④ `self.lock`: Thread lock used to protect shared resources. ⑤ `self.fps`: FPS (frames per second) tracker. ⑥ `self.bridge: CvBridge` instance for converting between ROS and OpenCV image formats. ⑦ `self.image_queue`: Image queue used to store images awaiting processing. ⑧ `self.config_file`, `self.calibration_file`: Paths to the configuration and calibration files. ⑨ `self.config_path`: Directory path where configuration files are stored. ⑩ `self.data`, `self.lab_data`: Configuration data loaded from YAML files. ⑪ `self.camera_type`: Type of camera used, retrieved from environment variables. ⑫ `self.tag_size`: Physical size of the AprilTag. ⑬ `self.min_area`, `self.max_area`: Minimum and maximum contour areas for target detection. ⑭ `self.target_labels`: Dictionary of target labels and their status. ⑮ `self.running`: Boolean flag indicating whether the thread is running. (4) ROS Communication Setup: {lineno-start=80} ```python # sub self.joints_pub = self.create_publisher(ServosPosition, 'servo_controller', 1) self.result_publisher = self.create_publisher(Image, '~/image_result', 1) self.timer_cb_group = ReentrantCallbackGroup() # services and topics self.enter_srv = self.create_service(Trigger, '~/enter', self.enter_srv_callback) self.exit_srv = self.create_service(Trigger, '~/exit', self.exit_srv_callback) self.enable_sorting_srv = self.create_service(SetBool, '~/enable_sorting', self.enable_sorting_srv_callback) self.set_target_srv = self.create_service(SetStringBool, '~/set_target', self.set_target_srv_callback) self.kinematics_client = self.create_client(SetRobotPose, 'kinematics/set_pose_target') self.kinematics_client.wait_for_service() self.set_joint_value_target_client = self.create_client(SetJointValue, 'kinematics/set_joint_value_target', callback_group=self.timer_cb_group) self.set_joint_value_target_client.wait_for_service() self.timer = self.create_timer(0.0, self.init_process, callback_group=self.timer_cb_group) ``` ① `self.joints_pub`: Publishes servo position messages. ② `self.result_publisher`: Publishes the processed image. ③ `self.timer_cb_group`: Callback group that allows concurrent execution of multiple callbacks. ④ `self.enter_srv`, `self.exit_srv`: Service callbacks for entering and exiting the task. ⑤ `self.enable_sorting_srv`: Service callback to enable the sorting functionality. ⑥ `self.set_target_srv`: Service callback to set the target object. ⑦ `self.kinematics_client`: Client for the kinematics control service. ⑧ `self.set_joint_value_target_client`: Client for the joint angle control service. ⑨ `self.timer`: Timer used during the initialization process. (5) get_node_state Method {lineno-start=96} ```python def get_node_state(self, request, response): response.success = True return response ``` ① `response.success = True`: Sets the response's success field to True, indicating the service was successful. ② `return response`: Returns the response object, completing the service request handling. (6) Parameter Initialization {lineno-start=100} ```python def _init_parameters(self): self.heart = None self.endpoint = None self.target_miss_count = 0 self.transport_info = None self.intrinsic = None self.distortion = None self.start_transport = False self.enable_sorting = False self.white_area_center = None self.enter = False self.roi = [] self.count_move = 0 self.count_still = 0 self.target = None self.start_get_roi = False self.last_position = None self.last_object_info_list = None self.image_sub = None self.camera_info_sub = None self.data = common.get_yaml_data(os.path.join(self.config_path, "lab_config.yaml")) self.lab_data = self.data['/**']['ros__parameters'] ``` Initialize the node's basic parameters, such as target states, camera intrinsic parameters, and so on. (7) init_process Method {lineno-start=123} ```python def init_process(self): self.timer.cancel() threading.Thread(target=self.main, daemon=True).start() threading.Thread(target=self.transport_thread, daemon=True).start() if self.get_parameter('start').value: self.enter_srv_callback(Trigger.Request(), Trigger.Response()) req = SetBool.Request() req.data = True res = SetBool.Response() self.enable_sorting_srv_callback(req, res) if not self.get_parameter('broadcast').value: target_list = ["red", "green", "blue"] req = SetStringBool.Request() req.data_bool = True for i in target_list: req.data_str = i res = SetBool.Response() self.set_target_srv_callback(req, res) self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'init finish') ``` ① `self.timer.cancel()`: Cancels the previously set timer to prevent it from executing further. ② `threading.Thread(target=self.main, daemon=True).start()` and `threading.Thread(target=self.transport_thread, daemon=True).start()`: Start two background threads running the main and transport_thread methods respectively. ③ `if self.get_parameter('start').value:`: Checks whether the configuration parameter start is set to true. ④ `self.enter_srv_callback(Trigger.Request(), Trigger.Response())`: Calls the service callback function `enter_srv_callback` with new `Trigger.Request()` and `Trigger.Response() objects`. ⑤ `self.enable_sorting_srv_callback`: Creates a `SetBool.Request()` object with data set to True, then calls the `enable_sorting_srv_callback `service callback passing the request and response objects. ⑥ `self.create_service(Trigger, '~/init_finish', self.get_node_state)`: Creates a new service named ~/init_finish to indicate initialization completion. ⑦ `self.get_logger().info('\033[1;32m%s\033[0m' % 'init finish')`: Logs a message indicating that initialization has finished, with green-colored text for emphasis. (8) go_home Method {lineno-start=146} ```python def go_home(self, interrupt=True): if self.target is not None: if self.target[0] in ["bule", "tag1"]: t = 1.6 elif self.target is not None: if self.target[0] in ["green", "tag2"]: t = 1.3 elif self.target is not None: if self.target[0] in ["red", "tag3"]: t = 1.0 else : t = 1.0 if interrupt: set_servo_position(self.joints_pub, 0.5, ((10, 200), )) time.sleep(0.5) joint_angle = [500, 520, 210, 50, 500] set_servo_position(self.joints_pub, 1, ((2, joint_angle[1]), (3, joint_angle[2]), (4, joint_angle[3]), (5, 500))) time.sleep(1) set_servo_position(self.joints_pub, 1, ((1, joint_angle[0]), )) time.sleep(1) ``` Control the robotic arm movement based on the type of the target object to return it to the initial position. Adjust the joint angles according to the target color, and use set_servo_position to publish the joint control commands. (9) get_roi Method {lineno-start=169} ```python def get_roi(self): with open(self.config_path + self.config_file, 'r') as f: config = yaml.safe_load(f) # Convert to numpy array. 转换为 numpy 数组 extristric = np.array(config['extristric']) corners = np.array(config['corners']).reshape(-1, 3) self.white_area_center = np.array(config['white_area_pose_world']) while True: intrinsic = self.intrinsic distortion = self.distortion if intrinsic is not None and distortion is not None: break time.sleep(0.1) tvec = extristric[:1] # Take first row. 取第一行 rmat = extristric[1:] # Take next three rows. 取后面三行 tvec, rmat = common.extristric_plane_shift(np.array(tvec).reshape((3, 1)), np.array(rmat), 0.03) self.extristric = tvec, rmat tvec, rmat = common.extristric_plane_shift(np.array(tvec).reshape((3, 1)), np.array(rmat), 0.04) imgpts, jac = cv2.projectPoints(corners[:-1], np.array(rmat), np.array(tvec), intrinsic, distortion) imgpts = np.int32(imgpts).reshape(-1, 2) # Crop RIO region. 裁切出ROI区域 x_min = min(imgpts, key=lambda p: p[0])[0] # The minimum value of X-axis. x轴最小值 x_max = max(imgpts, key=lambda p: p[0])[0] # The maximum value of X-axis. x轴最大值 y_min = min(imgpts, key=lambda p: p[1])[1] # The minimum value of Y-axis. y轴最小值 y_max = max(imgpts, key=lambda p: p[1])[1] # The maximum value of Y-axis. y轴最大值 roi = np.maximum(np.array([y_min, y_max, x_min, x_max]), 0) self.roi = roi ``` ① Load the configuration file and extract extrinsic parameters, corner coordinates, and other relevant information. ② Wait until the camera intrinsic parameters and distortion coefficients are fully loaded. ③ Adjust the extrinsic matrix by applying translation and rotation transformations. ④ Project the 3D corner points onto the image plane. ⑤ Calculate the boundaries of the Region of Interest (ROI) based on the projected points' coordinates. ⑥ Save the computed ROI boundaries to `self.roi`. (10) enter_srv_callback Method {lineno-start=202} ```python def enter_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "enter object sorting") self._init_parameters() self.heart = Heart(self, '~/heartbeat', 5, lambda _: self.exit_srv_callback(request=Trigger.Request(), response=Trigger.Response())) # 心跳包(heartbeat package) for k, v in self.target_labels.items(): self.target_labels[k] = False self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) self.camera_info_sub = self.create_subscription(CameraInfo, '/depth_cam/rgb/camera_info', self.camera_info_callback, 1) self.start_get_roi = True joint_angle = [500, 520, 210, 50, 500] set_servo_position(self.joints_pub, 1, ((1, 500), (2, joint_angle[1]), (3, joint_angle[2]), (4, joint_angle[3]), (5, 500), (10, 200))) self.enter = True response.success = True response.message = "start" return response ``` Service callback for entering sorting mode: ① Initialize parameters. ② Create subscribers for image and camera information. ③ Set joint angles to prepare for sorting. (11) exit_srv_callback Method {lineno-start=221} ```python def exit_srv_callback(self, request, response): if self.enter: self.get_logger().info('\033[1;32m%s\033[0m' % "exit object sorting") if self.image_sub is not None: self.destroy_subscription(self.image_sub) self.image_sub = None if self.camera_info_sub is not None: self.destroy_subscription(self.camera_info_sub) self.camera_info_sub = None self.heart.destroy() self.heart = None self.enter = False self.start_transport = False pick_and_place.interrupt(True) response.success = True response.message = "start" return response ``` Service callback for exiting sorting mode: ① Destroy subscribers. ② Stop the callback mechanisms. ③ Return the robot arm to the home position. (12) enable_sorting_srv_callback Method {lineno-start=240} ```python def enable_sorting_srv_callback(self, request, response): if request.data: self.get_logger().info('\033[1;32m%s\033[0m' % 'start object sorting') pick_and_place.interrupt(False) self.enable_sorting = True else: self.get_logger().info('\033[1;32m%s\033[0m' % 'stop object sorting') pick_and_place.interrupt(True) self.enable_sorting = False response.success = True response.message = "start" return response ``` ① Determine whether to enable or disable the object sorting feature based on the value of `request.data`. ② Log the current status of the sorting feature. ③ Call the `pick_and_place.interrupt()` method to handle interrupt operations. ④ Set the `self.enable_sorting` attribute to reflect the updated feature status. ⑤ Return the response object to inform the client that the request has been successfully processed. (13) set_target_srv_callback Method {lineno-start=254} ```python def set_target_srv_callback(self, request, response): self.get_logger().info('\033[1;32mset target %s %s\033[0m' % (str(request.data_str), str(request.data_bool))) if request.data_str in self.target_labels: self.target_labels[request.data_str] = request.data_bool response.success = True response.message = "start" return response ``` ① Determine whether to enable or disable the object sorting feature based on the value of `request.data`. ② Log the current status of the sorting feature. ③ Call the `pick_and_place.interrupt()` method to handle interrupt operations. ④ Set the `self.enable_sorting `attribute to reflect the updated feature status. ⑤ Return the response object to inform the client that the request has been successfully processed. (14) get_object_pixel_position Method {lineno-start=263} ```python def get_object_pixel_position(self, bgr_image, roi): target_info = [] draw_image = bgr_image.copy() roi_img = bgr_image[roi[0]:roi[1], roi[2]:roi[3]] roi_img = self.image_process.get_top_surface(roi_img) # cv2.imshow('roi_img', roi_img) image_lab = cv2.cvtColor(roi_img, cv2.COLOR_BGR2LAB) # Convert to LAB space. 转换到 LAB 空间 for i in ['red', 'green', 'blue']: index = 0 mask = cv2.inRange(image_lab, tuple(self.lab_data['color_range_list'][i]['min']), tuple(self.lab_data['color_range_list'][i]['max'])) # Binaryzation 二值化 # Smooth edges, remove small blocks, and merge adjacent blocks. 平滑边缘,去除小块,合并靠近的块 eroded = cv2.erode(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))) dilated = cv2.dilate(eroded, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))) contours = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2] # Find all contours. 找出所有轮廓 contours_area = map(lambda c: (math.fabs(cv2.contourArea(c)), c), contours) # Calculate contour area. 计算轮廓面积 contours = map(lambda a_c: a_c[1], filter(lambda a: self.min_area <= a[0] <= self.max_area, contours_area)) for c in contours: rect = cv2.minAreaRect(c) # Obtain the minimum bounding rectangle. 获取最小外接矩形 (center_x, center_y), _ = cv2.minEnclosingCircle(c) center_x, center_y = roi[2] + center_x, roi[0] + center_y # cv2.circle(draw_image, (int(center_x), int(center_y)), 8, (0, 0, 0), -1) corners = list(map(lambda p: (roi[2] + p[0], roi[0] + p[1]), cv2.boxPoints(rect))) # Obtain the four corner points of the minimum rectangle and convert to the coordinates of the original image. 获取最小外接矩形的四个角点, 转换回原始图的坐标 cv2.drawContours(draw_image, [np.intp(corners)], -1, (0, 255, 255), 2, cv2.LINE_AA) # Draw rectangle contour. 绘制矩形轮廓 # cv2.line(draw_image, (int(center_x), 0), (int(center_x), 480), (255, 255, 0), 2, cv2.LINE_AA) index += 1 # Incremental numbering 序号递增 angle = int(round(rect[2])) # Rectangle angle 矩形角度 target_info.append([i, index, (int(center_x), int(center_y)), (int(rect[1][0]), int(rect[1][1])), angle]) return draw_image, target_info ``` ① `target_info`: Stores information about the detected target objects. ② `draw_image`: A copy of the input image used for drawing detection results. ③ `roi_img`: The Region of Interest (ROI) cropped from the input image. ④ `self.image_process.get_top_surface(roi_img)`: Processes the ROI image, likely to extract the top surface or enhance image quality. ⑤ `image_lab = cv2.cvtColor(roi_img, cv2.COLOR_BGR2LAB)`: Converts the ROI image from BGR color space to LAB color space. ⑥ `for i in ['red', 'green', 'blue']`: Iterates through a predefined list of target colors. ⑦ `mask = cv2.inRange()`: Creates a binary mask based on the color range. ⑧ `self.lab_data['color_range_list'][i]`: Retrieves the LAB color range (min and max) for color i from the configuration data. ⑨ `cv2.inRange`: Sets pixels within the specified color range to white (255), others to black (0). ⑩ `eroded = cv2.erode(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))):`Performs erosion on a binary image using a 3×3 rectangular kernel to shrink foreground regions and remove small noise. ⑪ `dilated = cv2.dilate(eroded, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)))`: Applies erosion and dilation to remove noise and small artifacts, and to merge nearby regions. ⑫ `contours = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]`: Detects contours in the processed binary image. ⑬ `cv2.findContours`: Returns a list of contours found in the image. ⑭ `cv2.RETR_EXTERNAL`: Only detects external contours. ⑮ `cv2.CHAIN_APPROX_NONE`: No contour approximation; retains all contour points. ⑯ `contours_area = map(lambda c: (math.fabs(cv2.contourArea(c)), c), contours)`: Calculates the area for each contour. ⑰ `contours = map(lambda a_c: a_c[1], filter(lambda a: self.min_area <= a[0] <= self.max_area, contours_area))`: Filters contours by area, keeping only those within the specified [min_area, max_area] range. ⑱ `Minimum bounding rectangle`: Use cv2.minAreaRect to obtain the minimum bounding rectangle of the contour. ⑲ `Center point`: Use cv2.minEnclosingCircle to find the contour's center point and convert its coordinates back to the original image coordinates. ⑳ `Corner points`: Use cv2.boxPoints to get the four corner points of the minimum bounding rectangle and convert them back to the original image coordinates. ㉑ `Draw contour`: Draw the rectangular contour on draw_image. ㉒ `Store target information`: Save the target's color, index, center coordinates, rectangle size, and angle information into the target_info list. (15) get_object_world_position Method {lineno-start=294} ```python def get_object_world_position(self, position, intrinsic, extristric, white_area_center, height=0.03): projection_matrix = np.row_stack((np.column_stack((extristric[1], extristric[0])), np.array([[0, 0, 0, 1]]))) world_pose = common.pixels_to_world([position], intrinsic, projection_matrix)[0] world_pose[0] = -world_pose[0] world_pose[1] = -world_pose[1] position = white_area_center[:3, 3] + world_pose position[2] = height config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['pixel']['offset']) scale = tuple(config_data['pixel']['scale']) for i in range(3): position[i] = position[i] * scale[i] position[i] = position[i] + offset[i] return position, projection_matrix ``` ① `projection_matrix = np.row_stack((np.column_stack((extristric[1], extristric[0])), np.array([[0, 0, 0, 1]])))`: Constructs a full projection matrix used for transforming pixel coordinates to world coordinates. ② `world_pose = common.pixels_to_world([position], intrinsic, projection_matrix)[0]`: Converts pixel coordinates to world coordinates. ③ `world_pose[0] = -world_pose[0]`: Negates the X-coordinate of the world position. ④ `world_pose[1] = -world_pose[1]`: Negates the Y-coordinate of the world position. ⑤ `white_area_center[:3, 3]`: Extracts the translation vector (3D position) representing the center of the white area. ⑥ `position = white_area_center[:3, 3] + world_pose`: Adds the relative world position of the target to the center position of the white area to obtain the absolute position of the target. ⑦ `position[2] = height`: Sets the Z-coordinate (height) of the target to a specified value. ⑧ `common.get_yaml_data`: Loads calibration data from a configuration file. ⑨ `offset and scale`: Represent the offset and scaling factors used to calibrate the target's position. ⑩ `position[i] = position[i] * scale[i] + offset[i]`: Applies scaling and offset to each coordinate component to obtain the calibrated position. ⑪ `position`: The final 3D position of the target object in the world coordinate system. ⑫ `projection_matrix`: The projection matrix, which may be used for further computation or verification. (16) calculate_pick_grasp_yaw Method {lineno-start=310} ```python def calculate_pick_grasp_yaw(self, position, target, target_info, intrinsic, projection_matrix): yaw = math.degrees(math.atan2(position[1], position[0])) if position[0] < 0 and position[1] < 0: yaw = yaw + 180 elif position[0] < 0 and position[1] > 0: yaw = yaw - 180 # 0.09x0.015 gripper_size = [common.calculate_pixel_length(0.09, intrinsic, projection_matrix), common.calculate_pixel_length(0.015, intrinsic, projection_matrix)] return calculate_grasp_yaw.calculate_gripper_yaw_angle(target, target_info, gripper_size, yaw) ``` ① `yaw = math.degrees(math.atan2(position[1], position[0]))`: Calculates the initial yaw angle of the target object relative to the origin of the world coordinate system. ② `if position[0] < 0 and position[1] < 0: yaw = yaw + 180`: If the target is in the third quadrant (x < 0 and y < 0), add 180° to the yaw angle. ③ `elif position[0] < 0 and position[1] > 0`: yaw = yaw - 180: If the target is in the second quadrant (x < 0 and y > 0), subtract 180° from the yaw angle. ④ `gripper_size = [common.calculate_pixel_length(0.09, intrinsic, projection_matrix), common.calculate_pixel_length(0.015, intrinsic, projection_matrix)]`: Calculates the size of the gripper in pixel units based on its physical dimensions (0.09m and 0.015m), using the camera's intrinsic parameters and the projection matrix. This information is used to determine an appropriate grasping orientation. ⑤ `return` :calculate_grasp_yaw.calculate_gripper_yaw_angle(target, target_info, gripper_size, yaw): Calls the calculate_gripper_yaw_angle function with the target object, detected target info, gripper size, and initial yaw angle to compute and return the final grasping yaw angle. (17) transport_thread Method {lineno-start=341} ```python def transport_thread(self): while self.running: if self.start_transport: position, yaw, target = self.transport_info if position[0] > 0.22: position[2] += 0.01 config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['kinematics']['offset']) scale = tuple(config_data['kinematics']['scale']) for i in range(3): position[i] = position[i] * scale[i] position[i] = position[i] + offset[i] # self.get_logger().info(f'pick2:{position}') finish = pick_and_place.pick(position, 80, yaw, 540, 0.02, self.joints_pub, self.kinematics_client) if finish: position = copy.deepcopy(self.place_position[target[0]]) yaw = self.calculate_place_grasp_yaw(position, 0) config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file)) offset = tuple(config_data['kinematics']['offset']) scale = tuple(config_data['kinematics']['scale']) angle = math.degrees(math.atan2(position[1], position[0])) if angle > 45: # self.get_logger().info(f'1:{position}') position = [position[0] * scale[1], position[1] * scale[0], position[2] * scale[2]] position = [position[0] - offset[1], position[1] + offset[0], position[2] + offset[2]] elif angle < -45: # self.get_logger().info(f'2:{position}') position = [position[0] * scale[1], position[1] * scale[0], position[2] * scale[2]] position = [position[0] + offset[1], position[1] - offset[0], position[2] + offset[2]] else: # self.get_logger().info(f'3:{position}') position = [position[0] * scale[0], position[1] * scale[1], position[2] * scale[2]] position = [position[0] + offset[0], position[1] + offset[1], position[2] + offset[2]] # self.get_logger().info(f'{position}') finish = pick_and_place.place(position, 80, yaw, 200, self.joints_pub, self.kinematics_client) if finish: self.go_home(False) else: self.go_home(True) else: self.go_home(True) self.target = None self.start_transport = False else: time.sleep(0.1) ``` ① `self.running`: A boolean flag that controls the thread's execution state. If set to True, the thread continues running; if False, the thread exits. ② `self.start_transport`: A boolean flag indicating whether to initiate the pick-and-place operation. ③ `self.transport_info`: A tuple containing task-related information, including the target object's position (position), yaw angle (yaw), and identifier (target). ④ `config_data = common.get_yaml_data(os.path.join(self.config_path, self.calibration_file))`: Loads calibration data (such as offset and scaling factors) from a YAML configuration file. ⑤ `offset = tuple(config_data['kinematics']['offset']) and scale = tuple(config_data['kinematics']['scale'])`: Extracts the offset and scaling factor values used to calibrate the robot arm's position by adjusting each coordinate component. ⑥ `finish = pick_and_place.pick(position, 80, yaw, 540, 0.02, self.joints_pub, self.kinematics_client)`: Calls the pick_and_place.pick function to perform the pick operation using the specified position, yaw angle, and control parameters. ⑦ `position = copy.deepcopy(self.place_position[target[0]])`: Retrieves the designated placement position for the object based on its identifier (target[0]) from the self.place_position dictionary and creates a deep copy to avoid modifying the original. ⑧ `yaw = self.calculate_place_grasp_yaw(position, 0)`: Calls the calculate_place_grasp_yaw method to compute the yaw angle for the object during the placement operation. ⑨ Adjust Placement Position Based on Object Angle Depending on the detected object's rotation angle (angle), adjust the placement coordinates: * If the angle is greater than 45°, apply specific scaling and offset to the X and Y coordinates. * If the angle is less than -45°, apply another set of scaling and offset to the X and Y coordinates. * Otherwise, apply the default scaling and offset directly. ⑩ Execute Placement Operation finish = pick_and_place.place(position, 80, yaw, self.joints_pub, self.kinematics_client) Calls the pick_and_place.place function to perform the placement at the given position with a yaw angle of 80°, using the joint publisher and kinematics client. ⑪ Check Placement Result * If finish is True, the placement was successful. Call self.go_home(False) to move the robotic arm back to its home position. * If the placement failed, call self.go_home(True) to interrupt the operation and return to the home position. ⑫ Reset Task State self.target = None self.start_transport = False Resets the task state, preparing the system for the next pick-and-place cycle. (18) Main Method Within the Class {lineno-start=390} ```python def main(self): while self.running: if self.enter: try: bgr_image = self.image_queue.get(block=True, timeout=1) except queue.Empty: continue if self.start_get_roi: self.get_roi() self.start_get_roi = False roi = self.roi.copy() intrinsic = self.intrinsic if len(roi) > 0 and self.enable_sorting and not self.start_transport: display_image, target_info = self.get_object_pixel_position(bgr_image, roi) tags = self.at_detector.detect(cv2.cvtColor(bgr_image, cv2.COLOR_RGB2GRAY), True, (intrinsic[0,0], intrinsic[1,1], intrinsic[0,2], intrinsic[1,2]), self.tag_size) if len(tags) > 0: index = 0 for tag in tags: if 'tag%d'%tag.tag_id in self.target_labels: corners = tag.corners.astype(int) cv2.drawContours(display_image, [corners], -1, (0, 255, 255), 2, cv2.LINE_AA) rect = cv2.minAreaRect(np.array(tag.corners).astype(np.float32)) # rect includes (center point, (width, height), rotation Angle). rect 包含 (中心点, (宽度, 高度), 旋转角度) (center, (width, height), angle) = rect index += 1 target_info.append(['tag%d'%tag.tag_id, index, (int(center[0]), int(center[1])), (int(width), int(height)), angle]) if target_info: if self.last_object_info_list: # Reorder by comparing the positions of the objects from the last time. 对比上一次的物体的位置来重新排序 target_info = position_change_detect.position_reorder(target_info, self.last_object_info_list, 20) self.last_object_info_list = copy.deepcopy(target_info) for target in target_info: cv2.putText(display_image, '{}'.format(target[0]),(target[2][0] - 4 * len(target[0] + str(target[1])), target[2][1] + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) target_miss = True for target in target_info: # detect if self.target_labels[target[0]]: # app set if self.target is not None: # If there is already a target, skip the other objects directly. 如果已经有了目标,其他物体就直接跳过 if self.target[0] != target[0] or self.target[1] != target[1]: continue else: target_miss = False self.target = target if self.camera_type == 'USB_CAM': x, y = distortion_inverse_map.undistorted_to_distorted_pixel(target[2][0], target[2][1], self.intrinsic, self.distortion) target[2] = (x, y) position, projection_matrix = self.get_object_world_position(target[2], intrinsic, self.extristric, self.white_area_center) result = self.calculate_pick_grasp_yaw(position, target, target_info, intrinsic, projection_matrix) if result is not None and self.target is None: self.target = target break if self.last_position is not None and self.target is not None and result is not None: e_distance = round(math.sqrt(pow(self.last_position[0] - position[0], 2)) + math.sqrt( pow(self.last_position[1] - position[1], 2)), 5) # self.get_logger().info(f'e_distance: {e_distance}') if e_distance <= 0.005: # The Euclidean distance is less than 2mm to prevent the object from being picked up while it is still moving. 欧式距离小于2mm, 防止物体还在移动时就去夹取了 cv2.line(display_image, result[1][0], result[1][1], (255, 255, 0), 2, cv2.LINE_AA) self.count_move = 0 self.count_still += 1 else: self.count_move += 1 self.count_still = 0 if self.count_move > 10: self.target = None if self.count_still > 10: self.count_still = 0 self.count_move = 0 # self.get_logger().info(f'pick:{position}') self.target = target yaw = 500 + int(result[0] / 240 * 1000) self.transport_info = [position, yaw, target] self.start_transport = True self.last_position = position if target_miss: self.target_miss_count += 1 if self.target_miss_count > 10: self.target_miss_count = 0 self.target = None else: display_image = bgr_image.copy() # self.fps.update() # display_image = self.fps.show_fps(display_image) if bgr_image is not None and self.get_parameter('display').value: cv2.imshow('result_image', display_image) cv2.waitKey(1) self.result_publisher.publish(self.bridge.cv2_to_imgmsg(display_image, "bgr8")) else: time.sleep(0.1) def camera_info_callback(self, msg): self.intrinsic = np.matrix(msg.k).reshape(1, -1, 3) self.distortion = np.array(msg.d) def image_callback(self, ros_rgb_image): # Convert the image from ros format to opencv format. 将ros格式图像转换为opencv格式 cv_image = self.bridge.imgmsg_to_cv2(ros_rgb_image, "bgr8") bgr_image = np.array(cv_image, dtype=np.uint8) if self.image_queue.full(): # Discard the oldest image if the queue is full. 如果队列已满,丢弃最旧的图像 self.image_queue.get() # Put image into the queue. 将图像放入队列 self.image_queue.put((bgr_image)) ``` ① Retrieve Image from Queue `bgr_image = self.image_queue.get(block=True, timeout=1)`:Retrieves an image from the self.image_queue. If the queue is empty, it waits up to 1 second. If still empty, the loop iteration is skipped. ② ROI Extraction `if self.start_get_roi:`If self.start_get_roi is True, the method self.get_roi() is called to acquire the Region of Interest (ROI), which is then stored in self.roi. ③ Copy ROI `roi = self.roi.copy()`:Copies the ROI to a local variable roi for further processing. ④ Object Detection Trigger `if len(roi) > 0 and self.enable_sorting and not self.start_transport:`If the ROI is not empty, sorting is enabled, and object transport has not started, it calls self.get_object_pixel_position() with the current image and ROI. This returns the detected object's pixel coordinates (target_info) and the annotated display image (display_image). ⑤ AprilTag Detection `self.at_detector.detect()`:Converts the BGR image to grayscale and uses the AprilTag detector to identify tags, given the intrinsic camera parameters and tag size (self.tag_size). The results, stored in tags, include tag IDs and corner points. ⑥ Filter Tags by ID `for tag in tags:`Iterates over all detected tags and checks if the tag ID is within the list of target labels (self.target_labels). ⑦ Bounding Box Calculation `cv2.minAreaRect()`:For valid tags, computes the minimum bounding rectangle (rect) and extracts the center, width, height, and angle. ⑧ Store Tag Info `target_info.append()`:Adds the tag's positional and orientation information to target_info. ⑨ Reorder by Previous State `target_info = position_change_detect.position_reorder()`:If previous target information (self.last_object_info_list) exists, reorders the current target_info based on prior data for continuity. ⑩ Update Previous Info `self.last_object_info_list = copy.deepcopy(target_info)`:Updates the previous target info list with the latest data. ⑪ Annotate Tags `for target in target_info:`Iterates over detected targets and annotates each target's ID near its center point on the image. ⑫ Validate Target Label `if self.target_labels[target[0]]:`Checks if the tag ID is within the application's set of valid targets. ⑬ Skip If Target Locked `if self.target is not None:`Skips further processing if a target is already selected. ⑭ Update Target `if self.target[0] != target[0] or self.target[1] != target[1]:`Updates the current target if the ID or position differs from the existing one. ⑮ Camera Type Adjustment `if self.camera_type == 'USB_CAM':`If using a USB camera, applies distortion correction to the target's pixel coordinates. ⑯ World Coordinate Conversion `Calls self.get_object_world_position()`: to convert the corrected pixel coordinates to world coordinates. ⑰ Grasp Pose Calculation `Calls self.calculate_pick_grasp_yaw() `:to determine the grasp angle and target position for picking. ⑱ Distance Estimation `e_distance = round(...)`:Calculates the Euclidean distance between the current and previous target positions. ⑲ Target Loss Tracking `if target_miss:`Increments the loss counter if the target is temporarily lost. ⑳ Reset on Consecutive Loss `if self.target_miss_count > 10:`Resets target info if the loss count exceeds 10 consecutive instances. ㉑ Image Display `if bgr_image is not None and self.get_parameter('display').value:`Displays the image if it exists and the display parameter is enabled. ㉒ Publish Processed Image `self.result_publisher.publish(self.bridge.cv2_to_imgmsg(display_image, "bgr8"))`:Converts the processed image back to a ROS Image message and publishes it. (19) Main Function ```python def main(): node = ObjectSortingNode('object_sorting') executor = MultiThreadedExecutor() executor.add_node(node) try: executor.spin() except KeyboardInterrupt: node.running = False # Stop thread flag. 停止线程标志 executor.shutdown() ``` {lineno-start=501} ① `node = ColorSortingNode('color_sorting')`: Create an instance of ColorSortingNode with the name color_sorting. ② `executor = MultiThreadedExecutor()`: Create a multi-threaded executor to handle multiple nodes in parallel. ③ `executor.add_node(node)`: Add the ColorSortingNode instance to the executor. ④ `executor.spin()`: Start the executor to run and listen for node callbacks. ⑤ `node.destroy_node()`: Destroy the node and release resources when the program exits.

### 12.1.11 Change Default Recognition Color We analyze the modification of the default recognition color in the positioning and gripping routine. The modification of default colors for other games follows the same principle. The source code is saved in: [/home/ubuntu/ros2_ws/src/example/example/opencv/include/color_recognition.py](../_static/source_code/opencv.zip) To modify the recognition color of the robotic arm, you can change the parameters inside the red box to "**blue**" or "**green**." ## 12.2 Target Tracking ### 12.2.1 PID Principle * **PID Algorithm Description** PID algorithm is the most widely used autonomous controller which controls a process according to the ratio of errors, Proportional (P), Integral (I), and Derivative (D). With simple logic, it is easy to realize and can be widely applicable. And its control parameters are separate and it is easy to select the parameter. Simplify: And: u(t)——the output of the PID controller; Kp——ratio factor; Ki——integral time constant; Kd——derivative time constant; e(t)——the difference between the given value and the measured value (error) * **PID Algorithm Working Logic** In a closed-loop control system, the output result of the object controlled will be returned to affect the output of the controller so as to generate one or multiple loop. The closed-loop system has positive and negative feedback. If the feedback signal is opposite to the given signal, it is negative feedback. If they are the same, it is positive feedback. (1) Proportional Error is the difference between the given value (the set threshold **r(t)** ) and the measured output **(c(t))**. After the proportional is introduced, the output of the system is proportional to the error, which can reduce the error quickly and stabilize the system. When the system tends to stabilize, steady-state error will still exist and cannot be eliminated. The control effect of proportional depends on the value of Kp. The smaller the proportional coefficient Kp, the weaker the control and the slower the system response. On the contrary, the larger the proportional coefficient Kp, the stronger the control, and the faster the system response. However, too large Kp will cause the system to generate large overshoot and oscillation. The advantage of proportional control lies in its fast response, but the disadvantage is that there is a steady-state error. Take a bucket for example. There is a hole at the bottom of the bucket, but you need to maintain the water in the bucket at 1m. And the current water level is 0.2m, and the water will leak 0.1m whenever you add the water. Here, proportional control can be adopted to control the water volume. The current water volume is 0.2m and the target is 1m. Set Kp=0.4. Then u= 0.4\*e, and e = last water volume - current water volume The first error: e(1)=1-0.2=0.8m. Therefore, the volume of the added water is 0.32m (Kp\*0.8= 0.4\*0.8 = 0.32m), and the current water volume in the bucket is 0.42m (0.2+0.32 -0.1=0.42m) The second error: e(2) = 1-(0.2+0.32) =0.48m. Therefore, the volume of the added water is 0.192m (Kp\*0.48=0.192m), and the current water volume in the bucket is 0.552m (0.52+0.192=0.552) Lastly, the water level stabilizes at 0.75m, and the error is 0.25m. When you add 0.1m water, the volume of the water added is equivalent to the volume of the water leaking. And this kind of error is called steady-state error. Steady-state error is defined as the difference between the output under the system tends to stabilize and the target output. (2) Integral Integral aims at eliminating the steady-state error. The integral of the error over time is continuously accumulated to keep the output changing so as to reduce the error. If the integral time is sufficient, the steady-state error can be completely eliminated. The smaller Ti is, the stronger the function of integral is. But too strong integral function will make the system overshoot and even oscillate. The advantage of the integral is that the steady-state error can be completely eliminated, but the disadvantage is that the response speed is reduced and the overshoot will be too large. In the example , the steady-state error still exists after proportional control is adopted. To eliminate it completely, you can add integral control, and set the integral constant Ki as 0.3 (Ki=0.3). The first error: e(1)=1-0.2=0.8m. Therefore, the volume of the added water is: Kp\*0.8= 0.4\*0.8 = 0.32m, KI \* e(1) = 0.3 \* 0.8 = 0.24, and and the final added water is: 0.32m + 0.256 = 0.56. And the current water volume in the bucket is : 0.2+ 0.276 - 0.1=0.66 The second error: e(2)=1-0.66=0.34m. Therefore, the volume of the added water is: Kp\*0.34 = 0.4\*0.34 = 0.136m, KI \* (e(2)+e(1)) = 0.3 \* 0.342 = 0.342, and the final added water is: 0.136m + 0.342 = 0.478. And the current water volume in the bucket is : 0.66+ 0.478 - 0.1=1.038 After the Integral control is adopted, the steady-state error is completely eliminated, but overshoot occurs. (3) Derivative The slope error can be obtained by differentiating the error, and the slope reflects the change (rate of change) of the error signal, so the error can be adjusted in advance in derivative part. And it takes effect at the moment when the error appears or changes, so as to avoid excessive overshoot and reduce the dynamic error and response time. The larger the derivative coefficient Td, the stronger the ability to suppress the error variation. The advantage of derivative is that it suppress overshoot and speed up the response. The disadvantage is that the anti-interference ability is poor. When the interference signal is strong, it is not recommended to add derivative. In the above example, the integral effectively eliminates the steady-state error, but overshoot occurs. In order to reduce the overshoot, a derivative control is added. Set Kd as 0.3 (Kd=0.3). The first error: e(1)=1-0.2=0.8m. Therefore, the volume of the added water is: Kp\*0.8 = 0.4\*0.8 = 0.32m, KI \* e(1) = 0.3 \* 0.8 = 0.24. And KD = 0 (because the current water difference is 0.8). The final added water is: 0.32m + 0.256 = 0.56. And the current water volume in the bucket is : 0.2+ 0.276 - 0.1=0.66 The second error: e(2)=1-0.66=0.34m. Therefore, the volume of the added water is: Kp\*0.34 = 0.4\*0.34 = 0.136m, KI \* (e(2)+e(1)) = 0.3 \* 0.342 = 0.342. And KD\*(e(2) - e(1)) = 0.3\*(0.342 - 0.8) = -0.138. The final added water is: 0.136m + 0.342 -0.138 = 0.34. And the current water volume in the bucket is : 0.66+ 0.478 - 0.1=0.9 Compared with proportional control, the overshoot is weakened. ### 12.2.2 Color Tracking When the camera identifies the target color block (default is red), the robotic arm moves in sync with the color block's movement. * **Program Logic** Firstly, subscribe to the topic messages published by the color recognition node to acquire information about the recognized color. Next, upon matching the target color, retrieve the central position of the target image. Finally, employ inverse kinematics to calculate the angles necessary to align the center of the screen with the center of the target image. Publish the corresponding topic message, control servo rotation, and ensure the robotic arm follows the movement of the target. * **Operation Steps** (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal. (3) Execute the command below and hit Enter to terminate the auto-start service. ``` sudo systemctl stop jetarm_bringup.service ``` (4) Open a new terminal by clicking ,and enter the command to launch the game program. ``` ros2 launch example color_track_node.launch.py ``` (5) Once the game is initiated, you will see the following message printed in the terminal, and at this point, the camera feed will automatically pop up on the screen. (6) If you need to terminate the program, please use short-cut '**Ctrl+C**'. If the program fails to stop, please retry. (7) After running the previous program, you need to restart the app service by executing the following command. ``` sudo systemctl start start_app_node.service ``` * **Program Outcome** After starting the game, place a red block in front of the robotic arm's camera. The transmitted image will highlight the recognized target color, and the robotic arm will continuously follow the movement of the target block. * **Launch File Analysis** Launch file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/color_track_node.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] display = LaunchConfiguration('display', default='true') display_arg = DeclareLaunchArgument('display', default_value=display) start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) broadcast = LaunchConfiguration('broadcast', default='false') broadcast_arg = DeclareLaunchArgument('broadcast', default_value=broadcast) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) color_track_node = Node( package='app', executable='object_tracking', output='screen', parameters=[{ 'start': start, 'display': display, 'broadcast': broadcast}] ) return [start_arg, display_arg, broadcast_arg, sdk_launch, depth_camera_launch, color_track_node, ] ``` ① `compiled = os.environ['need_compile']`: Get the value of the need_compile environment variable to determine whether the compiled path is required. ② `enable_display = LaunchConfiguration('enable_display', default='true')`: Define the enable_display launch configuration parameter, with a default value of 'true'. ③ `enable_display_arg = DeclareLaunchArgument('enable_display', default_value=enable_display)`: Declare the enable_display launch argument. ④ `start = LaunchConfiguration('start', default='true')`: Define the start launch configuration parameter, with a default value of 'true'. ⑤ `start_arg = DeclareLaunchArgument('start', default_value=start)`: Declare the start launch argument. ⑥ `if compiled == 'True':`: Check if the compiled path configuration is needed. ⑦ `sdk_package_path = get_package_share_directory('sdk')`: Get the shared path of the sdk package (if compilation is required). ⑧ `peripherals_package_path = get_package_share_directory('peripherals')`: Get the shared path of the peripherals package. ⑨ `kinematics_package_path = get_package_share_directory('kinematics')`: Get the shared path of the kinematics package. ⑩ `example_package_path = get_package_share_directory('example')`: Get the shared path of the example package. ⑪ `else:`: If the compiled path configuration is not required. ⑫ Manually define the paths for sdk_package_path, peripherals_package_path, kinematics_package_path, and example_package_path. ⑬ `sdk_launch = IncludeLaunchDescription(...)`: Include the jetarm_sdk.launch.py launch file. ⑭ `kinematics_launch = IncludeLaunchDescription(...)`: Include the kinematics_node.launch.py launch file. ⑮ `color_detect_launch = IncludeLaunchDescription(...)`: Include the color_detect_node.launch.py launch file and pass the enable_display parameter. ⑯ `color_track_node = Node(...)`: Define a color_track node, specifying the package name as example, the executable as color_track, and set the start parameter. ⑰ `return [...]`: Return a list containing all declared parameters, launch files, and nodes. (2) generate_launch_description Function {lineno-start=50} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` Return a `LaunchDescription` object by invoking the `launch_setup` function to generate the configuration. (3) Main Program {lineno-start=55} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① `ld = generate_launch_description()`: Call the generate_launch_description function to generate the launch description object. ② `ls = LaunchService()`: Create a LaunchService object. ③ `ls.include_launch_description(ld)`: Include the launch description into the launch service. ④ `ls.run()`: Run the launch service to load and start all configured nodes and included launch files. * **Python Source Code Analysis** The Python source code is saved in: [/home/ubuntu/ros2_ws/src/app/app/object_tracking.py](../_static/source_code/app.zip) The program flow chart is as below: From the above diagram, the program's logical flow primarily involves processing the acquired image and transmit the live camera feed. (1) Import Feature Pack {lineno-start=4} ```python import os import cv2 import time import queue import rclpy import threading import numpy as np import sdk.fps as fps from math import radians from rclpy.node import Node import sdk.common as common from app.common import Heart from cv_bridge import CvBridge import app.face_tracker as face_tracker import app.color_tracker as color_tracker from std_srvs.srv import Trigger, SetBool from kinematics_msgs.srv import SetRobotPose from sensor_msgs.msg import Image , CameraInfo from interfaces.srv import SetPoint, SetFloat64 from rclpy.executors import MultiThreadedExecutor from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from kinematics.kinematics_control import set_pose_target from servo_controller.bus_servo_control import set_servo_position ``` ① `os`: Handles file paths and environment variables. ② `cv2`: Uses OpenCV for image processing and display. ③ `time`: Provides time-related functions such as delays. ④ `queue`: Manages image queues for thread-safe communication between threads. ⑤ `rclpy`: Imports ROS 2 Python client library for creating and managing ROS nodes. ⑥ `threading`: Provides support for multithreading. ⑦ `numpy (np)`: Handles matrix and vector operations. ⑧ `import sdk.fps as fps`: Used for calculating and displaying frames per second (FPS). ⑨ `from rclpy.node import Node`: Imports the ROS 2 base class for creating nodes. ⑩ `from rclpy.executors import MultiThreadedExecutor`: Imports the multithreaded executor for concurrent callback execution. ⑪ `from rclpy.callback_groups import ReentrantCallbackGroup`: Enables handling callbacks in multithreaded environments. ⑫ `from std_srvs.srv import `Trigger`: Imports the standard ROS 2 service type Trigger, used for triggering events. ⑬ `from kinematics_msgs.srv import SetRobotPose`: Imports a service type for setting robot pose. ⑭ `from kinematics.kinematics_control import set_pose_target`: Imports a function to set the robot's pose target. ⑮ `from interfaces.srv import SetPoint, SetFloat64`: Imports custom service types for setting coordinate points and float values. ⑯ `from servo_controller.bus_servo_control import set_servo_position`: Imports the function for controlling servo positions. ⑰ `from servo_controller_msgs.msg import ServosPosition`: Imports the message type used for servo position control. ⑱ `from sensor_msgs.msg import Image, CameraInfo`: Standard message types used for transmitting image data and camera parameters. ⑲ `import sdk.common as common`: Imports general utility functions and constants from the SDK. ⑳ `import app.common as app_common`: Imports general utility functions and constants from the application layer. ㉑ `import app.face_tracker as face_tracker`: Used for implementing face tracking functionality. ㉒ `import app.color_tracker as color_tracker`: Used for implementing color tracking functionality. ㉓ `from cv_bridge import CvBridge`: Provides conversion between ROS image messages and OpenCV image formats. (2) Initialization Function of the ColorTrackNode Class {lineno-start=31} ```py class ObjectTrackingNode(Node): def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.last_time = 0 self.current_time = 0 self.name = name self.current_pose = None self.tracker = None self.enable_color_tracking = False self.enable_face_tracking = False self.fps = fps.FPS() self.bridge = CvBridge() self.lock = threading.RLock() self.image_queue = queue.Queue(maxsize=2) self.thread_started = True self.thread = None self.x_init = 0.18 self.image_sub = None self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) # Servo control 舵机控制 self.result_publisher = self.create_publisher(Image, '~/image_result', 1) self.enter_srv = self.create_service(Trigger, '~/enter', self.enter_srv_callback) self.exit_srv = self.create_service(Trigger, '~/exit', self.exit_srv_callback) self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() self.client = self.create_client(Trigger, '/kinematics/init_finish') self.client.wait_for_service() ``` ① `rclpy.init()`: Initializes the ROS2 client library. ② `super().init(...)`: Calls the constructor of the parent class to initialize the ROS 2 node, allowing undeclared parameters and enabling automatic declaration from overrides. ③ `self.last_time and self.current_time`: Used for time-based calculations, such as frame rate measurement. ④ `self.name`: Stores the name of the node. ⑤ `self.current_pose and self.tracker`: Manage the robot's current pose and tracking functionality. ⑥ `self.enable_color_tracking and self.enable_face_tracking`: Boolean flags to control whether color and face tracking are enabled. ⑦ `self.fps`: An object or mechanism to calculate frames per second. ⑧ `self.bridge`: A CvBridge instance for converting between ROS image messages and OpenCV images. ⑨ `self.lock`: A thread lock to ensure thread-safe operations. ⑩ `self.image_queue`: A queue (max size: 2) for buffering incoming image data. ⑪ `self.thread`: A thread object for concurrent processing. ⑫ `self.x_init`: Stores initialization parameters. ⑬ `self.joints_pub`: A publisher for sending servo control messages to the /servo_controller topic. ⑭ `self.result_publisher`: A publisher for sending processed images to the ~/image_result topic. ⑮ `self.enter_srv and self.exit_srv`: Service servers for handling entry and exit operations, with enter_srv_callback and exit_srv_callback as their respective handlers. ⑯ `self.client = self.create_client(...)`: Creates multiple ROS 2 service clients for calling external node services. ⑰ Heart: Sets up a heartbeat mechanism that triggers exit_srv_callback every 5 seconds. ⑱ `self.kinematics_client`: A service client for calling the /kinematics/set_pose_target motion planning service. ⑲ `self.enable_detect_srv:` A service to enable or disable color tracking. ⑳ `self.enable_transport_srv`: A service to enable or disable face tracking. ㉑ `self.set_target_color_srv`: A service to set the target color for detection. ㉒ if `self.get_parameter('start').value`: If the start parameter is true, it triggers the default entry callback and enables color tracking. ㉓ if not `self.get_parameter('broadcast').value`: If the broadcast parameter is false, it sets the default target color to red. ㉔ `self.create_service(...)`: Creates a service that responds to an initialization complete request. ㉕ `self.get_logger().info('init_finish')`: Logs a message indicating that initialization has completed. (3) get_node_state Function {lineno-start=83} ```python def get_node_state(self, request, response): response.success = True return response ``` A service callback function used to check the status of the node. (4) goback Function {lineno-start=87} ```python def goback(self): set_servo_position(self.joints_pub, 1.5, ((10, 200), (5, 500), (4,300), (3, 85), (2,730), (1, 500))) time.sleep(1.5) ``` Control the robotic arm to return to the initial position. (5) send_request Function {lineno-start=91} ```python def send_request(self, client, msg): future = client.call_async(msg) while rclpy.ok(): if future.done() and future.result(): return future.result() ``` ① `future = client.call_async(msg)`: Initiates an asynchronous call to a ROS service using the provided request message. ② Continuously checks the status of the service call until a response is received. (6) to_face_tracking_base and to_color_tracking_base Function {lineno-start=96} ```python def to_face_tracking_base(self): with self.lock: thread = threading.Thread(target=self.goback).start() if self.enable_color_tracking: self.enable_color_tracking = False self.tracker = None self.tracker = face_tracker.FaceTracker() self.enable_face_tracking = True self.thread = None self.get_logger().info('\033[1;32m%s\033[0m' % "start color tracking") def to_color_tracking_base(self): with self.lock: thread = threading.Thread(target=self.goback).start() if self.enable_face_tracking: self.enable_face_tracking = False self.tracker = None self.enable_color_tracking = True self.thread = None self.get_logger().info('\033[1;32m%s\033[0m' % "start color tracking") ``` ① `to_face_tracking_base`: Switch to face tracking mode. ② `to_color_tracking_base`: Switch to color tracking mode. (7) enable_color_srv_callback and enable_face_srv_callback Function {lineno-start=117} ```python def enable_color_srv_callback(self, request, response): with self.lock: if request.data: if self.thread is None: self.get_logger().info('\033[1;32m%s\033[0m' % "start color tracking...") self.thread = threading.Thread(target=self.to_color_tracking_base) self.thread.start() response.success = True response.message = "enter" return response else: msg = "Enable Color Tracking, 有其他操作正在进行, 请稍后重试" self.get_logger().info(msg) response.success = True response.message = "enter" return response else: self.get_logger().info('\033[1;32m%s\033[0m' % "start color tracking...") if self.enable_color_tracking: self.enable_color_tracking = False self.tracker = None response.success = True response.message = "enter" return response def enable_face_srv_callback(self, request, response): with self.lock: if self.thread is None: if request.data: self.get_logger().info('\033[1;32m%s\033[0m' % "start face tracking...") self.thread = threading.Thread(target=self.to_face_tracking_base) self.thread.start() else: self.get_logger().info('\033[1;32m%s\033[0m' % "stop face tracking...") if self.enable_face_tracking: self.enable_face_tracking = False self.tracker = None response.success = True response.message = "enter" return response else: msg = "Enable Face Tracking, 有其他操作正在进行, 请稍后重试" self.get_logger().info(msg) response.success = True response.message = "enter" return response ``` ① `enable_face_srv_callback`:Enable or disable face tracking. ② `enable_color_srv_callback`: Enable or disable color tracking. (8) set_target_color_srv_callback Function {lineno-start=164} ```python def set_target_color_srv_callback(self, request, response): with self.lock: if request.data != 0: colors = ["None","red", "green", "blue"] self.get_logger().info("\033[1;32mset target color: " + colors[int(request.data)] + "\033[0m") self.tracker = color_tracker.ColorTracker(colors[int(request.data)]) if self.enable_face_tracking: self.enable_face_tracking = False self.tracker = None else: self.tracker = None response.success = True response.message = "enter" return response ``` ① Set the target color according to `request.data`. ② Update `self.tracker` to the corresponding `ColorTracker `object for the selected color. ③ `Return success=True` to indicate the operation was successful. (9) enter_srv_callback and exit_srv_callback Function {lineno-start=179} ```python def enter_srv_callback(self, request, response): # Get and publish the topic of image. 获取和发布图像的topic self.get_logger().info('\033[1;32m%s\033[0m' % "loading object tracking") self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) # Subscribe to the camera. 摄像头订阅 with self.lock: self.enable_face_tracking = False self.enable_color_tracking = False self.thread_started = False response.success = True response.message = "enter" return response def exit_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "exit object tracking") with self.lock: self.enable_face_tracking = False self.enable_color_tracking = False self.tracker = None try: if self.image_sub is not None: self.destroy_subscription(self.image_sub) self.image_sub = None except Exception as e: self.get_logger().error(str(e)) response.success = True response.message = "exit" return response ``` ① `enter_srv_callback`: Starts the target tracking. ② `exit_srv_callback`: Stops the target tracking. (10) image_callback Function {lineno-start=216} ```python def image_callback(self, ros_image): # Convert the ros format image to opencv format. 将ros格式图像转换为opencv格式 cv_image = self.bridge.imgmsg_to_cv2(ros_image, "rgb8") rgb_image = np.array(cv_image, dtype=np.uint8) with self.lock: if not self.thread_started: # Start the thread only on the first call. 只在第一次调用时启动线程 threading.Thread(target=self.image_processing, args=(ros_image,), daemon=True).start() self.thread_started = True if self.image_queue.full(): # # Discard the oldest image if the queue is full. 如果队列已满,丢弃最旧的图像 self.image_queue.get() # # Put image into the queue. 将图像放入队列 self.image_queue.put(rgb_image) ``` ① `self.bridge.imgmsg_to_cv2(ros_image, "rgb8")`: Use `CvBridge `to convert the ROS image message (`sensor_msgs/msg/Image`) into an OpenCV-compatible image format. ② `np.array(cv_image, dtype=np.uint8)`: Convert the resulting image data into a NumPy array with data type uint8, which is the standard format for OpenCV image processing. ③ Use `self.lock ` (a thread lock) to ensure thread safety during operations. ④ `self.thread_started` is used to mark whether the image processing thread has already been started. ⑤ `daemon=True` indicates that the thread is a daemon thread, meaning it will automatically exit when the main thread terminates. ⑥ `self.image_queue `is a `queue.Queue` object with a maximum capacity of 2 (set during initialization) used to store image data. ⑦ If the queue is full (`self.image_queue.full()`), the oldest image in the queue is removed (`self.image_queue.get()`) to make room. ⑧ The current OpenCV image (`rgb_image`) is then placed into the queue (`self.image_queue.put(rgb_image)`) for subsequent processing. (11) image_processing Function {lineno-start=233} ```python def image_processing(self, ros_image): while True: rgb_image = self.image_queue.get() result_image = np.copy(rgb_image) if self.thread is None and self.tracker is not None: if self.tracker.tracker_type == 'color' and self.enable_color_tracking: result_image, p_y = self.tracker.proc(rgb_image, result_image) if p_y is not None: msg = set_pose_target([0.15, 0, p_y[0]], 0.0, [-180.0, 180.0], 1.0) res = self.send_request(self.kinematics_client, msg) if res.pulse: servo_data = res.pulse set_servo_position(self.joints_pub, 0.02, ( (4, servo_data[3]), (3, servo_data[2]), (2, servo_data[1]), (1, int(p_y[1])) )) elif self.tracker.tracker_type == 'face' and self.enable_face_tracking: result_image, p_y = self.tracker.proc(rgb_image, result_image) if p_y is not None: set_servo_position(self.joints_pub, 0.02, ((1, p_y[1]), (4, p_y[0]))) self.result_publisher.publish(self.bridge.cv2_to_imgmsg(result_image, "rgb8")) if result_image is not None and self.get_parameter('display').value: result_image = cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB) cv2.imshow('result_image', result_image) key = cv2.waitKey(1) ``` ① `self.image_queue.get()`: Retrieve an image from the image queue. ② `np.copy(rgb_image)`: Create a copy of the image to store the processed results. ③ if `self.thread` is None and `self.tracker` is not None: Check whether there is no other thread currently running (`self.thread` is None) and that the tracker has been initialized. ④ `if self.tracker.tracker_type == 'color'` and `self.enable_color_tracking`: If the tracker type is "**color**" and color tracking is enabled, process the image using `self.tracker.proc(rgb_image, result_image)` for color tracking. ⑤ `if res.pulse`: If the kinematics client returns pulse data (res.pulse), call set_servo_position to adjust servo positions according to the returned servo commands. ⑥ Call `self.tracker.proc(rgb_image, result_image)` to perform face tracking on the image. ⑦ Call `set_servo_position` to adjust servo positions based on the detected face position. ⑧ `self.result_publisher.publish`: Publish the processed image to the ~/image_result topic for other nodes to subscribe to. (12) main Function {lineno-start=256} ```python def main(): node = ObjectTrackingNode('object_tracking') executor = MultiThreadedExecutor() executor.add_node(node) executor.spin() node.destroy_node() ``` ① Create the `ObjectTrackingNode `node. ② Create a `MultiThreadedExecutor` for handling multiple threads. ③ Add the node to the executor. ④ Start the event loop by calling `executor.spin()`, waiting for services and subscription messages. ⑤ Upon program exit, destroy the node. ### 12.2.3 Tag Tracking When the camera detects the target tag (default: ID2), the robotic arm will follow the movement of the tag. * **Program Logic** Firstly, subscribe to the topic messages published by the tag tracking node to obtain information about the recognized tag. Then, upon identifying the target tag, retrieve the central position of the tag image. Finally, use inverse kinematics to calculate the angles required to align the center of the screen with the center of the target image. Publish the corresponding topic messages to control servo rotation, enabling the robotic arm to follow the target movement. * **Operation Steps** (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal. (3) Execute the command below and hit Enter to terminate the auto-start service. ``` sudo systemctl stop start_app_node.service ``` (4) Open a new terminal ,and execute the command to run the game program. ``` ros2 launch example tag_track_node.launch.py ``` (5) After initiating the game, the terminal will display the following message, and the live camera feed will start automatically. (6) If you need to terminate the game, press '**Ctrl+C**'. If the program cannot be stopped, please retry. (7) After running the previous program, you need to restart the app service by executing the following command. ``` sudo systemctl start jetarm_bringup.service ``` * **Program Outcome** After starting the mode, place the ID1 tag in front of the robotic arm's camera. The detected tag ID will be displayed in the returned image, and the robotic arm will continuously follow the movement of the ID1 tag. * **Launch File Analysis** The Launch file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/tag_track_node.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=9} ```python def launch_setup(context): compiled = os.environ['need_compile'] start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) tag_track_node = Node( package='example', executable='tag_track', output='screen', parameters=[{'start': start}] ) return [start_arg, sdk_launch, depth_camera_launch, tag_track_node, ] ``` ① `compiled = os.environ['need_compile']`: Retrieve the environment variable `need_compile` to determine whether compilation of related packages is required. ② `start = LaunchConfiguration('start', default='true')`: Define the launch argument `start` with a default value of `'true'`. ③ `start_arg = DeclareLaunchArgument('start', default_value=start)`: Declare the launch argument `start` and set its default value to `'true'`. ④ `if compiled == 'True'`: Check if package compilation is required by evaluating theneed_compile environment variable. ⑤ `sdk_package_path = get_package_share_directory('sdk')`: Get the shared directory path for the `sdk` package. ⑥ `peripherals_package_path = get_package_share_directory('peripherals')`: Get the shared directory path for the `peripherals` package. ⑦ `example_package_path = get_package_share_directory('example')`: Get the shared directory path for the `example` package. ⑧`else`: If compilation is not required, use hardcoded paths to specify the package locations. ⑨ `sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk'`: Specify the path for the `sdk` package. ⑩ `peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals'`: Specify the path for the `peripherals` package. ⑪ `example_package_path = '/home/ubuntu/ros2_ws/src/example'`: Specify the path for the `example` package. ⑫ `sdk_launch = IncludeLaunchDescription(PythonLaunchDescriptionSource(os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')))`: Include and launch the `jetarm_sdk.launch.py` file to start the SDK-related nodes. ⑬ `depth_camera_launch = IncludeLaunchDescription(PythonLaunchDescriptionSource(os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')))`: Include and launch the `depth_camera.launch.py` file to start the depth camera-related nodes. ⑭ `tag_track_node = Node(package='example', executable='tag_track', output='screen', parameters=[{'start': start}])`: Define and launch the `tag_track` node to perform color tag tracking. ⑮ `return [...]`: Return the list of launch items, including the launch arguments, SDK, depth camera, and tag tracking nodes. (2) generate_launch_description Function {lineno-start=46} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① `def generate_launch_description():` Define the generate_launch_description function to generate a launch description object. ② `return LaunchDescription([OpaqueFunction(function = launch_setup)])`: Return a LaunchDescription object, where the launch_setup function is executed through OpaqueFunction, and its result is used as the launch configuration. (3) Main Function {lineno-start=51} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① `if __name__ == '__main__'`: Check if the script is being run as the main program. If true, proceed with the launch process. ② `ld = generate_launch_description()`: Call the `generate_launch_description` function to generate the launch description object `ld`. ③ `ls = LaunchService()`: Create an instance of `LaunchService` named `ls`, which is responsible for executing the launch service. ④ `ls.include_launch_description(ld)`: Include the generated launch description `ld` in the `LaunchService`, ensuring the configurations in the launch file are executed. ⑤ `ls.run()`: Start the ROS2 system and begin executing the nodes and configurations defined in the launch file. * **Python Source Code Analysis** Python source code file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/tag_track_node.py](../_static/source_code/opencv.zip) The program's logical flowchart is illustrated in the diagram obtained by organizing information from the source files. The main logic flow of the program involves processing the captured images and displaying the live feed. (1) Import Feature Pack Import the required module using import statement. {lineno-start=4} ```python import cv2 import time import rclpy import queue import signal import threading import numpy as np import sdk.pid as pid import sdk.fps as fps import sdk.common as common from rclpy.node import Node from cv_bridge import CvBridge from kinematics import transform from dt_apriltags import Detector from std_srvs.srv import Trigger from sensor_msgs.msg import Image from geometry_msgs.msg import Twist from kinematics_msgs.srv import SetRobotPose from rclpy.executors import MultiThreadedExecutor from interfaces.msg import ColorsInfo, ColorDetect from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from interfaces.srv import SetColorDetectParam, SetString from kinematics.kinematics_control import set_pose_target from servo_controller.bus_servo_control import set_servo_position ``` ① `import cv2`: Import the OpenCV library for image processing and computer vision operations (e.g., image reading, conversion). ② `import time`: Import the time module to handle time-related functions such as delays (`sleep`) and time difference calculations. ③ `import rclpy`: Import the ROS2 Python client library to create and manage ROS nodes, services, and messages. ④ `import queue`: Import Python's queue module to create thread-safe queues (e.g., image queues) for multithreaded programming. ⑤ `import signal`: Import the signal module to handle system signals (e.g., termination, external interrupt signals). In this code, it is used for gracefully shutting down the node. ⑥ `import threading`: Import the threading module for multithreaded programming. It is used here to concurrently run the main function, ensuring that the node can track tags in the background. ⑦ `import numpy as np`: Import the NumPy library and alias it as `np`, used for array and matrix operations, especially in image data processing. ⑧ `import sdk.pid as pid`: Import the `pid` module from the SDK package, aliased as `pid`. This module implements the PID control algorithm for controlling the robot's Y and Z axis movement. ⑨ `import sdk.fps as fps`: Import the `fps` module from the SDK package, aliased as `fps`. This module is used for frame rate statistics and calculation, helping control the performance of tag tracking. ⑩ `import sdk.common as common`: Import the `common` module from the SDK package, aliased as `common`. This module contains general utility functions, such as tag drawing and data processing. ⑪ `from rclpy.node import Node`: Import the `Node` class from `rclpy.node` to create ROS2 nodes. The `TagTrackNode` class inherits from `Node` to manage the node's lifecycle, publishing, and subscribing. ⑫ `from cv_bridge import CvBridge`: Import the CvBridge class from the `cv_bridge` package to convert between ROS image messages and OpenCV images. This tool is crucial for image processing and transmission. ⑬ `from kinematics import transform`: Import the `transform` module from the `kinematics` package for robotic kinematic transformations (e.g., coordinate conversion, rotation matrix calculations). ⑭ `from dt_apriltags import Detector`: Import the `Detector` class from the `dt_apriltags` package to detect and decode AprilTag markers. An instance of this class is used to identify tags in the image and extract their position data. ⑮ `from std_srvs.srv import Trigger`: Import the `Trigger` service message type from the `std_srvs.srv` package for handling simple boolean trigger requests. In this code, it is used to start and stop tag tracking services. ⑯ `from sensor_msgs.msg import Image`: Import the `Image` message type from the `sensor_msgs.msg` package for transmitting image data. In this code, it subscribes to image messages from the depth camera. ⑰ `from geometry_msgs.msg import Twist`: Import the `Twist` message type from the `geometry_msgs.msg` package, typically used for controlling robot ⑱ `from kinematics_msgs.srv import SetRobotPose`: Import the service type for setting robot poses. ⑲ `from rclpy.executors import MultiThreadedExecutor`: Import the ROS 2 multithreaded executor. ⑳ `from interfaces.msg import ColorsInfo, ColorDetect`: Import message types related to color information and detection. ㉑ `from servo_controller_msgs.msg import ServosPosition`: Import the message type for controlling servo positions. ㉒ `from rclpy.callback_groups import ReentrantCallbackGroup`: Import the callback group to support handling callbacks in a multithreaded environment. ㉓ `from interfaces.srv import SetColorDetectParam, SetString`: Import service types for setting color detection parameters and strings. ㉔ `from kinematics.kinematics_control import set_pose_target`: Import the function for setting pose targets. ㉕ `from servo_controller.bus_servo_control import set_servo_position`: Import the function for controlling servo positions. (2) Initialization Function of the ColorTrackNode Class {lineno-start=30} ```python class TagTrackNode(Node): def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.z_dis = 0.28 self.y_dis = 500 self.x_init = 0.18 self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.center = None self.running = True self.start = False self.name = name self.image_queue = queue.Queue(maxsize=2) self.pid_z = pid.PID(0.00008, 0.0, 0.0) self.pid_y = pid.PID(0.092, 0.00, 0.00000005) # self.pid_y = pid.PID(0.13, 0.0, 0.0001) self.tag_id = 1 # Set tracking tag 设置追踪标签 self.fps = fps.FPS() # frame rate counter 帧率统计器 self.at_detector = Detector(searchpath=['apriltags'], families='tag36h11', nthreads=8, quad_decimate=2.0, quad_sigma=0.0, refine_edges=1, decode_sharpening=0.25, debug=0) self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) # Servo control 舵机控制 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) # Subscribe to the camera. 摄像头订阅 timer_cb_group = ReentrantCallbackGroup() self.create_service(Trigger, '~/start', self.start_srv_callback) # Enter the feature 进入玩法 self.create_service(Trigger, '~/stop', self.stop_srv_callback, callback_group=timer_cb_group) # Exit the feature 退出玩法 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() self.client = self.create_client(Trigger, '/kinematics/init_finish') self.client.wait_for_service() self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.kinematics_client.wait_for_service() self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) ``` ① `rclpy.init()`: Initializes the ROS 2 client library. ② `super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True)`: Calls the parent class constructor to initialize the ROS 2 node with specified parameters, enabling undeclared parameter overrides. ③ `self.z_dis = 0.28`: Initialize the Z-axis distance to 0.28. ④ `self.y_dis = 500`: Initialize the Y-axis distance to 500. ⑤ `self.x_init = 0.18`: Initialize the X-axis initial position to 0.18. ⑥ `self.bridge = CvBridge()`: Initialize the CvBridge to convert between ROS image messages and OpenCV images. ⑦ `self.center = None`: Initialize the center of color detection as None. ⑧ `self.running = True`: Set the node's running state to True. ⑨ `self.start = False`: Set the start state to False. ⑩ `self.image_queue = queue.Queue(maxsize=2)`: Initialize the image queue with a maximum size of 2. ⑪` self.pid_z = pid.PID(0.00008, 0.0, 0.0)`: Initialize the PID controller for the Z-axis. ⑫ `self.pid_y = pid.PID(0.092, 0.00, 0.00000005)`: Initialize the PID controller for the Y-axis. ⑬ `self.tag_id = 1`: Set the ID of the tag to track as 1. ⑭ `self.fps = fps.FPS()`: Initialize the frame rate statistics tracker. ⑮ `self.at_detector = Detector(...)`: Initialize the AprilTag detector with the necessary configurations. ⑯ `self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1)`: Create a publisher for servo control messages. ⑰ `self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1)`: Create an image subscriber to receive camera data. ⑱ `self.create_service(Trigger, '~/start', self.start_srv_callback)`: Create a service to handle the start request for tag tracking. ⑲ `self.create_service(Trigger, '~/stop', self.stop_srv_callback)`: Create a service to handle the stop request for tag tracking. ⑳ `self.client = self.create_client(Trigger, '/controller_manager/init_finish')`: Create and wait for the /controller_manager/init_finish service to finish initialization. ㉑ `self.client = self.create_client(Trigger, '/kinematics/init_finish')`: Create and wait for the /kinematics/init_finish service to finish initialization. ㉒ `self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target')`: Create and wait for the /kinematics/set_pose_target\` service to set the robot's pose. ㉓ `self.timer = self.create_timer(0.0, self.init_process)`: Create a timer that triggers the init_process method immediately. (3) init_process Function {lineno-start=77} ```python def init_process(self): self.timer.cancel() self.init_action() if self.get_parameter('start').value: self.start_srv_callback(Trigger.Request(), Trigger.Response()) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` ① `self.timer.cancel()`: Cancels the initialization timer to stop scheduled tasks. ② `self.init_action()`: Executes the initialization actions required to prepare the node. ③ Conditional check for the start parameter: If the node's start parameter is True, calls services to initiate operations and configure color detection. ④ `threading.Thread(target=self.main, daemon=True).start()`: Starts a background thread to execute the main task, such as color tracking. ⑤ `self.create_service(...)`: Creates a service to handle requests for initialization completion. ⑥ `self.get_logger().info('start')`: Logs an informational message indicating that the node has started. (4) get_node_state Function {lineno-start=88} ```python def get_node_state(self, request, response): response.success = True return response ``` ① `response.success = True`: Set the response to success. ② `return response`: Return the response, indicating that the node is ready. (5) Shutdown Function {lineno-start=92} ```python def shutdown(self, signum, frame): self.running = False ``` self.running = False: Set the running flag to False to stop the node. (6) init_action Function {lineno-start=95} ```python def init_action(self): msg = set_pose_target([self.x_init, 0.0, self.z_dis], 0.0, [-90.0, 90.0], 1.0) res = self.send_request(self.kinematics_client, msg) if res.pulse: servo_data = res.pulse set_servo_position(self.joints_pub, 1.5, ((10, 500), (5, 500), (4, servo_data[3]), (3, servo_data[2]), (2, servo_data[1]), (1, servo_data[0]))) time.sleep(1.8) ``` ① `msg = set_pose_target(\[self.x_init, 0.0, self.z_dis], 0.0, [-90.0, 90.0], 1.0)`: Create the target pose message, setting the robot's target position and orientation. ② `res = self.send_request(self.kinematics_client, msg)`: Send the request to set the target pose. ③ if `res.pulse`: If the returned pulse data is valid, execute the servo control. ④ `set_servo_position(self.joints_pub, 1.5, ((10, 500), (5, 500), (4, servo_data[3]), (3, servo_data[2]), (2, servo_data[1]), (1, servo_data[0])))`: Set the servo positions. ⑤ `time.sleep(1.8)`: Pause for 1.8 seconds to ensure the servo stabilizes. (7) send_request Function {lineno-start=103} ```python def send_request(self, client, msg): future = client.call_async(msg) while rclpy.ok(): if future.done() and future.result(): return future.result() ``` ① `future = client.call_async(msg)`: Asynchronously call a ROS service. ② while ` rclpy.ok()`: Wait for the service response while the ROS2 system is running normally. ③ `if future.done() `and `future.result()`: Check if the request is completed and retrieve the result. ④ `return future.result()`: Return the result of the request. (8) start_srv_callback and stop_srv_callback Function {lineno-start=110} ```python def start_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "start tag track") self.start = True response.success = True response.message = "start" return response def stop_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "stop tag track") self.start = False res = self.send_request(ColorDetect.Request()) if res.success: self.get_logger().info('\033[1;32m%s\033[0m' % 'set tag success') else: self.get_logger().info('\033[1;32m%s\033[0m' % 'set tag fail') response.success = True response.message = "stop" return response ``` ① `self.get_logger().info("start tag track")`: Log output indicating the start of tag tracking. ② `self.start = True`: Set the tag tracking status to True to indicate it has started. ③ `response.success = True`: Set the response success flag to True. ④ `response.message = "start"`: Set the response message to "**start**". ⑤ `self.get_logger().info("stop tag track")`: Log output indicating the stop of tag tracking. ⑥ `self.start = False`: Set the tag tracking status to False to indicate it has stopped. ⑦ `res = self.send_request(ColorDetect.Request())`: Send a request to stop color detection. ⑧ `if res.success:`: If the stop request is successful, log the result. ⑨ ` response.success = True`: Set the response success flag to True. ⑩ `response.message = "stop"`: Set the response message to "**stop**". ### 12.2.4 KCF Target Tracking Utilizing the KCF algorithm, select the target object by drawing a bounding box with the mouse. When the camera detects the highlighted target object, the system will track its movement and follow it as it moves. * **KCF Introduction** KCF (Kernel Correlation Filter), also known as the Kernelized Correlation Filter algorithm, was proposed by Joao F. Henriques, Rui Caseiro, Pedro Martins, and Jorge Batista in 2014. KCF is a discriminative tracking method that operates by collecting positive and negative samples using a circulant matrix around the target region. It trains a target detector, and subsequently uses this detector to check whether the predicted position in the next frame is the target. Based on the updated detection results, the training set is then updated, iteratively refining the target detector. * **Program Logic** Firstly, subscribe to the topic messages published by the KCF tracking node to obtain information about the recognized KCF-tracked objects. Next, check if the 'S' key on the keyboard is pressed. When 'S' is pressed, use the mouse to draw a bounding box around the region of interest (ROI). Obtain the ROI and calculate the coordinates of its center. Finally, employ inverse kinematics to determine the angles necessary to align the center of the screen with the center of the target image. Publish the corresponding topic message to control servo rotation, enabling the robotic arm to follow the moving target. * **Operation Steps** (1) Start the robot arm, and access the robot system desktop using NoMachine according to the tutorials saved in [1. Getting Ready->1.6 Development Environment Setup and Configuration](1.Getting_Ready.md#development-environment-setup-and-configuration) (2) Double-click to open the command-line terminal. (3) Execute the command below and hit Enter to terminate the auto-start service. ``` sudo systemctl stop start_app_node.service ``` (4) Open a new command-line terminal and run the command below to initiate the game program. ``` ros2 launch example kcf_track_node.launch.py ``` (5) After initializing the game, you will observe the following prompt messages in the terminal, and at this point, the live camera feed will automatically appear. (6) If you need to terminate the game, press `'Ctrl+C'`. If the program cannot be stopped, please retry. (7) After running the previous program, you need to restart the app service by executing the command below. ``` sudo systemctl start jetarm_bringup.service ``` * **Program Outcome** After starting the tracking feature, place the object to be tracked in front of the camera. Press the 'S' key in the video feed window, then use the left mouse button to draw a selection box around the object. Finally, press Enter. The robotic arm will continuously follow the selected object. :::{Note} The object should not move too quickly. If the object is lost, simply press 'S' again and re-select the object. ::: * **Launch File Analysis** Launch file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/kcf_tracking.launch.py](../_static/source_code/opencv.zip) (1) launch_setup Function {lineno-start=10} ```python def launch_setup(context): compiled = os.environ['need_compile'] start = LaunchConfiguration('start', default='true') start_arg = DeclareLaunchArgument('start', default_value=start) if compiled == 'True': sdk_package_path = get_package_share_directory('sdk') peripherals_package_path = get_package_share_directory('peripherals') example_package_path = get_package_share_directory('example') else: sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk' peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals' example_package_path = '/home/ubuntu/ros2_ws/src/example' sdk_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(sdk_package_path, 'launch/jetarm_sdk.launch.py')), ) depth_camera_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource( os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')), ) kcf_track_node = Node( package='example', executable='kcf_track', output='screen', parameters=[{'start': start}] ) return [start_arg, sdk_launch, depth_camera_launch, kcf_track_node, ] ``` ① `compiled = os.environ['need_compile']`: Retrieve the value of the `need_compile` environment variable to determine whether compilation is needed. ② `start = LaunchConfiguration('start', default='true')`: Define a launch argument `start`, with a default value of `'true'`. ③ `start_arg = DeclareLaunchArgument('start', default_value=start)`: Declare the `start` launch argument, allowing for customization during launch. ④ `if compiled == 'True'`: Check the value of `compiled` to decide whether to use the local path or ROS2 package path. ⑤ `sdk_package_path = get_package_share_directory('sdk')` Get the shared directory path of the `sdk` package. ⑥ `peripherals_package_path = get_package_share_directory('peripherals')`: Get the shared directory path of the `peripherals` package. ⑦ `example_package_path = get_package_share_directory('example')`: Get the shared directory path of the `example` package. ⑧ `sdk_launch = IncludeLaunchDescription(...)`: Include the `sdk` package's launch file, with the path `jetarm_sdk.launch.py`. ⑨ `depth_camera_launch = IncludeLaunchDescription(...)`: Include the `peripherals` package's launch file, with the path `depth_camera.launch.py`, to start the depth camera. ⑩ `kcf_track_node = Node(...)`: Create and configure the `kcf_track` node, passing the `start` argument. ⑪ `return [start_arg, sdk_launch, depth_camera_launch, kcf_track_node]`: Return a list of launch configurations that includes the required arguments and nodes to start. (2) generate_launch_description Function {lineno-start=46} ```python def generate_launch_description(): return LaunchDescription([ OpaqueFunction(function = launch_setup) ]) ``` ① `def generate_launch_description():`: Define the generate_launch_description function, which is used to generate the launch description object. ② `return LaunchDescription([OpaqueFunction(function = launch_setup)])`: Return a LaunchDescription object, where the launch_setup function is executed using OpaqueFunction, and its result is included as part of the launch configuration. (3) Main Function {lineno-start=51} ```python if __name__ == '__main__': # Create a LaunchDescription object. 创建一个LaunchDescription对象 ld = generate_launch_description() ls = LaunchService() ls.include_launch_description(ld) ls.run() ``` ① `if __name__ == '__main__'`: Check if the script is being executed as the main program. If so, proceed with the launch process. ② `ld = generate_launch_description()`: Call the `generate_launch_description` function to generate the launch description object `ld`. ③ `ls = LaunchService()`: Create an instance of `LaunchService` named `ls`, which is responsible for managing the launch process. ④ `ls.include_launch_description(ld)`: Include the generated launch description `ld` into the `LaunchService`, ensuring that the configurations in the launch file are executed. ⑤ `ls.run()`: Start the ROS2 system and execute the nodes and configurations defined in the launch file. * **Python Source Code Analysis** The Python source code file is saved in [/home/ubuntu/ros2_ws/src/example/example/opencv/include/kcf_tracking.py](../_static/source_code/opencv.zip) The program flowchart is as pictured below: The main logic flow of the program involves processing the captured images and displaying the live feed. (1) Import Function Package Import the required module using import statement. {lineno-start=4} ```python import cv2 import time import rclpy import queue import signal import threading import numpy as np import sdk.pid as pid import sdk.fps as fps import sdk.common as common from rclpy.node import Node from cv_bridge import CvBridge from kinematics import transform from dt_apriltags import Detector from std_srvs.srv import Trigger from sensor_msgs.msg import Image from geometry_msgs.msg import Twist from kinematics_msgs.srv import SetRobotPose from rclpy.executors import MultiThreadedExecutor from interfaces.msg import ColorsInfo, ColorDetect from servo_controller_msgs.msg import ServosPosition from rclpy.callback_groups import ReentrantCallbackGroup from interfaces.srv import SetColorDetectParam, SetString from kinematics.kinematics_control import set_pose_target from servo_controller.bus_servo_control import set_servo_position ``` ① `import cv2`: Import the OpenCV library for image processing and computer vision tasks. ② `import time`: Import the time module for time-related operations, such as delays. ③ `import rclpy`: Import the ROS2 Python client library to create nodes and handle communication. ④ `import queue`: Import the queue module to manage inter-thread communication via queues. ⑤ `import signal`: Import the signal module to handle system signals (e.g., termination signals). ⑥ `import threading`: Import the threading module for multithreaded operations. ⑦ `import numpy as np`: Import the NumPy library for numerical computations, particularly for processing image data. ⑧ `import sdk.pid as pid`: Import the custom `pid` module for implementing a PID controller. ⑨ `import sdk.fps as fps`: Import the custom `fps` module to calculate and display frame rates. ⑩ `import sdk.common as common`: Import the custom `common` module containing utility functions. ⑪ `from rclpy.node import Node`: Import the `Node` class from `rclpy.node`. All ROS2 nodes must inherit from this class. ⑫ `from cv_bridge import CvBridge`: Import `CvBridge` from `cv_bridge` to convert between ROS messages and OpenCV images. ⑬ `from kinematics import transform`: Import the `transform` function from the `kinematics` module for robotic kinematic transformations. ⑭ `from dt_apriltags import Detector`: Import the `Detector` class from the `dt_apriltags` library for AprilTag detection. ⑮ `from std_srvs.srv import Trigger`: Import the `Trigger` service type from `std_srvs.srv` for simple switch operations. ⑯ `from sensor_msgs.msg import Image`: Import the `Image` message type from `sensor_msgs.msg` to receive image data. ⑰ `from geometry_msgs.msg import Twist`: Import the `Twist` message type to send velocity control commands. ⑱ `from kinematics_msgs.srv import SetRobotPose`: Import the `SetRobotPose` service type for setting the robot's pose. ⑲ `from rclpy.executors import MultiThreadedExecutor`: Import `MultiThreadedExecutor` to allow multiple callbacks to run concurrently. ⑳ `from interfaces.msg import ColorsInfo, ColorDetect`: Import custom message types `ColorsInfo` and `ColorDetect` from `interfaces.msg` for color detection. ㉑ `from servo_controller_msgs.msg import ServosPosition`: Import the custom `ServosPosition` message type from `servo_controller_msgs.msg` to control servos. ㉒ `from rclpy.callback_groups import ReentrantCallbackGroup`: Import `ReentrantCallbackGroup` to allow reentrancy within the same callback group. ㉓ `from interfaces.srv import SetColorDetectParam, SetString`: Import custom service types `SetColorDetectParam` and `SetString` from `interfaces.srv`. ㉔ `from kinematics.kinematics_control import set_pose_target`: Import the `set_pose_target` function from `kinematics.kinematics_control` for setting the robot's target pose. ㉕ `from servo_controller.bus_servo_control import set_servo_position`: Import the `set_servo_position` function from `servo_controller.bus_servo_control` to control the servo position. (2) The Initialization Function of the KCFTrackingNode Class {lineno-start=31} ```python def __init__(self, name): rclpy.init() super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True) self.z_dis = 0.28 self.y_dis = 500 self.x_init = 0.18 self.bridge = CvBridge() # Used for the conversion between ROS Image messages and OpenCV images. 用于ROS Image消息与OpenCV图像之间的转换 self.center = None self.tracker = None self.enable_select = False self.running = True self.start = False self.name = name self.image_queue = queue.Queue(maxsize=2) self.pid_z = pid.PID(0.00006, 0.0, 0.0) self.pid_y = pid.PID(0.05, 0.0, 0.0) self.fps = fps.FPS() # frame rate counter 帧率统计器 self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1) # Servo control 舵机控制 self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1) # subscribe to the camera 摄像头订阅 timer_cb_group = ReentrantCallbackGroup() self.create_service(Trigger, '~/start', self.start_srv_callback) # Enter the feature 进入玩法 self.create_service(Trigger, '~/stop', self.stop_srv_callback, callback_group=timer_cb_group) # Exit the feature 退出玩法 self.client = self.create_client(Trigger, '/controller_manager/init_finish') self.client.wait_for_service() self.client = self.create_client(Trigger, '/kinematics/init_finish') self.client.wait_for_service() self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target') self.kinematics_client.wait_for_service() self.timer = self.create_timer(0.0, self.init_process, callback_group=timer_cb_group) ``` ① `class KcfTrackNode(Node)`: Define a class named KcfTrackNode that inherits from ROS2's Node class to define the behavior of the node. ② `rclpy.init()`: Initialize the ROS2 client library to start the node. ③ `super().__init__(name, ...)`: Call the parent class constructor to initialize the node and declare configuration items. ④ `self.z_dis = 0.28`: Set z_dis (the initial distance between the robot and the object). ⑤ `self.y_dis = 500`: Set y_dis (the Y-coordinate position of the robot). ⑥ `self.x_init = 0.18`: Set x_init (the initial X-coordinate of the robot). ⑦ `self.bridge = CvBridge()`: Create an instance of CvBridge to convert between ROS image messages and OpenCV images. ⑧ `self.center = None`: Initialize center to store the center coordinates of the object. ⑨ `self.tracker = None`: Initialize tracker to track the target. ⑩ `self.enable_select = False`: Initialize enable_select to control whether the target selection feature is enabled. ⑪ `self.running = True`: Set running to True, indicating the node is running. ⑫ `self.start = False`: Set start to False, indicating whether the tracking is started. ⑬ `self.name = name`: Store the node's name. ⑭ `self.image_queue = queue.Queue(maxsize=2)`: Create a queue to store image data with a maximum size of 2. ⑮ `self.pid_z = pid.PID(0.00006, 0.0, 0.0)`: Create a PID controller pid_z to control the Z-axis. ⑯ `self.pid_y = pid.PID(0.05, 0.0, 0.0)`: Create a PID controller pid_y to control the Y-axis. ⑰ `self.fps = fps.FPS()`: Create a frame rate statistics object. ⑱ `self.joints_pub = self.create_publisher(...)`: Create a ROS2 publisher to control the servos. ⑲ `self.image_sub = self.create_subscription(...)`: Create a ROS2 subscriber to receive image data. ⑳ Create client for starting and stopping the functionality. (3) init_process Function {lineno-start=66} ```python def init_process(self): self.timer.cancel() self.init_action() if self.get_parameter('start').value: self.start_srv_callback(Trigger.Request(), Trigger.Response()) threading.Thread(target=self.main, daemon=True).start() self.create_service(Trigger, '~/init_finish', self.get_node_state) self.get_logger().info('\033[1;32m%s\033[0m' % 'start') ``` ① `self.timer.cancel()`: Cancel the timer to stop the scheduled tasks. ② `self.init_action()`: Call the initialization action to set up the robot's posture. ③ `if self.get_parameter('start').value`: Check the start parameter; if its value is True, call start_srv_callback to begin tracking. ④ `threading.Thread(target=self.main, daemon=True).start()`: Start a new thread to run the main method for target tracking. ⑤ `self.create_service(Trigger, '~/init_finish', self.get_node_state)`: Create a service that returns the node initialization completion status. ⑥ `self.get_logger().info(...)`: Log a message indicating the start of execution. (4) get_node_state Service {lineno-start=77} ```python def get_node_state(self, request, response): response.success = True return response ``` The service returns True, indicating that the node initialization is complete. (5) start_srv_callback & stop_srv_callback Service {lineno-start=99} ```python def start_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "start kcf track") self.start = True response.success = True response.message = "start" return response def stop_srv_callback(self, request, response): self.get_logger().info('\033[1;32m%s\033[0m' % "stop kcf track") self.start = False res = self.send_request(ColorDetect.Request()) if res.success: self.get_logger().info('\033[1;32m%s\033[0m' % 'set kcf success') else: self.get_logger().info('\033[1;32m%s\033[0m' % 'set kcf fail') response.success = True response.message = "stop" return response ``` ① `start_srv_callback`: Start the KCF tracker, set `start` to `True`, and return a success response. ② `stop_srv_callback`: Stop the tracker, perform any necessary follow-up actions, and return a response. (6) image_callback Method {lineno-start=118} ```python def image_callback(self, ros_image): # Convert the screen to opencv format. 将画面转为 opencv 格式 cv_image = self.bridge.imgmsg_to_cv2(ros_image, "bgr8") bgr_image = np.array(cv_image, dtype=np.uint8) if self.image_queue.full(): # If the queue is full, discard the oldest image. 如果队列已满,丢弃最旧的图像 self.image_queue.get() # Put the image into the queue. 将图像放入队列 self.image_queue.put(bgr_image) ``` The image_callback method receives image messages, converts them to OpenCV image format, and stores the images in a queue for subsequent processing. (7) Main Function {lineno-start=137} ```python def main(self): while self.running: rgb_image = self.image_queue.get() result_image = np.copy(rgb_image) factor = 4 rgb_image = cv2.resize(rgb_image, (int(result_image.shape[1]/ factor), int(result_image.shape[0]/ factor))) if self.tracker is None: if self.enable_select: roi = cv2.selectROI("result", result_image, False) roi = tuple(int(i / factor)for i in roi) if roi: param = cv2.TrackerKCF.Params() param.detect_thresh = 0.2 self.tracker = cv2.TrackerKCF_create(param) self.tracker.init(rgb_image, roi) else: status, box = self.tracker.update(rgb_image) if status: p1 = int(box[0] * factor), int(box[1] * factor) p2 = p1[0] + int(box[2] * factor), p1[1] + int(box[3] * factor) cv2.rectangle(result_image, p1, p2, (255, 255, 0), 2) center_x, center_y = (p1[0] + p2[0]) / 2, (p1[1] + p2[1]) / 2 t1 = time.time() self.pid_y.SetPoint = result_image.shape[1]/2 self.pid_y.update(center_x) self.y_dis += self.pid_y.output if self.y_dis < 200: self.y_dis = 200 if self.y_dis > 800: self.y_dis = 800 self.pid_z.SetPoint = result_image.shape[0]/2 self.pid_z.update(center_y) self.z_dis += self.pid_z.output if self.z_dis > 0.30: self.z_dis = 0.30 if self.z_dis < 0.23: self.z_dis = 0.23 msg = set_pose_target([self.x_init, 0.0, self.z_dis], 0.0, [-180.0, 180.0], 1.0) res = self.send_request(self.kinematics_client, msg) t2 = time.time() t = t2 - t1 if t < 0.02: time.sleep(0.02 - t) if res.pulse: servo_data = res.pulse set_servo_position(self.joints_pub, 0.02, ((10, 500), (5, 500), (4, servo_data[3]), (3, servo_data[2]), (2, servo_data[1]), (1, int(self.y_dis)))) else: set_servo_position(self.joints_pub, 0.02, ((1, int(self.y_dis)), )) else: time.sleep(0.01) self.fps.update() self.fps.show_fps(result_image) cv2.imshow("result", result_image) key = cv2.waitKey(1) if key == ord('s'): # Press 's' to start selecting the tracking target. 按下s开始选择追踪目标 self.get_logger().info("在画面窗口按下s开始选择追踪目标,然后按下空格开始追踪") self.tracker = None self.enable_select = True self.init_action() rclpy.shutdown() ``` ① `rgb_image = self.image_queue.get()`: Retrieve the latest image frame from the image queue, preparing it for target tracking. ② `result_image = np.copy(rgb_image)`: Create a copy of the image (result_image) for further processing and display. ③ `factor = 4`: Set the scaling factor to reduce the image resolution and speed up the computation process. ④ `rgb_image = cv2.resize(rgb_image, (int(result_image.shape[1] / factor), int(result_image.shape[0] / factor)))`: Scale the image down by the factor of 4. ⑤ `if self.tracker is None:`: Check if the target tracker has been initialized. If not, proceed with the target selection phase. ⑥ `if self.enable_select:`: If the target selection function is enabled, allow the user to choose a target. ⑦ `roi = cv2.selectROI("result", result_image, False)`: Use OpenCV's selectROI method to let the user select a region of interest (ROI) in the image. ⑧ `roi = tuple(int(i / factor) for i in roi)`: Scale the selected ROI to fit the reduced image size. ⑨ `if roi:`: If the user successfully selects an ROI, initialize the KCF tracker to start target tracking. ⑩ `param = cv2.TrackerKCF_Params()`: Create a parameter object for the KCF tracker. ⑪ `param.detect_thresh = 0.2`: Set the detection threshold for the KCF tracker to 0.2. ⑫ `self.tracker = cv2.TrackerKCF_create(param)`: Create and initialize the KCF tracker. ⑬ `self.tracker.init(rgb_image, roi)`: Initialize the tracker with the image and the user-selected ROI. ⑭ `else:`: If the tracker is already initialized, enter the target tracking phase. ⑮ `status, box = self.tracker.update(rgb_image)`: Update the tracker and get the status and bounding box of the target. ⑯ `if status:`: If the target is successfully tracked, proceed with processing the target's position. ⑰ `p1 = int(box[0] * factor), int(box[1] * factor)`: Calculate the top-left corner coordinates of the bounding box and scale them back to the original size. ⑱ `p2 = p1[0] + int(box[2] * factor), p1[1] + int(box[3] * factor)`: Calculate the bottom-right corner coordinates of the bounding box and scale them back. ⑲ `cv2.rectangle(result_image, p1, p2, (255, 255, 0), 2)`: Draw the bounding box on the result_image in yellow. ⑳ `center_x, center_y = (p1[0] + p2[0]) / 2, (p1[1] + p2[1]) / 2`: Calculate the center coordinates of the bounding box. ㉑ `self.pid_y.SetPoint = result_image.shape[1] / 2`: Set the target value for the PID controller on the x-axis to the center of the image width. ㉒ `self.pid_y.update(center_x)`: Update the PID controller to calculate the horizontal deviation between the current and target positions. ㉓ `self.y_dis += self.pid_y.output`: Calculate the new y-axis target position and adjust the robot's horizontal position. ㉔ `if self.y_dis < 200:`: Set a minimum value of 200 for the y_dis coordinate. ㉕ `if self.y_dis > 800:`: Set a maximum value of 800 for the y_dis coordinate. ㉖ `self.pid_z.SetPoint = result_image.shape[0] / 2`: Set the target value for the PID controller on the z-axis to the center of the image height. ㉗ `self.pid_z.update(center_y)`: Update the PID controller to calculate the vertical deviation between the current and target positions. ㉘ `self.z_dis += self.pid_z.output`: Calculate the new z-axis target position and adjust the robot's vertical position. ㉙ `if self.z_dis > 0.30:`: Set a maximum value of 0.30 for the z_dis coordinate. ㉚ `if self.z_dis < 0.23:`: Set a minimum value of 0.23 for the z_dis coordinate. ㉛ `msg = set_pose_target([self.x_init, 0.0, self.z_dis], 0.0, [-180.0, 180.0], 1.0)`: Set the robot's target pose, adjusting the z-axis position. ㉜ `res = self.send_request(self.kinematics_client, msg)`: Send the control command to the kinematics_client to adjust the robot's pose. ㉝ `t2 = time.time()`: Record the current time to calculate the frame delay. ㉞ `t = t2 - t1`: Calculate the time difference between the current frame and the previous one. ㉟ `if t < 0.02:`: If the frame processing time is less than 20ms, wait for the remaining time to control the update frequency. ㊱ `time.sleep(0.02 - t)`: Sleep for the remaining time to maintain a 20ms update rate. ㊲ `if res.pulse:`: If valid pulse data is returned, adjust the servo position. ㊳ `servo_data = res.pulse`: Get the pulse data. ㊴ `set_servo_position(self.joints_pub, 0.02, ((10, 500), (5, 500), (4, servo_data[3]), (3, servo_data[2]), (2, servo_data[1]), (1, int(self.y_dis))))`: Update the servo position based on the pulse data and the calculated y_dis value. ㊵ `else:`: If no pulse data is returned, only adjust the y_dis position. ㊶ `set_servo_position(self.joints_pub, 0.02, ((1, int(self.y_dis)),))`: Update the servo position by adjusting only the y-axis. ㊷ `else:`: If the target is lost, wait for a short period before retrying. ㊸ `time.sleep(0.01)`: Sleep for 10ms if tracking fails, to avoid overloading the CPU. ㊹ `self.fps.update()`: Update the frame rate statistics. ㊺ `self.fps.show_fps(result_image)`: Display the current frame rate on the image. ㊻ `cv2.imshow("result", result_image)`: Display the tracking result image. ㊼ `key = cv2.waitKey(1)`: Detect user input from the keyboard. ㊽ `if key == ord('s'):`: If the 's' key is pressed, start selecting the tracking target. ㊾ `self.get_logger().info("Press 's' to select a target in the window, then press space to start tracking.")`: Log a message informing the user to press 's' to select a target and space to start tracking. ㊿ `self.tracker = None`: Reset the tracker, allowing the user to select a new target.