12. ArmPi Ultra with Conveyor Belt

12.1 Conveyor Belt Installation

  1. Remove the plastic part near the motor end.

  1. Use M4*10 pan head screws and M4 nuts to secure the acrylic plate onto the conveyor belt, then reattach the plastic part.

  1. Place the conveyor belt on the left side of the robot arm, connect the motor cable to Motor Port 1 on the STM32 controller, and connect the I2C communication cable to Port 2.

12.2 electing the Robot Type

ArmPi Ultra’s expansion accessories come in three types: Mecanum chassis, sliding rail, and conveyor belt. After installation, you must switch the device version according to the installed accessory for proper operation.

Note

If you don’t switch or select the wrong version, the motor may run unpredictably, causing malfunctions or even damaging the device.

Step-by-step instructions:

  1. Start the robotic arm, and access the robot system desktop using VNC. To get detailed instructions on remote control software connection, please refer to the tutorials saved in the section 1.ArmPi Ultra User Manual-> 1.6 Development Environment Setup and Configuration in the ArmPi Ultra User Manual.

  2. Double-click the model configuration tool on the desktop.

  3. Select the appropriate options based on the camera version and accessory type:

Mecanum refers to the Mecanum chassis. Slide_Rails refers to the sliding rail. Conveyor_Belt refers to the conveyor belt.

  1. After making your selection, click Save to save the configuration.

  1. Click Apply to reload the configuration.

Wait for the buzzer to beep once, which indicates the restart is complete and the new configuration is now active.

12.3 Wireless Controller

12.3.1 Preparation

  1. Before powering on the device, make sure the wireless controller receiver is properly inserted. This can be ignored if the receiver was pre-inserted at the factory.

  2. Pay attention to battery polarity when placing the batteries.

  1. Each time the robot is powered on, the APP auto-start service will launch which includes the wireless handle control service. If this service has not been closed, no additional actions are needed—simply connect and control.

  2. Since signals from wireless controller can interfere with each other, it is recommended not to use this function when multiple robots are in the same area, to avoid misconnection or unintended control.

  3. After turning on the wireless controller, if it does not connect to the robot within 30 seconds, or remains unused for 5 minutes after connection, it will enter sleep mode automatically. To wake up the wireless handle and exit sleep mode, press the “START” button.

12.3.2 Device Connection

  1. After the robot powers on, slide the wireless controller switch to the “ON” position. At this point, the red and green LED indicators on the wireless controller will start flashing simultaneously.

  2. Wait a few seconds for the robot and wireless controller to pair automatically. Once pairing is successful, the green LED will remain solid while the red LED turns off.

12.3.3 Control Modes

The wireless handle supports two control modes: Coordinate Mode and Single Servo Mode. After a successful connection, the default mode is Coordinate Mode.

  • Single Servo Mode: In this mode, the wireless controller buttons can be used to control the forward and reverse rotation of individual servos on the robotic arm.

Button Functions in Single Servo Mode:

