Skip to content

ROS2 入门:话题通信(带工作空间与包)

在 ROS2 中,话题(Topic) 是节点之间最常见的通信方式之一。它遵循 发布-订阅(Pub-Sub)模型

  • 发布者(Publisher) 将消息发送到某个话题。
  • 订阅者(Subscriber) 监听该话题,并接收消息。

发布者和订阅者之间不需要直接知道对方的存在,只要它们使用同一个话题名和消息类型,就能实现通信。


一. 话题通信基本操作

1.1 创建工作空间与包

  1. 新建工作空间:

    bash
    mkdir -p ~/ros2_ws/src
    cd ~/ros2_ws
    colcon build
  2. 进入 src 目录,新建 Python 包:

    bash
    cd ~/ros2_ws/src
    ros2 pkg create --build-type ament_python py_topic_demo
  3. 包结构如下:

   py_topic_demo/
   ├── package.xml
   ├── py_topic_demo
   │   └── __init__.py
   ├── resource
   ├── setup.cfg
   ├── setup.py
   └── test

1.2 编写发布者节点

py_topic_demo/py_topic_demo/minimal_publisher.py 中添加:

python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class MinimalPublisher(Node):
    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher_ = self.create_publisher(String, 'chatter', 10)
        timer_period = 1.0
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):
        msg = String()
        msg.data = f'Hello ROS2: {self.i}'
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: "{msg.data}"')
        self.i += 1


def main(args=None):
    rclpy.init(args=args)
    node = MinimalPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

1.3 编写订阅者节点

py_topic_demo/py_topic_demo/minimal_subscriber.py 中添加:

python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String


class MinimalSubscriber(Node):
    def __init__(self): 
        super().__init__('minimal_subscriber')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10)
        self.subscription

    def listener_callback(self, msg):
        self.get_logger().info(f'I heard: "{msg.data}"')


def main(args=None):
    rclpy.init(args=args)
    node = MinimalSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

1.4 修改 setup.py

setup.pyentry_points 中加入:

python
entry_points={
    'console_scripts': [
        'minimal_publisher = py_topic_demo.minimal_publisher:main',
        'minimal_subscriber = py_topic_demo.minimal_subscriber:main',
    ],
},

1.5 构建与运行

  1. 在工作空间根目录构建:

    bash
    cd ~/ros2_ws
    colcon build
    source install/setup.bash
  2. 运行发布者:

    bash
    ros2 run py_topic_demo minimal_publisher

如下图:

1078

  1. 运行订阅者:

    bash
    ros2 run py_topic_demo minimal_subscriber

如下图:


1.6 使用命令行调试话题

  • 查看所有话题:

    bash
    ros2 topic list
  • 查看话题类型:

    bash
    ros2 topic type /chatter
  • 查看话题实时消息:

    bash
    ros2 topic echo /chatter
  • 发布测试消息:

    bash
    ros2 topic pub /chatter std_msgs/String "data: 'Hello from CLI'"

二、话题通信实践

turtlesim 是 ROS2 自带的一个简单模拟器,可以用来学习 话题通信。 我们要做的是:写一个发布者节点,发布速度指令,控制小海龟按照指定半径画圆。

2.1 启动模拟器

ros2 run turtlesim turtlesim_node

会弹出一个小窗口,显示一只小海龟 🐢。

2.2 控制说明

小海龟的运动由话题 /turtle1/cmd_vel 控制,它使用 geometry_msgs/msg/Twist 消息类型。

该消息用于表示 速度指令,由两个三维向量组成:

geometry_msgs/msg/Twist
-----------------------
geometry_msgs/Vector3 linear
geometry_msgs/Vector3 angular

其中 Vector3 的定义为:

geometry_msgs/msg/Vector3
-------------------------
float64 x
float64 y
float64 z

linear 是线速度向量,在 turtlesim 里,只有 linear.x 有效。

  • linear.x :表示 前后方向的速度(m/s)。正值:向前运动负值:向后运动
  • linear.y :表示 左右平移速度(对小海龟无效,始终为 0)
  • linear.z :表示 上下方向速度(对小海龟无效,始终为 0)

