ROS2 入门:话题通信(带工作空间与包)
在 ROS2 中,话题(Topic) 是节点之间最常见的通信方式之一。它遵循 发布-订阅(Pub-Sub)模型:
- 发布者(Publisher) 将消息发送到某个话题。
- 订阅者(Subscriber) 监听该话题,并接收消息。
发布者和订阅者之间不需要直接知道对方的存在,只要它们使用同一个话题名和消息类型,就能实现通信。
一. 话题通信基本操作
1.1 创建工作空间与包
新建工作空间:
bashmkdir -p ~/ros2_ws/src cd ~/ros2_ws colcon build
进入
src
目录,新建 Python 包:bashcd ~/ros2_ws/src ros2 pkg create --build-type ament_python py_topic_demo
包结构如下:
py_topic_demo/
├── package.xml
├── py_topic_demo
│ └── __init__.py
├── resource
├── setup.cfg
├── setup.py
└── test
1.2 编写发布者节点
在 py_topic_demo/py_topic_demo/minimal_publisher.py
中添加:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'chatter', 10)
timer_period = 1.0
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello ROS2: {self.i}'
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
rclpy.init(args=args)
node = MinimalPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
1.3 编写订阅者节点
在 py_topic_demo/py_topic_demo/minimal_subscriber.py
中添加:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.subscription = self.create_subscription(
String,
'chatter',
self.listener_callback,
10)
self.subscription
def listener_callback(self, msg):
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = MinimalSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
1.4 修改 setup.py
在 setup.py
的 entry_points
中加入:
entry_points={
'console_scripts': [
'minimal_publisher = py_topic_demo.minimal_publisher:main',
'minimal_subscriber = py_topic_demo.minimal_subscriber:main',
],
},
1.5 构建与运行
在工作空间根目录构建:
bashcd ~/ros2_ws colcon build source install/setup.bash
运行发布者:
bashros2 run py_topic_demo minimal_publisher
如下图:
运行订阅者:
bashros2 run py_topic_demo minimal_subscriber
如下图:
1.6 使用命令行调试话题
查看所有话题:
bashros2 topic list
查看话题类型:
bashros2 topic type /chatter
查看话题实时消息:
bashros2 topic echo /chatter
发布测试消息:
bashros2 topic pub /chatter std_msgs/String "data: 'Hello from CLI'"
二、话题通信实践
turtlesim
是 ROS2 自带的一个简单模拟器,可以用来学习 话题通信。 我们要做的是:写一个发布者节点,发布速度指令,控制小海龟按照指定半径画圆。
2.1 启动模拟器
ros2 run turtlesim turtlesim_node
会弹出一个小窗口,显示一只小海龟 🐢。
2.2 控制说明
小海龟的运动由话题 /turtle1/cmd_vel
控制,它使用 geometry_msgs/msg/Twist
消息类型。
该消息用于表示 速度指令,由两个三维向量组成:
geometry_msgs/msg/Twist
-----------------------
geometry_msgs/Vector3 linear
geometry_msgs/Vector3 angular
其中 Vector3 的定义为:
geometry_msgs/msg/Vector3
-------------------------
float64 x
float64 y
float64 z
linear
是线速度向量,在 turtlesim
里,只有 linear.x
有效。
linear.x
:表示 前后方向的速度(m/s)。正值:向前运动负值:向后运动linear.y
:表示 左右平移速度(对小海龟无效,始终为 0)linear.z
:表示 上下方向速度(对小海龟无效,始终为 0)
angular
是角速度向量,在 turtlesim 里,只有 angular.z 有效。
angular.x
:绕 X 轴旋转速度(对小海龟无效)angular.y
:绕 Y 轴旋转速度(对小海龟无效)angular.z
:绕 Z 轴旋转速度(rad/s)。正值:逆时针旋转 ,负值:顺时针旋转。
在控制小海龟时,通常只需要设置两个参数:
- linear.x —— 决定小海龟的前进速度
- angular.z —— 决定小海龟的转弯速度
例如:
linear.x = 2.0, angular.z = 0.0
→ 小海龟直线前进linear.x = 0.0, angular.z = 1.0
→ 小海龟原地旋转linear.x = 2.0, angular.z = 1.0
→ 小海龟沿圆弧运动
同时,我们可以使用 话题 /turtle1/pose
获得小海龟的状态,消息类型为:turtlesim/msg/Pose
,格式如下:
turtlesim/msg/Pose
------------------
float32 x # 小海龟的 X 坐标(单位:米)
float32 y # 小海龟的 Y 坐标(单位:米)
float32 theta # 朝向角度(单位:弧度,0 表示朝右,逆时针为正)
float32 linear_velocity # 当前线速度
float32 angular_velocity # 当前角速度
2.3 编写海龟控制节点
创建功能包:
ros2 pkg create py_turtle_control --build-type ament_python --dependencies rclpy geometry_msgs turtlesim
编写控制节点,在py_turtle_control/py_turtle_control/turtle_circle_controller.py
添加:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from turtlesim.msg import Pose
import math
class TurtleCircleController(Node):
def __init__(self):
super().__init__('turtle_circle_controller')
# 发布速度指令
self.publisher_ = self.create_publisher(Twist, '/turtle1/cmd_vel', 10)
# 订阅小海龟的位姿
self.subscription = self.create_subscription(
Pose,
'/turtle1/pose',
self.pose_callback,
10
)
# 控制参数
self.radius = 2.0 # 目标圆的半径
self.center_x = 5.5 # 圆心坐标(窗口中心)
self.center_y = 5.5
self.linear_speed = 2.0 # 期望的线速度
self.pose = None
# 定时器:50ms 调用一次控制逻辑
self.timer = self.create_timer(0.05, self.control_loop)
def pose_callback(self, msg: Pose):
"""接收小海龟的当前位置"""
self.pose = msg
def control_loop(self):
if self.pose is None:
return
# 当前坐标
x, y, theta = self.pose.x, self.pose.y, self.pose.theta
# 小海龟相对圆心的偏移
dx = x - self.center_x
dy = y - self.center_y
dist_to_center = math.sqrt(dx**2 + dy**2)
# 误差:当前半径 - 目标半径
error = dist_to_center - self.radius
# 控制律:
cmd = Twist()
cmd.linear.x = self.linear_speed
cmd.angular.z = -1.0 * error # 简单 P 控制器
# 发布速度
self.publisher_.publish(cmd)
# 打印调试信息
self.get_logger().info(
f'Pose (x={x:.2f}, y={y:.2f}, θ={theta:.2f}), error={error:.2f}, angular.z={cmd.angular.z:.2f}'
)
def main(args=None):
rclpy.init(args=args)
node = TurtleCircleController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
修改 setup.py
,在 setup.py
的 entry_points
中加入:
entry_points={
'console_scripts': [
'turtle_circle_controller = py_turtle_control.turtle_circle_controller:main',
],
},
在工作空间根目录构建:
colcon build
source install/setup.bash
启动控制节点:
ros2 run py_turtle_control turtle_circle_controller
终端会打印小海龟位置、误差和控制参数,小海龟会围绕屏幕中心画圆。
三、自定义通信接口(自定义msg)
在实际项目中,内置的消息类型(如 std_msgs、geometry_msgs)可能无法满足业务需求。 ROS2 允许我们 自定义消息(msg) 或 服务(srv) 接口,以便在节点间传递更加复杂或特定的数据结构。
下面我们基于自定义 msg 实现一个 读取电脑状态(CPU/内存占用等)并通过可视化界面显示 的小 demo。
3.1 创建工作空间和接口包
创建一个新的工作空间
mkdir -p ~/dev_ws/src
cd ~/dev_ws/src
创建接口包:
在 src 目录下,创建一个接口包(注意必须是 ament_cmake 类型):
ros2 pkg create --build-type ament_cmake system_interfaces
生成后的目录结构如下:
dev_ws/
└── src/
└── system_interfaces/
├── CMakeLists.txt
├── package.xml
└── ...
3.2 定义 msg 文件
进入接口包,创建 msg 文件夹,并新建 SystemStatus.msg
(注意这个自定义消息文件需要使用大驼峰命名法):
cd system_interfaces
mkdir msg
nano msg/SystemStatus.msg
内容如下:
builtin_interfaces/Time stamp # 记录时间戳
string host_name # 系统名称
float32 cpu_percent # CPU 使用率
float32 memory_percent # 内存使用率
float32 memory_total # 内存总量
float32 memory_available # 剩余有效内存
float64 net_sent # 网络发送数据总量
float64 net_recv # 网络接收数据总量
3.3 修改 package.xml
在package.xml
中添加依赖和声明接口包:
<depend>builtin_interfaces</depend>
<depend>rosidl_default_generators</depend>
<depend>rosidl_default_runtime</depend>
<!-- 必须声明为接口包 -->
<member_of_group>rosidl_interface_packages</member_of_group>
3.4 修改 CMakeLists.txt
打开 CMakeLists.txt,找到并修改如下内容:
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SystemStatus.msg"
DEPENDENCIES builtin_interfaces
)
这样就会在构建时生成 SystemStatus 的 Python/C++ 接口。
3.5 构建接口包
回到工作空间根目录,执行:
cd ~/dev_ws
colcon build
构建完成后,记得 source:
source install/setup.bash
验证接口是否生成成功:
ros2 interface show system_interfaces/msg/SystemStatus
正常情况下会输出刚才定义的消息内容,如下图:
四、自定义msg实战
下面我们基于上面的自定义 msg 实现一个 读取电脑状态(CPU/内存占用等)并通过可视化界面显示 的小 demo。
这个小demo主要包括三部分:一个自定义的msg包、一个采集电脑状态的话题发布者、一个显示状态的话题订阅者。
4.1 创建 Python 功能包
在 ~/dev_ws/src
下创建一个新的 Python 包:
cd ~/dev_ws/src
ros2 pkg create --build-type ament_python system_monitor --dependencies rclpy psutil system_interfaces
这里我们额外依赖了 psutil 库用于采集系统信息,可以通过 apt 安装:
sudo apt update
sudo apt install python3-psutil
4.2 编写系统状态发布节点
在 system_monitor/system_monitor/system_publisher.py
中添加以下代码:
import rclpy
from rclpy.node import Node
from system_interfaces.msg import SystemStatus
from builtin_interfaces.msg import Time
import psutil
import socket
import time
class SystemStatusPublisher(Node):
def __init__(self):
super().__init__('system_status_publisher')
# 创建 Publisher
self.publisher_ = self.create_publisher(SystemStatus, 'system_status', 10)
# 定时器,1 秒发布一次
self.timer = self.create_timer(1.0, self.timer_callback)
# 获取主机名
self.host_name = socket.gethostname()
self.get_logger().info(f"SystemStatus Publisher started on {self.host_name}")
def timer_callback(self):
msg = SystemStatus()
# 时间戳
now = self.get_clock().now().to_msg()
msg.stamp = now
# 主机名
msg.host_name = self.host_name
# CPU使用率
msg.cpu_percent = psutil.cpu_percent(interval=None)
# 内存信息
mem = psutil.virtual_memory()
msg.memory_percent = mem.percent
msg.memory_total = mem.total / (1024 * 1024 * 1024) # GB
msg.memory_available = mem.available / (1024 * 1024 * 1024)
# 网络数据
net = psutil.net_io_counters()
msg.net_sent = net.bytes_sent / (1024 * 1024) # MB
msg.net_recv = net.bytes_recv / (1024 * 1024) # MB
self.publisher_.publish(msg)
self.get_logger().info(
f"CPU: {msg.cpu_percent:.1f}%, Mem: {msg.memory_percent:.1f}%, Sent: {msg.net_sent:.1f}MB, Recv: {msg.net_recv:.1f}MB"
)
def main(args=None):
rclpy.init(args=args)
node = SystemStatusPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
4.3 修改状态发布节点的setup.py
在 system_monitor/setup.py
中添加可执行入口:
entry_points={
'console_scripts': [
'system_publisher = system_monitor.system_publisher:main',
],
},
4.4 构建并运行状态发布节点
cd ~/dev_ws
colcon build
source install/setup.bash
运行发布节点:
ros2 run system_monitor system_publisher
4.5 验证状态发布消息
在另一个终端查看话题:
ros2 topic list
会看到新增话题 /system_status,查看消息内容:
ros2 topic echo /system_status
输出如下:
4.6 编写系统状态可视化订阅节点
为了更直观展示系统运行状态,我们实现一个简单的 GUI 界面来订阅 /system_status
话题并实时显示数据。
在 system_monitor/system_monitor/system_subscriber_gui.py
中添加以下代码:
import sys
import rclpy
from rclpy.node import Node
from system_interfaces.msg import SystemStatus
import tkinter as tk
from tkinter import ttk
class SystemStatusGUI(Node):
def __init__(self):
super().__init__('system_status_gui')
# 创建订阅者
self.subscription = self.create_subscription(
SystemStatus,
'system_status',
self.listener_callback,
10
)
# 初始化GUI
self.root = tk.Tk()
self.root.title("System Monitor")
self.root.geometry("400x300")
self.labels = {}
fields = [
'Host Name', 'CPU Usage (%)', 'Memory Usage (%)',
'Total Memory (GB)', 'Available Memory (GB)',
'Net Sent (MB)', 'Net Received (MB)'
]
for i, field in enumerate(fields):
ttk.Label(self.root, text=field + ':', font=("Arial", 12, "bold")).grid(row=i, column=0, sticky="w", padx=10, pady=5)
self.labels[field] = ttk.Label(self.root, text="N/A", font=("Arial", 12))
self.labels[field].grid(row=i, column=1, sticky="w", padx=10, pady=5)
self.root.after(100, self.spin_ros)
def listener_callback(self, msg: SystemStatus):
"""接收到系统状态消息后更新GUI"""
self.labels['Host Name'].config(text=msg.host_name)
self.labels['CPU Usage (%)'].config(text=f"{msg.cpu_percent:.1f}")
self.labels['Memory Usage (%)'].config(text=f"{msg.memory_percent:.1f}")
self.labels['Total Memory (GB)'].config(text=f"{msg.memory_total:.2f}")
self.labels['Available Memory (GB)'].config(text=f"{msg.memory_available:.2f}")
self.labels['Net Sent (MB)'].config(text=f"{msg.net_sent:.1f}")
self.labels['Net Received (MB)'].config(text=f"{msg.net_recv:.1f}")
def spin_ros(self):
"""让ROS和Tkinter循环一起运行"""
rclpy.spin_once(self, timeout_sec=0)
self.root.after(100, self.spin_ros)
def run(self):
self.root.mainloop()
def main(args=None):
rclpy.init(args=args)
gui_node = SystemStatusGUI()
gui_node.run()
gui_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
确保系统安装了 Tkinter(Python 的 GUI 库),如果没有安装,使用下面的命令安:
sudo apt update
sudo apt install python3-tk
4.7 修改 setup.py 注册订阅者入口
在 system_monitor/setup.py
中添加 GUI 订阅者的入口:
entry_points={
'console_scripts': [
'system_publisher = system_monitor.system_publisher:main',
'system_subscriber_gui = system_monitor.system_subscriber_gui:main',
],
},
4.8 构建并运行
重新构建:
cd ~/dev_ws
colcon build
source install/setup.bash
运行订阅者:
ros2 run system_monitor system_subscriber_gui
运行发布者(另一个终端):
ros2 run system_monitor system_publisher
GUI 界面会实时更新 CPU、内存、网络等状态数据:
至此,我们实现了:
system_interfaces
:自定义SystemStatus.msg
system_monitor
:system_publisher
:周期发布电脑状态system_subscriber_gui
:订阅状态并在 Tkinter GUI 中实时显示
这个 Demo 展示了如何在 ROS2 中:自定义消息接口/采集系统信息/构建发布者和订阅者/使用 Python GUI 可视化话题数据
虽然看起来“多此一举”,一个 Python 脚本就能直接实现相同功能,但 ROS2 的架构设计理念是模块化、分布式、解耦:
- 接口与逻辑分离:自定义 msg 让系统数据有标准化定义,方便其他节点使用
- 发布/订阅解耦:数据采集与显示逻辑分开,后续可以很容易接入新的数据消费者,比如日志系统、远程监控等
- 可扩展性:未来只需在不同设备上运行订阅者节点,就能实现多机状态实时监控
- 工业化工程思路:这也是 ROS2 在机器人开发中的优势,将功能拆分为小而独立的节点,有助于团队协作和后期维护
换句话说,这个 Demo 体现了 ROS2 从“脚本”到“系统工程”的转变。
补充:
在实际项目中,机器人系统往往由多台设备组成,例如:
- 主控电脑(运行ROS2核心节点、算法、可视化界面)
- 嵌入式单板机(如树莓派、NVIDIA Jetson,运行驱动和中间层节点)
- 底层微控制器(MCU)(STM32/ESP32等,直接控制电机、传感器)
ROS2天然支持分布式多机通信,通过统一的话题机制连接不同层级的硬件与软件。
典型架构图:
+------------------------------------------------------+
| 主控PC (ROS2) |
| - 系统监控节点 system_publisher |
| - GUI 显示节点 system_subscriber_gui |
| - SLAM、导航、控制等高级算法 |
+-------------------↑----------------------------------+
| DDS over LAN
+-------------------↓----------------------------------+
| 单板机/边缘设备 (ROS2或Micro-ROS Agent) |
| - 驱动节点(相机、激光雷达等) |
| - 与底层MCU通信的桥接节点 (串口/Canbus/Modbus等) |
+-------------------↑----------------------------------+
| 串口/CAN/I2C/SPI等总线
+-------------------↓----------------------------------+
| MCU (裸机或RTOS) |
| - 传感器采集 |
| - 电机/执行器控制 |
| - 数据封装后通过串口/总线发送 |
+------------------------------------------------------+
ROS2主机间通信(LAN/WiFi)
- ROS2基于DDS(Data Distribution Service),内置发布/订阅分布式通信。
- 不同电脑上运行的节点,只要网络可达并配置相同的ROS_DOMAIN_ID,就能直接通过话题通信,无需额外中间件。
- 场景:多台PC协同计算、远程监控、分布式机器人。
嵌入式Linux设备通信
- 像树莓派、Jetson等Linux设备可以直接跑完整ROS2。
- 它们既可以充当底层设备的驱动桥,也能执行边缘计算,最终通过ROS2话题将数据上传到主控PC。
微控制器(MCU)通信
- MCU资源有限,无法直接运行完整ROS2,可选:
- Micro-ROS:在MCU上运行轻量级ROS客户端,通过串口或UDP与ROS2主机通信。
- 串口桥接节点:MCU仅负责采集和发送原始数据;PC或单板机上运行节点,解析数据并封装为ROS2话题。