Button Function (from the robotic arm's first-person perspective)
START Reset the robotic arm
SELECT+START Switch control mode (Single Servo / Coordinate)
UP / ↑ Raise Servo 5
DOWN / ↓ Lower Servo 5
LEFT / ← Rotate Servo 6 to the left
RIGHT / → Rotate Servo 6 to the right
Y Lower Servo 4
A Raise Servo 4
B Rotate Servo 2 to the right (Gripper turns right)
X Rotate Servo 2 to the left (Gripper turns left)
L1 Open the gripper (Servo 1)
L2 Close the gripper (Servo 1)
R1 Raise Servo 3
R2 Lower Servo 3
Left joystick left Conveyor belt moves left
Left joystick right Conveyor belt moves right
  • Coordinate Mode: In Coordinate Mode, the robotic arm moves as a whole along the X, Y, and Z axes and can also adjust its tilt angle based on button inputs.

Button Functions in Coordinate Mode:

Button Function (from the robotic arm's first-person perspective)
START Reset the robotic arm
SELECT+START Switch control mode (Single Servo / Coordinate)
UP / ↑ Move arm in the positive X direction (forward)
DOWN / ↓ Move arm in the negative X direction (backward)
LEFT / ← Move arm in the positive Y direction (left)
RIGHT / → Move arm in the negative Y direction (right)
Y Close the gripper (Servo 1)
A Open the gripper (Servo 1)
B Rotate Servo 2 to the right (Gripper turns right)
X Rotate Servo 2 to the left (Gripper turns left)
L1 Move arm upward along Z axis
L2 Move arm downward along Z axis
R1 Increase gripper pitch angle
R2 Decrease gripper pitch angle
Left joystick left Conveyor belt moves left
Left joystick right Conveyor belt moves right

Switching Between Modes: To switch between modes, press both SELECT and START buttons. A sound prompt indicates the switch was successful.

  1. Two beeps: Switched from Single Servo Mode to Coordinate Mode.

  2. One beep: Switched from Coordinate Mode to Single Servo Mode.

12.4 Color Sorting

12.4.1 Program Introduction

First, subscribe to the topic published by the color recognition node to obtain detected color information.

Then choose the target color. Once the target color is detected, obtain the center position of the target object in the image.

Use the object’s center position to guide the robotic arm for gripping and placing the color block into the corresponding area.

12.4.2 Operation Steps

  1. Start the robotic arm, and access the robot system desktop using VNC. To get detailed instructions on remote control software connection, please refer to the tutorials saved in the section 1.ArmPi Ultra User Manual-> 1.6 Development Environment Setup and Configuration in the ArmPi Ultra User Manual.

  2. Click the icon to open the command-line terminal.

  3. Enter the following command and press Enter to stop the auto-start service:

~/.stop_ros.sh
  1. Entering the following command to start the feature.

ros2 launch example color_sorting_motor.launch.py
  1. After the program starts successfully, click on the camera display window and press A to start sorting, then press S to stop sorting.

  2. To exit the feature, press Ctrl+C in the terminal. If the program does not close successfully, try pressing Ctrl + C again.

  3. If you want to experience the mobile app features again later, enter the command and press Enter to start the app service. Wait for the robotic arm to return to its initial position — a beep from the buzzer will indicate it’s ready.

ros2 launch bringup bringup.launch.py

12.4.3 Project Outcome

After starting the program, place the color blocks within the robotic arm’s camera view. The returned image will highlight the recognized target color, and the robotic arm will sort the corresponding color blocks into their designated areas.

12.4.4 Program Brief Analysis

  • Launch File Analysis

The launch file is located at:

/home/ubuntu/ros2_ws/src/example/example/motor/color_sorting_motor.launch.py

1. launch_setup Function

def launch_setup(context):
    compiled = os.environ['need_compile']
    if compiled == 'True':
        sdk_package_path = get_package_share_directory('sdk')
        peripherals_package_path = get_package_share_directory('peripherals')
    else:
        sdk_package_path = '/home/ubuntu/ros2_ws/src/driver/sdk'
        peripherals_package_path = '/home/ubuntu/ros2_ws/src/peripherals'

    depth_camera_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(peripherals_package_path, 'launch/depth_camera.launch.py')),
    )

    sdk_launch = IncludeLaunchDescription(
        PythonLaunchDescriptionSource(
            os.path.join(sdk_package_path, 'launch/armpi_ultra.launch.py')),
    )
    
    color_sorting_node = Node(
        package='example',
        executable='color_sorting_motor',
        output='screen',
    )

    return [
            sdk_launch,
            depth_camera_launch,
            color_sorting_node,
            ]

Loads the launch/depth_camera.launch.py file from the peripherals package to start the depth camera node, which provides RGB image and depth data, which is the visual input for color recognition. Loads the launch/armpi_ultra.launch.py file from the SDK package to start the underlying control services of the robotic arm, such as such as servo control and motion control, providing hardware support for the sorting operation. Launches the color_sorting_motor executable from the example package as the core logic node for color sorting. It uses visual data to control the robotic arm to perform color recognition and sorting actions, with logs output to the screen.

2. generate_launch_description Function

def generate_launch_description():
    return LaunchDescription([
        OpaqueFunction(function = launch_setup)
    ])

Creates and returns a LaunchDescription object, calling launch_setup via OpaqueFunction as the standard entry point for the ROS 2 launch file.

3. Main Function

if __name__ == '__main__':
    ld = generate_launch_description()

    ls = LaunchService()
    ls.include_launch_description(ld)
    ls.run()

Creates a LaunchDescription object and a LaunchService service. The launch description is added to the service and executed, enabling the startup of the entire system manually.

  • Python File Analysis

The Python file is located at:

/home/ubuntu/ros2_ws/src/example/example/motor/color_sorting_motor.py