angular 是角速度向量,在 turtlesim 里,只有 angular.z 有效。

  • angular.x :绕 X 轴旋转速度(对小海龟无效)
  • angular.y :绕 Y 轴旋转速度(对小海龟无效)
  • angular.z :绕 Z 轴旋转速度(rad/s)。正值:逆时针旋转 ,负值:顺时针旋转。

在控制小海龟时,通常只需要设置两个参数:

  • linear.x —— 决定小海龟的前进速度
  • angular.z —— 决定小海龟的转弯速度

例如:

  • linear.x = 2.0, angular.z = 0.0 → 小海龟直线前进
  • linear.x = 0.0, angular.z = 1.0 → 小海龟原地旋转
  • linear.x = 2.0, angular.z = 1.0 → 小海龟沿圆弧运动

同时,我们可以使用 话题 /turtle1/pose 获得小海龟的状态,消息类型为:turtlesim/msg/Pose,格式如下:

turtlesim/msg/Pose
------------------
float32 x          # 小海龟的 X 坐标(单位:米)
float32 y          # 小海龟的 Y 坐标(单位:米)
float32 theta      # 朝向角度(单位:弧度,0 表示朝右,逆时针为正)
float32 linear_velocity   # 当前线速度
float32 angular_velocity  # 当前角速度

2.3 编写海龟控制节点

创建功能包:

ros2 pkg create py_turtle_control --build-type ament_python --dependencies rclpy geometry_msgs turtlesim

编写控制节点,在py_turtle_control/py_turtle_control/turtle_circle_controller.py添加:

python
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from turtlesim.msg import Pose
import math


class TurtleCircleController(Node):
    def __init__(self):
        super().__init__('turtle_circle_controller')

        # 发布速度指令
        self.publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)

        # 订阅小海龟的位姿
        self.subscription = self.create_subscription(
            Pose,
            '/turtle1/pose',
            self.pose_callback,
            10
        )

        # 控制参数
        self.radius = 2.0          # 目标圆的半径
        self.center_x = 5.5        # 圆心坐标(窗口中心)
        self.center_y = 5.5
        self.linear_speed = 2.0    # 期望的线速度
        self.pose = None

        # 定时器:50ms 调用一次控制逻辑
        self.timer = self.create_timer(0.05, self.control_loop)

    def pose_callback(self, msg: Pose):
        """接收小海龟的当前位置"""
        self.pose = msg

    def control_loop(self):
        if self.pose is None:
            return

        # 当前坐标
        x, y, theta = self.pose.x, self.pose.y, self.pose.theta

        # 小海龟相对圆心的偏移
        dx = x - self.center_x
        dy = y - self.center_y
        dist_to_center = math.sqrt(dx**2 + dy**2)

        # 误差:当前半径 - 目标半径
        error = dist_to_center - self.radius

        # 控制律:
        cmd = Twist()
        cmd.linear.x = self.linear_speed
        cmd.angular.z = -1.0 * error   # 简单 P 控制器

        # 发布速度
        self.publisher_.publish(cmd)

        # 打印调试信息
        self.get_logger().info(
            f'Pose (x={x:.2f}, y={y:.2f}, θ={theta:.2f}), error={error:.2f}, angular.z={cmd.angular.z:.2f}'
        )


def main(args=None):
    rclpy.init(args=args)
    node = TurtleCircleController()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

修改 setup.py,在 setup.pyentry_points 中加入:

python
entry_points={
    'console_scripts': [
        'turtle_circle_controller = py_turtle_control.turtle_circle_controller:main',
    ],
},

在工作空间根目录构建:

bash
colcon build
source install/setup.bash

启动控制节点:

bash
ros2 run py_turtle_control turtle_circle_controller

终端会打印小海龟位置、误差和控制参数,小海龟会围绕屏幕中心画圆。

三、自定义通信接口(自定义msg)

在实际项目中,内置的消息类型(如 std_msgs、geometry_msgs)可能无法满足业务需求。 ROS2 允许我们 自定义消息(msg) 或 服务(srv) 接口,以便在节点间传递更加复杂或特定的数据结构。