1. Import the Necessary Libraries

import os
import cv2
import yaml
import time
import math
import queue
import rclpy
import threading
import numpy as np
from rclpy.node import Node
from sdk import common, fps
from cv_bridge import CvBridge
from std_srvs.srv import SetBool
from sensor_msgs.msg import Image
from rclpy.executors import MultiThreadedExecutor
from servo_controller_msgs.msg import ServosPosition
from rclpy.callback_groups import ReentrantCallbackGroup
from kinematics.kinematics_control import set_pose_target
from kinematics_msgs.srv import GetRobotPose, SetRobotPose
from ros_robot_controller_msgs.msg import ConveyorSpeed
from servo_controller.bus_servo_control import set_servo_position
  1. sdk: A custom SDK that provides interfaces to specific hardware components or functional modules.

  2. kinematics.kinematics_control.set_pose_target: A custom function module used to set the robot’s target pose.

  3. kinematics_msgs.srv.GetRobotPose: A service interface for retrieving the current pose of the robot.

  4. kinematics_msgs.srv.SetRobotPose: A service interface for setting the desired target pose of the robot.

  5. servo_controller.bus_servo_control.set_servo_position: A custom function used to control the position of servo motors.

2. Initialization

class ColorSortingNode(Node):

    def __init__(self, name):
        rclpy.init()
        super().__init__(name, allow_undeclared_parameters=True, automatically_declare_parameters_from_overrides=True)

        self.running = True
        self.start_count = False
        self.lock = threading.RLock()
        self.image_queue = queue.Queue(maxsize=2)
        self.fps = fps.FPS()
        self.bridge = CvBridge()
        self.center_imgpts = None
        self.offset = 0.005
        self.pick_pitch = 60
        self.count = 0
        self.data = common.get_yaml_data("/home/ubuntu/ros2_ws/src/app/config/lab_config.yaml")
        self.lab_data = self.data['/**']['ros__parameters']
        self.camera_type = os.environ['CAMERA_TYPE']
        self.min_area = 500
        self.max_area = 15000
        self.target = None
        self.image_sub = None

        self.conveyor_pub = self.create_publisher(ConveyorSpeed, '/ros_robot_controller/set_conveyor_speed', 1)
        self.joints_pub = self.create_publisher(ServosPosition, '/servo_controller', 1)
        self.image_sub = self.create_subscription(Image, '/depth_cam/rgb/image_raw', self.image_callback, 1)

        timer_cb_group = ReentrantCallbackGroup()
        self.get_current_pose_client = self.create_client(GetRobotPose, '/kinematics/get_current_pose', callback_group=timer_cb_group)
        self.get_current_pose_client.wait_for_service()
        self.kinematics_client = self.create_client(SetRobotPose, '/kinematics/set_pose_target')
        self.kinematics_client.wait_for_service()

        set_servo_position(self.joints_pub, 1.5, ((1, 210), (2, 500), (3, 110), (4, 825), (5, 600), (6, 880)))
        threading.Thread(target=self.main, daemon=True).start()

Initializes the color sorting node, configures runtime parameters such as queues and status flags, sets up ROS communication components including subscribers, publishers, and service clients, loads the color configuration file, sets the initial servo positions, and starts the main loop thread.

3. send_request Method

    def send_request(self, client, msg):
        future = client.call_async(msg)
        while rclpy.ok():
            if future.done() and future.result():
                return future.result()

A general service call utility that sends service requests and blocks until the result is returned, ensuring the completion of services such as the robotic arm’s kinematics and other communications.

4. adaptive_threshold Method

    def adaptive_threshold(self, gray_image):
        binary = cv2.adaptiveThreshold(gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 41, 7)
        return binary

5. canny_proc Method

    def canny_proc(self, bgr_image):
        mask = cv2.Canny(bgr_image, 9, 41, 9, L2gradient=True)
        mask = 255 - cv2.dilate(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (11, 11)))
        return mask

Uses the Canny edge detection algorithm to process the image, generating an edge mask. The mask is optimized through dilation to enhance the edges and extract the object contours.

6. get_top_surface Method

    def get_top_surface(self, rgb_image):
        image_scale = cv2.convertScaleAbs(rgb_image, alpha=2.5, beta=0)
        image_gray = cv2.cvtColor(image_scale, cv2.COLOR_RGB2GRAY)
        image_mb = cv2.medianBlur(image_gray, 3)
        image_gs = cv2.GaussianBlur(image_mb, (5, 5), 5)
        binary = self.adaptive_threshold(image_gs)
        mask = self.canny_proc(image_gs)
        mask1 = cv2.bitwise_and(binary, mask)
        roi_image_mask = cv2.bitwise_and(rgb_image, rgb_image, mask=mask1)
        return roi_image_mask

Preprocesses the RGB image through scaling, grayscale conversion, and blurring, and combines adaptive thresholding with edge detection to extract the top surface area of the object, enhancing target recognition.

7. image_callback Method

    def image_callback(self, ros_image):
        cv_image = self.bridge.imgmsg_to_cv2(ros_image, "rgb8")
        rgb_image = np.array(cv_image, dtype=np.uint8)
        if self.image_queue.full():
            self.image_queue.get()
        self.image_queue.put(rgb_image)

Image subscription callback function that converts ROS image message into an OpenCV formatted RGB image and stores it in the image queue, discarding the oldest image if the queue is full.

8. image_processing Method

    def image_processing(self):
        rgb_image = self.image_queue.get()
        result_image = np.copy(rgb_image)
        target_list = []
        index = 0
        if self.start_count:
            img_h, img_w = rgb_image.shape[:2]
            rgb_image = self.get_top_surface(rgb_image)
            image_lab = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2LAB)
            for i in ['red', 'green', 'blue']:
                mask = cv2.inRange(image_lab, tuple(self.lab_data['color_range_list'][i]['min']), tuple(self.lab_data['color_range_list'][i]['max']))
                eroded = cv2.erode(mask, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)))
                dilated = cv2.dilate(eroded, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)))
                contours = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]
                contours_area = map(lambda c: (math.fabs(cv2.contourArea(c)), c), contours)
                contours = map(lambda a_c: a_c[1], filter(lambda a: self.min_area <= a[0] <= self.max_area, contours_area))
                for c in contours:
                    area = math.fabs(cv2.contourArea(c))
                    rect = cv2.minAreaRect(c)
                    (center_x, center_y), _ = cv2.minEnclosingCircle(c)
                    cv2.circle(result_image, (int(center_x), int(center_y)), 8, (0, 0, 0), -1)
                    corners = list(map(lambda p: (p[0], p[1]), cv2.boxPoints(rect)))
                    cv2.drawContours(result_image, [np.intp(corners)], -1, (0, 255, 255), 2, cv2.LINE_AA)
                    index += 1
                    angle = int(round(rect[2]))
                    target_list.append([i, index, (center_x, center_y), angle])
        return target_list, result_image

Fetches an image from the image queue and detects red, green, and blue objects in the LAB color space. It extracts contours, calculates the center coordinates and angles, and returns a list of target objects along with the processed image.