下面我们基于自定义 msg 实现一个 读取电脑状态(CPU/内存占用等)并通过可视化界面显示 的小 demo。

3.1 创建工作空间和接口包

创建一个新的工作空间

mkdir -p ~/dev_ws/src
cd ~/dev_ws/src

创建接口包:

在 src 目录下,创建一个接口包(注意必须是 ament_cmake 类型):

bash
ros2 pkg create --build-type ament_cmake system_interfaces

生成后的目录结构如下:

css
dev_ws/
└── src/
    └── system_interfaces/
        ├── CMakeLists.txt
        ├── package.xml
        └── ...

3.2 定义 msg 文件

进入接口包,创建 msg 文件夹,并新建 SystemStatus.msg (注意这个自定义消息文件需要使用大驼峰命名法):

cd system_interfaces
mkdir msg
nano msg/SystemStatus.msg

内容如下:

builtin_interfaces/Time stamp    # 记录时间戳
string host_name                 # 系统名称
float32 cpu_percent              # CPU 使用率
float32 memory_percent           # 内存使用率
float32 memory_total             # 内存总量
float32 memory_available         # 剩余有效内存
float64 net_sent                 # 网络发送数据总量
float64 net_recv                 # 网络接收数据总量

3.3 修改 package.xml

package.xml中添加依赖和声明接口包:

xml
<depend>builtin_interfaces</depend>
<depend>rosidl_default_generators</depend>
<depend>rosidl_default_runtime</depend>

 <!-- 必须声明为接口包 -->
<member_of_group>rosidl_interface_packages</member_of_group>

3.4 修改 CMakeLists.txt

打开 CMakeLists.txt,找到并修改如下内容:

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED) 

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SystemStatus.msg"
  DEPENDENCIES builtin_interfaces
)

这样就会在构建时生成 SystemStatus 的 Python/C++ 接口。

3.5 构建接口包

回到工作空间根目录,执行:

bash
cd ~/dev_ws
colcon build

构建完成后,记得 source:

source install/setup.bash

验证接口是否生成成功:

ros2 interface show system_interfaces/msg/SystemStatus

正常情况下会输出刚才定义的消息内容,如下图:

四、自定义msg实战

下面我们基于上面的自定义 msg 实现一个 读取电脑状态(CPU/内存占用等)并通过可视化界面显示 的小 demo。

这个小demo主要包括三部分:一个自定义的msg包、一个采集电脑状态的话题发布者、一个显示状态的话题订阅者。

4.1 创建 Python 功能包

~/dev_ws/src 下创建一个新的 Python 包:

bash
cd ~/dev_ws/src
ros2 pkg create --build-type ament_python system_monitor --dependencies rclpy psutil system_interfaces

这里我们额外依赖了 psutil 库用于采集系统信息,可以通过 apt 安装:

bash
sudo apt update
sudo apt install python3-psutil

4.2 编写系统状态发布节点

system_monitor/system_monitor/system_publisher.py 中添加以下代码:

python
import rclpy
from rclpy.node import Node
from system_interfaces.msg import SystemStatus
from builtin_interfaces.msg import Time
import psutil
import socket
import time


class SystemStatusPublisher(Node):
    def __init__(self):
        super().__init__('system_status_publisher')

        # 创建 Publisher
        self.publisher_ = self.create_publisher(SystemStatus, 'system_status', 10)
        
        # 定时器,1 秒发布一次
        self.timer = self.create_timer(1.0, self.timer_callback)

        # 获取主机名
        self.host_name = socket.gethostname()
        self.get_logger().info(f"SystemStatus Publisher started on {self.host_name}")

    def timer_callback(self):
        msg = SystemStatus()

        # 时间戳
        now = self.get_clock().now().to_msg()
        msg.stamp = now

        # 主机名
        msg.host_name = self.host_name

        # CPU使用率
        msg.cpu_percent = psutil.cpu_percent(interval=None)

        # 内存信息
        mem = psutil.virtual_memory()
        msg.memory_percent = mem.percent
        msg.memory_total = mem.total / (1024 * 1024 * 1024)  # GB
        msg.memory_available = mem.available / (1024 * 1024 * 1024)

        # 网络数据
        net = psutil.net_io_counters()
        msg.net_sent = net.bytes_sent / (1024 * 1024)  # MB
        msg.net_recv = net.bytes_recv / (1024 * 1024)  # MB

        self.publisher_.publish(msg)
        self.get_logger().info(
            f"CPU: {msg.cpu_percent:.1f}%, Mem: {msg.memory_percent:.1f}%, Sent: {msg.net_sent:.1f}MB, Recv: {msg.net_recv:.1f}MB"
        )