9. move Method

    def move(self, center_y):
        if self.running:
            y = ((center_y - 235) / (335 - 235)) * (0.11 - 0.16) + 0.16
            y = y + self.offset
            position = [0, y, 0.08]

            if self.running:
                position[2] += 0.01
                self.get_logger().info('0') 
                msg = set_pose_target(position, self.pick_pitch, [-180.0, 180.0], 1.0)
                res = self.send_request(self.kinematics_client, msg)
                servo_data = res.pulse
                set_servo_position(self.joints_pub, 1.0, ((6, servo_data[0]),))
                time.sleep(1)
                set_servo_position(self.joints_pub, 1.0, ((5, servo_data[1]), (3, servo_data[2]), (3, servo_data[3])))
                time.sleep(1)
                self.get_logger().info('0.0') 

            if self.running:
                self.get_logger().info('1') 
                # position[2] -= 0.03
                msg = set_pose_target(position, self.pick_pitch, [-90.0, 90.0], 1.0)
                res = self.send_request(self.kinematics_client, msg)
                servo_data = res.pulse
                set_servo_position(self.joints_pub, 1.0, ((6, servo_data[0]), (5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3])))
                time.sleep(1)
                set_servo_position(self.joints_pub, 1.0, ((1, 600),))
                time.sleep(1)

            if self.running:
                self.get_logger().info('2') 
                # position[2] += 0.04
                msg = set_pose_target(position, self.pick_pitch, [-180.0, 180.0], 1.0)
                res = self.send_request(self.kinematics_client, msg)
                servo_data = res.pulse
                self.get_logger().info('3') 
                set_servo_position(self.joints_pub, 1.0, ((5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3])))
                time.sleep(1)
                self.get_logger().info('4') 
                set_servo_position(self.joints_pub, 1.0, ((6, 850), (5, 630), (4, 810), (3, 85), (2, 480)))
                time.sleep(1)
                position = PLACE_POSITION[self.target[0]]

            if self.running:
                self.get_logger().info('5') 
                position[2] += 0.03
                msg = set_pose_target(position, self.pick_pitch, [-90.0, 90.0], 1.0)
                res = self.send_request(self.kinematics_client, msg)
                servo_data = res.pulse
                set_servo_position(self.joints_pub, 2.0, ((6, servo_data[0]),))
                time.sleep(2)
                set_servo_position(self.joints_pub, 1.0, ((5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3])))
                time.sleep(1)
                self.get_logger().info('6') 
            if self.running:
                position[2] -= 0.03
                msg = set_pose_target(position, self.pick_pitch, [-90.0, 90.0], 1.0)
                res = self.send_request(self.kinematics_client, msg)
                servo_data = res.pulse
                set_servo_position(self.joints_pub, 1.0, ((6, servo_data[0]), (5, servo_data[1]), (4, servo_data[2]), (3, servo_data[3])))
                time.sleep(1)
                set_servo_position(self.joints_pub, 1.0, ((1, 200),))
                time.sleep(1)

            if self.running:
                set_servo_position(self.joints_pub, 1.0, ((5, 630), (4, 810), (3, 90), (2, 480), (1, 200)))
                time.sleep(1)
                set_servo_position(self.joints_pub, 2.0, ((6, 850),))
                time.sleep(2)
                self.target = None
                self.start_count = True

            if not self.running:
                set_servo_position(self.joints_pub, 1.0, ((5, 630), (4, 810), (3, 90), (2, 480), (1, 200)))
                time.sleep(1)
                set_servo_position(self.joints_pub, 2.0, ((6, 850),))
                time.sleep(2)
                self.target = None

The robotic arm’s grab and place logic. Based on the target object’s center coordinates, it calculates the grab position and controls the servos to execute the grab, move, and place actions. After completion, the arm is reset.

10. main Method

    def main(self):
        while rclpy.ok():
            try:
                target_list, result_image = self.image_processing()
                if target_list and self.running:
                    if self.target is not None:
                        if self.target[0] == target_list[0][0]:
                            self.count += 1
                        else:
                            self.target = target_list[0]
                            self.count = 0
                    else:
                        self.target = target_list[0]
                if self.count > 50:
                    self.count = 0
                    self.start_count = False
                    threading.Thread(target=self.move, args=(target_list[0][2][0],), daemon=True).start()

                if result_image is not None:
                    self.fps.update()
                    result_image = cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB)
                    cv2.imshow('result_image', result_image)
                    key = cv2.waitKey(1)
                    if key != -1:
                        if key == 97:  # a
                            self.running = True
                            self.start_count = True
                            conveyor_msg = ConveyorSpeed()
                            conveyor_msg.speed = 100
                            self.conveyor_pub.publish(conveyor_msg)
                        if key == 115:  # s
                            self.running = False
                            self.count = 0
                            conveyor_msg = ConveyorSpeed()
                            conveyor_msg.speed = 0
                            self.conveyor_pub.publish(conveyor_msg)
            except queue.Empty:
                continue

The main loop function continuously processes image recognition results. When the same object is detected enough times, it starts the grabbing thread. It also supports keyboard input to control the operational state, pressing A to start the conveyor belt, while pressing S to stop.

11. Main Function

def main():
    try:
        node = ColorSortingNode('color_sorting')
        executor = MultiThreadedExecutor()
        executor.add_node(node)
        executor.spin()
    except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
        pass
    finally:
        if 'node' in locals() and rclpy.ok():
             node.destroy_node()
        rclpy.shutdown()

Node entry point that creates an instance of the color sorting node, starts a multi-threaded executor to handle callbacks, and captures interrupt signals to clean up resources by destroying the node and shutting down ROS.

12.4.5 Adjusting the Gripping Offset

Due to slight variations between different ArmPi Ultra robotic arms, the gripping position may be inaccurate. If there is a deviation in the gripping position, you can correct it by modifying the offset value in the source code file. The location of the Python file can be found in the program analysis section of the corresponding tutorial.

Increasing the offset value allows the arm to grip farther. Decreasing the value brings the grip position closer.