def main(args=None):
    rclpy.init(args=args)
    node = SystemStatusPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

4.3 修改状态发布节点的setup.py

system_monitor/setup.py 中添加可执行入口:

python
entry_points={
    'console_scripts': [
        'system_publisher = system_monitor.system_publisher:main',
    ],
},

4.4 构建并运行状态发布节点

bash
cd ~/dev_ws
colcon build
source install/setup.bash

运行发布节点:

bash
ros2 run system_monitor system_publisher

4.5 验证状态发布消息

在另一个终端查看话题:

bash
ros2 topic list

会看到新增话题 /system_status,查看消息内容:

bash
ros2 topic echo /system_status

输出如下:

4.6 编写系统状态可视化订阅节点

为了更直观展示系统运行状态,我们实现一个简单的 GUI 界面来订阅 /system_status 话题并实时显示数据。

system_monitor/system_monitor/system_subscriber_gui.py 中添加以下代码:

python
import sys
import rclpy
from rclpy.node import Node
from system_interfaces.msg import SystemStatus
import tkinter as tk
from tkinter import ttk


class SystemStatusGUI(Node):
    def __init__(self):
        super().__init__('system_status_gui')

        # 创建订阅者
        self.subscription = self.create_subscription(
            SystemStatus,
            'system_status',
            self.listener_callback,
            10
        )

        # 初始化GUI
        self.root = tk.Tk()
        self.root.title("System Monitor")
        self.root.geometry("400x300")

        self.labels = {}
        fields = [
            'Host Name', 'CPU Usage (%)', 'Memory Usage (%)',
            'Total Memory (GB)', 'Available Memory (GB)',
            'Net Sent (MB)', 'Net Received (MB)'
        ]
        for i, field in enumerate(fields):
            ttk.Label(self.root, text=field + ':', font=("Arial", 12, "bold")).grid(row=i, column=0, sticky="w", padx=10, pady=5)
            self.labels[field] = ttk.Label(self.root, text="N/A", font=("Arial", 12))
            self.labels[field].grid(row=i, column=1, sticky="w", padx=10, pady=5)

        self.root.after(100, self.spin_ros)

    def listener_callback(self, msg: SystemStatus):
        """接收到系统状态消息后更新GUI"""
        self.labels['Host Name'].config(text=msg.host_name)
        self.labels['CPU Usage (%)'].config(text=f"{msg.cpu_percent:.1f}")
        self.labels['Memory Usage (%)'].config(text=f"{msg.memory_percent:.1f}")
        self.labels['Total Memory (GB)'].config(text=f"{msg.memory_total:.2f}")
        self.labels['Available Memory (GB)'].config(text=f"{msg.memory_available:.2f}")
        self.labels['Net Sent (MB)'].config(text=f"{msg.net_sent:.1f}")
        self.labels['Net Received (MB)'].config(text=f"{msg.net_recv:.1f}")

    def spin_ros(self):
        """让ROS和Tkinter循环一起运行"""
        rclpy.spin_once(self, timeout_sec=0)
        self.root.after(100, self.spin_ros)

    def run(self):
        self.root.mainloop()


def main(args=None):
    rclpy.init(args=args)
    gui_node = SystemStatusGUI()
    gui_node.run()
    gui_node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

确保系统安装了 Tkinter(Python 的 GUI 库),如果没有安装,使用下面的命令安:

bash
sudo apt update
sudo apt install python3-tk

4.7 修改 setup.py 注册订阅者入口

system_monitor/setup.py 中添加 GUI 订阅者的入口:

python
entry_points={
    'console_scripts': [
        'system_publisher = system_monitor.system_publisher:main',
        'system_subscriber_gui = system_monitor.system_subscriber_gui:main',
    ],
},

4.8 构建并运行

重新构建:

bash
cd ~/dev_ws
colcon build
source install/setup.bash

运行订阅者:

ros2 run system_monitor system_subscriber_gui

运行发布者(另一个终端):

ros2 run system_monitor system_publisher

GUI 界面会实时更新 CPU、内存、网络等状态数据:

至此,我们实现了:

  • system_interfaces:自定义 SystemStatus.msg

  • system_monitor

    • system_publisher:周期发布电脑状态
    • system_subscriber_gui:订阅状态并在 Tkinter GUI 中实时显示

这个 Demo 展示了如何在 ROS2 中:自定义消息接口/采集系统信息/构建发布者和订阅者/使用 Python GUI 可视化话题数据

虽然看起来“多此一举”,一个 Python 脚本就能直接实现相同功能,但 ROS2 的架构设计理念是模块化、分布式、解耦:

  • 接口与逻辑分离:自定义 msg 让系统数据有标准化定义,方便其他节点使用
  • 发布/订阅解耦:数据采集与显示逻辑分开,后续可以很容易接入新的数据消费者,比如日志系统、远程监控等
  • 可扩展性:未来只需在不同设备上运行订阅者节点,就能实现多机状态实时监控
  • 工业化工程思路:这也是 ROS2 在机器人开发中的优势,将功能拆分为小而独立的节点,有助于团队协作和后期维护

换句话说,这个 Demo 体现了 ROS2 从“脚本”到“系统工程”的转变。

补充:

在实际项目中,机器人系统往往由多台设备组成,例如:

  • 主控电脑(运行ROS2核心节点、算法、可视化界面)
  • 嵌入式单板机(如树莓派、NVIDIA Jetson,运行驱动和中间层节点)
  • 底层微控制器(MCU)(STM32/ESP32等,直接控制电机、传感器)

ROS2天然支持分布式多机通信,通过统一的话题机制连接不同层级的硬件与软件。

典型架构图:

+------------------------------------------------------+
|                   主控PC (ROS2)                      |
|  - 系统监控节点 system_publisher                      |
|  - GUI 显示节点 system_subscriber_gui                 |
|  - SLAM、导航、控制等高级算法                           |
+-------------------↑----------------------------------+
                    | DDS over LAN
+-------------------↓----------------------------------+
|       单板机/边缘设备 (ROS2或Micro-ROS Agent)          |
|  - 驱动节点(相机、激光雷达等)                          |
|  - 与底层MCU通信的桥接节点 (串口/Canbus/Modbus等)        |
+-------------------↑----------------------------------+
                    | 串口/CAN/I2C/SPI等总线
+-------------------↓----------------------------------+
|                   MCU (裸机或RTOS)                   |
|  - 传感器采集                                         |
|  - 电机/执行器控制                                    |
|  - 数据封装后通过串口/总线发送                          |
+------------------------------------------------------+

ROS2主机间通信(LAN/WiFi)

  • ROS2基于DDS(Data Distribution Service),内置发布/订阅分布式通信。
  • 不同电脑上运行的节点,只要网络可达并配置相同的ROS_DOMAIN_ID,就能直接通过话题通信,无需额外中间件。
  • 场景:多台PC协同计算、远程监控、分布式机器人。

嵌入式Linux设备通信

  • 像树莓派、Jetson等Linux设备可以直接跑完整ROS2。
  • 它们既可以充当底层设备的驱动桥,也能执行边缘计算,最终通过ROS2话题将数据上传到主控PC。

微控制器(MCU)通信

  • MCU资源有限,无法直接运行完整ROS2,可选:
  • Micro-ROS:在MCU上运行轻量级ROS客户端,通过串口或UDP与ROS2主机通信。
  • 串口桥接节点:MCU仅负责采集和发送原始数据;PC或单板机上运行节点,解析数据并封装为ROS2话题。

参考:

  1. https://www.bilibili.com/video/BV1GnH5eGEC3?spm_id_from=333.788.player.switch&vd_source=ef5a0ab0106372751602034cdd9ab98e