ROS2入门:服务与参数通信
在 ROS2 中,服务(Service) 用于 请求-响应 通信,适合一次性的交互;
参数(Parameter) 用于节点的配置和状态共享,节点可以在运行时读取或更新参数值。
- 服务端(Service Server):提供服务接口,等待客户端请求。
- 客户端(Service Client):发送请求,并获取响应。
- 参数(Parameter):节点内存储的可查询/可更新变量,可以被其他节点读取或修改。
这种组合方式适合:查询节点状态、动态调整配置、执行动作等场景。
一、服务与参数通信尝试
以小海龟为例,我们用 turtlesim 演示如何调用服务并查看参数。启动小海龟:
ros2 run turtlesim turtlesim_node
查看可用服务:
ros2 service list
如下图:
查看某个服务类型,例如 /spawn 服务:ros2 service type /spawn
输出:turtlesim/srv/Spawn
可以进一步查看详细请求/响应结构:
ros2 interface show turtlesim/srv/Spawn
输出如下:
这个输出示例是 ROS2 服务接口(srv) 的定义格式,用于描述请求和响应消息的结构。具体解释如下:
float32 x
float32 y
float32 theta
string name
---
string name
在 --- 上方的部分是客户端向服务端发送的 请求数据:
- x :新海龟的 X 坐标
- y : 新海龟的 Y 坐标
- theta : 新海龟的初始朝向(弧度)
- name : 新海龟的名字,如果为空则由系统自动生成 在 --- 下方的部分是服务端返回的数据:
- name : 服务端生成的海龟名称
当调用 /spawn 服务时,需要提供这些参数,告诉服务端新海龟生成的位置和名字。在一个新终端中执行:
ros2 service call /spawn turtlesim/srv/Spawn "{x: 5.0, y: 5.0, theta: 0.0, name: 'turtle2'}"
这会在画布上生成一只新的海龟 turtle2
。
Turtlesim 节点还提供了一些可配置参数,例如背景颜色:
ros2 param list /turtlesim
输出如下:
我们也可以查看某个参数值:
ros2 param get /turtlesim background_r
也可以修改某个参数值(这样对应的背景颜色也会发生变化):
ros2 param set /turtlesim background_r 200
这里可以通过文件存储和配置参数,也可以使用rqt工具进行可视化调整,在此不再赘述。
二、服务与参数通信实践(python)
下面我们实现一个基于python的服务通信实践,实现实时人脸检测,创建一个人脸检测服务,提供图像,返回人脸数量和位置信息。
这个服务用于识别人脸,请求数据就是图片信息,返回信息就是识别结结果和识别用时。
这个服务流程如下:
- 客户端发送图像数据。
- 服务端处理图像,检测人脸。
- 服务端返回检测结果,包括人脸数量、人脸位置和处理用时。
1. 定义服务接口
假设工作空间为 ros2_ws
:
cd ~/ros2_ws/src
ros2 pkg create part4_interfaces --dependencies sensor_msgs rosidl_default_generators --license MIT
这里使用 CMake 类型包,因为自定义 .srv 文件只能在 ament_cmake 类型包里生成。
创建服务文件:
在 part4_interfaces/srv/
下新建一个服务文件,例如 FaceDetect.srv
:
# 请求部分:客户端发送图片信息
sensor_msgs/Image image
---
# 响应部分:服务端返回检测结果
int16 number
float32 use_time
int32[] top
int32[] right
int32[] bottom
int32[] left
注意:--- 分隔请求和响应部分。
然后我们修改 CMakeLists.txt
,编辑 part4_interfaces/CMakeLists.txt
,添加这些内容:
# 声明 srv 文件
set(srv_files
"srv/FaceDetect.srv"
)
# 生成接口代码
rosidl_generate_interfaces(${PROJECT_NAME}
${srv_files}
DEPENDENCIES sensor_msgs
)
这些接口依赖rosidl_default_generators生成特定语言代码,需要在package.xml文件里面声明这些依赖,以及声接口包:
<depend>builtin_interfaces</depend>
<depend>rosidl_default_generators</depend>
<depend>rosidl_default_runtime</depend>
<!-- 必须声明为接口包 -->
<member_of_group>rosidl_interface_packages</member_of_group>
编译并生成接口、在工作空间根目录执行:
cd ~/ros2_ws
colcon build
source install/setup.bash
2. 人脸检测服务端的实现
这里使用face_recognition
这个库来实现人脸检测,我们先写一个简单的脚本测试这个库:
安装依赖:
pip3 install face_recognition -i https://pypi.tuna.tsinghua.edu.cn/simple
下载测试图片:
wget https://github.com/ultralytics/yolov5/raw/master/data/images/zidane.jpg -O zidane.jpg
写一个简单的测试脚本:
#!/usr/bin/env python3
"""
最简人脸检测脚本
- 仅使用 face_recognition
- 输出人脸数量和位置
"""
import face_recognition
import time
def detect_faces(image_path: str):
start = time.time()
# 加载图片
image = face_recognition.load_image_file(image_path)
# 检测人脸位置
face_locations = face_recognition.face_locations(image)
elapsed = time.time() - start
print(f"检测到 {len(face_locations)} 张人脸,用时 {elapsed:.3f} 秒")
for i, (top, right, bottom, left) in enumerate(face_locations):
print(f"Face {i+1}: top={top}, right={right}, bottom={bottom}, left={left}")
if __name__ == "__main__":
detect_faces("zidane.jpg")
运行:python3 face_detect_basic.py
,效果如下:
接下来我们把这个人脸检测脚本封装成一个服务,先创建一个功能包:
cd ~/ros2_ws/src
ros2 pkg create demo_python_service --build-type ament_python --dependencies rclpy sensor_msgs part4_interfaces --license MIT
在 demo_python_service/demo_python_service/face_service.py
中写入:
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import face_recognition
import time
class FaceDetectService(Node):
def __init__(self):
super().__init__('face_detect_service')
self.bridge = CvBridge()
# 创建服务
self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
self.get_logger().info("人脸检测服务已启动,等待请求...")
def detect_callback(self, request, response):
"""服务回调函数:接收图片并返回检测结果"""
start = time.time()
try:
# 将 ROS Image 转换为 OpenCV 图像
cv_image = self.bridge.imgmsg_to_cv2(request.image, desired_encoding='bgr8')
if cv_image is None or cv_image.size == 0:
self.get_logger().error("收到的图片为空!")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
return response
# 转换为 face_recognition 可处理的格式(RGB)
rgb_image = cv_image[:, :, ::-1]
# 检测人脸
face_locations = face_recognition.face_locations(rgb_image)
elapsed = time.time() - start
# 填充响应
response.number = len(face_locations)
response.use_time = float(elapsed)
response.top = [loc[0] for loc in face_locations]
response.right = [loc[1] for loc in face_locations]
response.bottom = [loc[2] for loc in face_locations]
response.left = [loc[3] for loc in face_locations]
self.get_logger().info(f"检测到 {response.number} 张人脸, 用时 {elapsed:.3f}s")
except CvBridgeError as e:
self.get_logger().error(f"CvBridge 转换错误: {e}")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
except Exception as e:
self.get_logger().error(f"人脸检测异常: {e}")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
return response
def main(args=None):
rclpy.init(args=args)
node = FaceDetectService()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
在setup.py
注册节点:
entry_points={
'console_scripts': [
'face_detect_service = demo_python_service.face_service:main'
],
},
编译运行:
cd ~/ros2_ws
colcon build
source install/setup.bash
启动服务端:
ros2 run demo_python_service face_detect_service
查询服务:ros2 service lsit -t
:
请求服务:ros2 service call /face_detect part4_interfaces/srv/FaceDetect
可以看到返回了空的响应,说明服务没有问题。下面我们着手实现客户端:
3. 人脸检测客户端的实现
首先我们把待识别的图片放到包下面的demo_python_service/resource
目录,这是用于存放资源文件的目录,依然使用这个来链接下的图片: https://github.com/ultralytics/yolov5/raw/master/data/images/zidane.jpg
。
需要修改 setup.py
,把 img
目录编译安装到 share
,在setup()
里加 data_files
项:
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name+"/resource", ['resource/zidane.jpg']),
],
添加了这一行:
('share/' + package_name+"/resource", ['resource/zidane.jpg']),
这样编译完成后,资源文件会被拷贝到share
目录,可供我们调用:
编写客户端代码:demo_python_service/demo_python_service/face_client.py
如下:
import os
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from ament_index_python.packages import get_package_share_directory
import cv2
class FaceDetectClient(Node):
def __init__(self):
super().__init__('face_detect_client')
self.cli = self.create_client(FaceDetect, 'face_detect')
self.bridge = CvBridge()
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务不可用,等待中...')
def send_request(self, image_path):
# 读取 share 目录下的图片
if not os.path.exists(image_path):
self.get_logger().error(f"图片路径不存在: {image_path}")
return
cv_image = cv2.imread(image_path)
if cv_image is None:
self.get_logger().error(f"无法读取图片: {image_path}")
return
# 转换为 ROS Image 消息
ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
# 构造请求
req = FaceDetect.Request()
req.image = ros_image
# 异步调用服务
self.future = self.cli.call_async(req)
def main(args=None):
rclpy.init(args=args)
node = FaceDetectClient()
# 获取包的 share 目录下的资源路径
package_share_dir = get_package_share_directory('demo_python_service')
image_path = os.path.join(package_share_dir, 'resource', 'zidane.jpg')
node.send_request(image_path)
# 等待服务响应
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
except Exception as e:
node.get_logger().error(f'服务调用失败: {e}')
else:
node.get_logger().info(f'检测到 {response.number} 张人脸,用时 {response.use_time:.3f}s')
# 显示结果
cv_image = cv2.imread(image_path)
for top, right, bottom, left in zip(
response.top, response.right, response.bottom, response.left
):
cv2.rectangle(cv_image, (left, top), (right, bottom), (0, 255, 0), 2)
cv2.imshow("Face Detection Result", cv_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
break
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
继续在setup.py
注册节点:
entry_points={
'console_scripts': [
'face_detect_service = demo_python_service.face_service:main',
'face_detect_client = demo_python_service.face_client:main'
],
},
编译运行:
cd ~/ros2_ws
colcon build
source install/setup.bash
启动客户端:
ros2 run demo_python_service face_detect_client
效果如下:
可以看到可视化结果也给出来了:
三、服务与参数通信实践(C++)
下面我们实现一个基于python的服务通信实践,实现小海龟在模拟器里进行巡逻。
大概的逻辑是这样的:
- 服务端接收一个目标坐标 (x, y),控制海龟移动到该点,返回是否成功。
- 服务端逻辑:订阅
/turtle1/pose
获取当前位置,通过/turtle1/cmd_vel
发布速度指令控制海龟运动,当距离目标点小于阈值时,返回success = true
- 客户端逻辑:随机生成 (x, y) 坐标,循环调用服务端,实现巡逻效果。
1.自定义服务接口
在 part4_interfaces/srv/
下新建一个服务文件, TurtlePatrol.srv
:
float64 x
float64 y
---
bool success
修改CMakeLists.txt
:
# 声明 srv 文件
set(srv_files
"srv/FaceDetect.srv"
"srv/TurtlePatrol.srv"
)
# 生成接口代码
rosidl_generate_interfaces(${PROJECT_NAME}
${srv_files}
DEPENDENCIES sensor_msgs
)
编译生成接口:
cd ~/ros2_ws
colcon build
source install/setup.bash
2. 服务端实现
先新建一个包:
cd ~/ros2_ws/src
ros2 pkg create turtle_patrol_cpp --build-type ament_cmake --dependencies rclcpp geometry_msgs turtlesim part4_interfaces
服务端代码src/turtle_patrol_service.cpp
:
#include "rclcpp/rclcpp.hpp"
#include "geometry_msgs/msg/twist.hpp"
#include "turtlesim/msg/pose.hpp"
#include "chapt4_interfaces/srv/patrol.hpp"
#include "rcl_interfaces/msg/set_parameters_result.hpp"
#include <cmath>
using Patrol = chapt4_interfaces::srv::Patrol;
using SetParametersResult = rcl_interfaces::msg::SetParametersResult;
class TurtlePatrolService : public rclcpp::Node
{
public:
TurtlePatrolService() : Node("turtle_patrol_service")
{
// 发布速度控制
velocity_pub_ = this->create_publisher<geometry_msgs::msg::Twist>("/turtle1/cmd_vel", 10);
// 订阅小海龟位置
pose_sub_ = this->create_subscription<turtlesim::msg::Pose>(
"/turtle1/pose", 10,
std::bind(&TurtlePatrolService::on_pose_received, this, std::placeholders::_1));
// 创建服务
patrol_server_ = this->create_service<Patrol>(
"patrol",
std::bind(&TurtlePatrolService::handle_patrol, this, std::placeholders::_1, std::placeholders::_2));
// 声明参数并读取初值
this->declare_parameter("k", 1.0);
this->declare_parameter("max_speed", 1.0);
this->get_parameter("k", k_);
this->get_parameter("max_speed", max_speed_);
// 添加参数回调
parameters_callback_handle_ = this->add_on_set_parameters_callback(
[this](const std::vector<rclcpp::Parameter> ¶ms) -> SetParametersResult
{
for (auto param : params) {
if (param.get_name() == "k") {
k_ = param.as_double();
RCLCPP_INFO(this->get_logger(), "Updated k: %f", k_);
} else if (param.get_name() == "max_speed") {
max_speed_ = param.as_double();
RCLCPP_INFO(this->get_logger(), "Updated max_speed: %f", max_speed_);
}
}
SetParametersResult result;
result.successful = true;
return result;
});
RCLCPP_INFO(this->get_logger(), "Turtle Patrol Service is ready!");
}
private:
// 订阅/发布
rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr velocity_pub_;
rclcpp::Subscription<turtlesim::msg::Pose>::SharedPtr pose_sub_;
rclcpp::Service<Patrol>::SharedPtr patrol_server_;
// 当前目标
double target_x_{1.0};
double target_y_{1.0};
// 控制参数
double k_{1.0};
double max_speed_{1.0};
// 回调句柄
rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr parameters_callback_handle_;
// 当前海龟位置
turtlesim::msg::Pose current_pose_;
bool has_new_target_{false};
void on_pose_received(const turtlesim::msg::Pose::SharedPtr pose)
{
current_pose_ = *pose;
if (!has_new_target_) return;
double dx = target_x_ - current_pose_.x;
double dy = target_y_ - current_pose_.y;
double distance = std::sqrt(dx * dx + dy * dy);
double angle_to_goal = std::atan2(dy, dx);
double angle_diff = angle_to_goal - current_pose_.theta;
// 归一化到 [-pi, pi]
while (angle_diff > M_PI) angle_diff -= 2 * M_PI;
while (angle_diff < -M_PI) angle_diff += 2 * M_PI;
geometry_msgs::msg::Twist cmd;
double linear_speed = 0.0;
double angular_speed = 4.0 * angle_diff;
if (distance > 0.1) {
if (std::fabs(angle_diff) < 0.2) {
linear_speed = k_ * distance;
if (linear_speed > max_speed_) linear_speed = max_speed_;
}
} else {
// 到达目标
linear_speed = 0.0;
angular_speed = 0.0;
has_new_target_ = false;
}
cmd.linear.x = linear_speed;
cmd.angular.z = angular_speed;
velocity_pub_->publish(cmd);
}
void handle_patrol(
const std::shared_ptr<Patrol::Request> request,
std::shared_ptr<Patrol::Response> response)
{
// 检查边界
if (request->target_x <= 0.0 || request->target_x >= 12.0 ||
request->target_y <= 0.0 || request->target_y >= 12.0) {
response->result = Patrol::Response::FAIL;
return;
}
target_x_ = request->target_x;
target_y_ = request->target_y;
has_new_target_ = true;
RCLCPP_INFO(this->get_logger(), "New patrol target: (%.2f, %.2f)", target_x_, target_y_);
response->result = Patrol::Response::SUCCESS;
}
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<TurtlePatrolService>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
修改 CMakeLists.txt
,添加:
# 服务端
add_executable(turtle_patrol_service src/turtle_patrol_service.cpp)
ament_target_dependencies(turtle_patrol_service rclcpp geometry_msgs turtlesim part4_interfaces)
install(TARGETS
turtle_patrol_service
DESTINATION lib/${PROJECT_NAME})
编译:
colcon build
source install/setup.bash
运行:
ros2 run turtle_patrol_cpp turtle_patrol_service
3. 客户端实现
编辑src/turtle_patrol_client.cpp
:
#include <chrono>
#include <cstdlib>
#include <ctime>
#include "rclcpp/rclcpp.hpp"
#include "part4_interfaces/srv/turtle_patrol.hpp"
using namespace std::chrono_literals;
using TurtlePatrol = part4_interfaces::srv::TurtlePatrol;
class TurtlePatrolClient : public rclcpp::Node
{
public:
TurtlePatrolClient() : Node("turtle_patrol_client")
{
client_ = this->create_client<TurtlePatrol>("turtle_patrol");
srand(time(nullptr));
// 定时器,每隔 5 秒发送一次随机目标
timer_ = this->create_wall_timer(5s, std::bind(&TurtlePatrolClient::timer_callback, this));
RCLCPP_INFO(this->get_logger(), "Turtle Patrol Client started.");
}
private:
rclcpp::Client<TurtlePatrol>::SharedPtr client_;
rclcpp::TimerBase::SharedPtr timer_;
void timer_callback()
{
// 等待服务上线
while (!client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "等待服务过程中被打断");
return;
}
RCLCPP_INFO(this->get_logger(), "等待 turtle_patrol 服务上线..." );
}
// 构造请求
auto request = std::make_shared<TurtlePatrol::Request>();
request->x = 1.0 + static_cast<double>(rand()) / RAND_MAX * 9.0;
request->y = 1.0 + static_cast<double>(rand()) / RAND_MAX * 9.0;
RCLCPP_INFO(this->get_logger(), "请求巡逻到: (%.2f, %.2f)", request->x, request->y);
// 异步发送请求
client_->async_send_request(request,
[this](rclcpp::Client<TurtlePatrol>::SharedFuture future) {
auto response = future.get();
if (response->success) {
RCLCPP_INFO(this->get_logger(), "目标点到达成功");
} else {
RCLCPP_WARN(this->get_logger(), "目标点到达失败");
}
});
}
};
int main(int argc, char **argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<TurtlePatrolClient>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
修改 CMakeLists.txt
,添加:
# 客户端
add_executable(turtle_patrol_client src/turtle_patrol_client.cpp)
ament_target_dependencies(turtle_patrol_client rclcpp part4_interfaces)
install(TARGETS
turtle_patrol_service
turtle_patrol_client
DESTINATION lib/${PROJECT_NAME})
编译:
colcon build
source install/setup.bash
运行:
ros2 run turtle_patrol_cpp turtle_patrol_client
最后启动小乌龟:
ros2 run turtlesim turtlesim_node
效果:小海龟在窗口内随机巡逻
四、在python节点中使用参数
下面在第二部分的基础上,给服务加上参数,人脸识别函数 face_recognition.face_locations(image)
是可以添加一些额外的参数的,比如选择检测模型(hog 或 cnn),以及调整检测精度等。我们可以通过 ROS2 参数 的方式,让节点在运行时动态切换这些参数,而不用改代码。
face_recognition.face_locations
函数参数:
face_recognition.face_locations(img, number_of_times_to_upsample=1, model="hog")
number_of_times_to_upsample
:放大图像次数,值越高越容易检测到小人脸,但速度变慢。model
:检测模型,可选 "hog"(CPU快,精度中)或 "cnn"(GPU快,精度高)。
1. 修改服务端,添加参数
在服务节点中声明并读取参数,动态传给 face_recognition.face_locations
。
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import face_recognition
import time
class FaceDetectService(Node):
def __init__(self):
super().__init__('face_detect_service')
self.bridge = CvBridge()
# 声明参数
self.declare_parameter('model', 'hog') # 模型: hog 或 cnn
self.declare_parameter('upsample_times', 1) # 放大次数
# 创建服务
self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
self.get_logger().info("人脸检测服务已启动,等待请求...")
def detect_callback(self, request, response):
"""服务回调函数:接收图片并返回检测结果"""
start = time.time()
# 检查 Image 消息是否有效
if (request.image is None or
request.image.data is None or len(request.image.data) == 0 or
request.image.width == 0 or request.image.height == 0 or
request.image.encoding == ""):
self.get_logger().error("收到无效的图片请求!")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
return response
try:
# 将 ROS Image 转换为 OpenCV 图像
cv_image = self.bridge.imgmsg_to_cv2(request.image)
if cv_image is None or cv_image.size == 0:
self.get_logger().error("CvBridge 转换后的图像为空!")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
return response
# 获取参数
model = self.get_parameter('model').get_parameter_value().string_value
upsample_times = self.get_parameter('upsample_times').get_parameter_value().integer_value
# 人脸检测
face_locations = face_recognition.face_locations(
cv_image,
number_of_times_to_upsample=upsample_times,
model=model
)
elapsed = time.time() - start
# 填充响应
response.number = len(face_locations)
response.use_time = float(elapsed)
response.top = [loc[0] for loc in face_locations]
response.right = [loc[1] for loc in face_locations]
response.bottom = [loc[2] for loc in face_locations]
response.left = [loc[3] for loc in face_locations]
self.get_logger().info(
f"检测到 {response.number} 张人脸, 模型: {model}, upsample: {upsample_times}, 用时 {elapsed:.3f}s"
)
except CvBridgeError as e:
self.get_logger().error(f"CvBridge 转换错误: {e}")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
except Exception as e:
self.get_logger().error(f"人脸检测异常: {e}")
response.number = 0
response.use_time = 0.0
response.top = []
response.right = []
response.bottom = []
response.left = []
return response
def main(args=None):
rclpy.init(args=args)
node = FaceDetectService()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
由上述代码可以看到:
在节点初始化时,使用 self.declare_parameter
声明参数名和默认值:
self.declare_parameter('model', 'hog') # 默认使用 hog 模型
self.declare_parameter('upsample_times', 1) # 默认放大次数为 1
在服务回调或其他函数中,通过self.get_parameter()
获取参数值:
model = self.get_parameter('model').get_parameter_value().string_value
upsample_times = self.get_parameter('upsample_times').get_parameter_value().integer_value
这里用 .string_value 和 .integer_value 分别取出字符串和整数。
获取参数值还可以使用服务自带的参数设置回调,可以使用 add_on_set_parameters_callback
来注册一个回调函数,如下:
from rcl_interfaces.msg import SetParametersResult
class FaceDetectService(Node):
def __init__(self):
super().__init__('face_detect_service')
self.bridge = CvBridge()
# 声明参数
self.declare_parameter('model', 'hog')
self.declare_parameter('upsample_times', 1)
# 添加参数回调
self.add_on_set_parameters_callback(self.param_callback)
self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
self.get_logger().info("人脸检测服务已启动,等待请求...")
def param_callback(self, params):
"""参数更新回调"""
for param in params:
if param.name == "model":
if param.value not in ["hog", "cnn"]:
self.get_logger().warn(f"模型参数非法: {param.value},使用默认值 hog")
return SetParametersResult(successful=False)
if param.name == "upsample_times":
if param.value < 0:
self.get_logger().warn("放大次数不能小于0")
return SetParametersResult(successful=False)
self.get_logger().info(f"参数更新成功: {[f'{p.name}={p.value}' for p in params]}")
return SetParametersResult(successful=True)
注意修改完代码重新编译项目
编译运行:
cd ~/ros2_ws
colcon build
source install/setup.bash
启动服务端:
ros2 run demo_python_service face_detect_service
2. 通过命令使用参数
查看参数:
ros2 param list
可以看到我们声明的两个参数就出来了(注:use_sim_time
是 ROS2 自动加的内置参数,每个节点都会有):
查看参数值:
ros2 param get /face_detect_service model
ros2 param get /face_detect_service upsample_times
动态修改参数:
ros2 param set /face_detect_service model cnn
ros2 param set /face_detect_service upsample_times 2
启动服务指定参数:
我们可以在启动节点时,通过 --ros-args -p
选项直接设置参数,而不用运行后再 ros2 param set
:
ros2 run face_detect_pkg face_detect_service \
--ros-args \
-p model:=cnn \
-p upsample_times:=2
或者使用参数文件:
# face_detect_params.yaml
face_detect_service:
ros__parameters:
model: "cnn"
upsample_times: 2
加载:
ros2 run face_detect_pkg face_detect_service --ros-args --params-file face_detect_params.yaml
3. 在客户端请求服务进行参数设置
通过调用远程节点的参数服务 /face_detect_service/set_parameters
,可以动态设置人脸检测的 model
和upsample_times
参数,而无需重启服务端或修改代码。
import rclpy
from rclpy.node import Node
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import Parameter, ParameterValue
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import os
import cv2
from ament_index_python.packages import get_package_share_directory
class FaceDetectClient(Node):
def __init__(self):
super().__init__('face_detect_client')
self.cli = self.create_client(FaceDetect, 'face_detect')
self.bridge = CvBridge()
# 等待人脸检测服务
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('人脸检测服务不可用,等待中...')
# 创建远程参数服务客户端
self.param_cli = self.create_client(SetParameters, '/face_detect_service/set_parameters')
while not self.param_cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待远程参数服务...')
def set_remote_parameters(self, model="hog", upsample_times=1):
"""通过服务调用修改远程节点参数"""
param_list = []
# model 参数
p_model = Parameter()
p_model.name = 'model'
p_model.value = ParameterValue()
p_model.value.type = 4 # STRING
p_model.value.string_value = model
param_list.append(p_model)
# upsample_times 参数
p_up = Parameter()
p_up.name = 'upsample_times'
p_up.value = ParameterValue()
p_up.value.type = 2 # INTEGER
p_up.value.integer_value = upsample_times
param_list.append(p_up)
# 调用远程参数服务
req = SetParameters.Request()
req.parameters = param_list
future = self.param_cli.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
# result.results 是一个列表,对应每个参数设置的结果
if all(r.successful for r in result.results):
self.get_logger().info("参数设置成功")
else:
self.get_logger().error("参数设置失败")
def send_request(self, image_path):
"""发送人脸检测请求"""
if not os.path.exists(image_path):
self.get_logger().error(f"图片路径不存在: {image_path}")
return None
cv_image = cv2.imread(image_path)
if cv_image is None:
self.get_logger().error(f"无法读取图片: {image_path}")
return None
ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
req = FaceDetect.Request()
req.image = ros_image
future = self.cli.call_async(req)
rclpy.spin_until_future_complete(self, future)
return future.result()
def main(args=None):
rclpy.init(args=args)
node = FaceDetectClient()
node.set_remote_parameters(model="hog", upsample_times=2)
package_share_dir = get_package_share_directory('demo_python_service')
image_path = os.path.join(package_share_dir, 'resource', 'zidane.jpg')
response = node.send_request(image_path)
if response:
node.get_logger().info(f'检测到 {response.number} 张人脸,用时 {response.use_time:.3f}s')
cv_image = cv2.imread(image_path)
for top, right, bottom, left in zip(response.top, response.right, response.bottom, response.left):
cv2.rectangle(cv_image, (left, top), (right, bottom), (0, 255, 0), 2)
cv2.imshow("Face Detection Result", cv_image)
cv2.waitKey(0)
cv2.destroyAllWindows()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
编译运行:
cd ~/ros2_ws
colcon build
source install/setup.bash
启动客户端:
ros2 run demo_python_service face_detect_client
效果如下:
五、在C++节点中使用参数
略,可参看: https://www.bilibili.com/video/BV19k4UeXEvZ
六、补充:使用Launch启动脚本
在 ROS2 中,推荐使用 Launch 文件 来启动节点并传递参数,这样更方便管理复杂系统,也支持同时启动多个节点和配置参数文件。以第三部分为例,我们可以用 Launch 文件 同时启动 turtlesim 节点、服务端节点和客户端节点,并且在 Launch 文件里传递参数或者设置输出。下面给出完整示例。
1.在包 turtle_patrol_cpp 下创建 Launch 文件
在 launch/
目录下新建 turtle_patrol_launch.py
:
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
# 启动 turtlesim 节点
Node(
package='turtlesim',
executable='turtlesim_node',
name='turtlesim',
output='screen'
),
# 启动服务端节点
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_service',
name='turtle_patrol_service',
output='screen'
),
# 启动客户端节点
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_client',
name='turtle_patrol_client',
output='screen'
)
])
这里我们的服务没有参数,如果服务有参数,还可以这样写:
# 启动服务端节点
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_service',
name='turtle_patrol_service',
output='screen',
parameters=[{
'k': 1.0,
'max_speed': 1.0
}]
),
修改CMakeLists.txt
添加安装 Launch
文件:
# 安装 Launch 文件
install(DIRECTORY launch
DESTINATION share/${PROJECT_NAME}
)
2.通过launch启动
在终端中执行:
ros2 launch turtle_patrol_cpp turtle_patrol_launch.py
启动后:
turtlesim_node
打开窗口显示小乌龟。turtle_patrol_service
启动服务端,监听客户端请求。turtle_patrol_client
每隔 5 秒生成随机目标点并调用服务,控制小乌龟巡逻。
3. launch进阶
在 ROS2 中,Launch 文件功能非常强大,除了同时启动多个节点,还能做以下操作:
3.1 传递参数文件
可以将参数配置写入 YAML 文件,并在 Launch 文件中加载到节点。
config/params.yaml
:
turtle_patrol_service:
ros__parameters:
k: 1.5
max_speed: 2.0
Launch 文件加载参数:
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
pkg_share = get_package_share_directory('turtle_patrol_cpp')
param_file = os.path.join(pkg_share, 'config', 'params.yaml')
return LaunchDescription([
Node(
package='turtlesim',
executable='turtlesim_node',
name='turtlesim',
output='screen'
),
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_service',
name='turtle_patrol_service',
output='screen',
parameters=[param_file]
),
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_client',
name='turtle_patrol_client',
output='screen'
)
])
3.2 使用 Launch Arguments
通过 Launch 参数动态控制节点启动行为或配置参数。
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
k_arg = DeclareLaunchArgument('k', default_value='1.0', description='增益参数')
max_speed_arg = DeclareLaunchArgument('max_speed', default_value='1.0', description='最大速度')
return LaunchDescription([
k_arg,
max_speed_arg,
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_service',
name='turtle_patrol_service',
output='screen',
parameters=[{
'k': LaunchConfiguration('k'),
'max_speed': LaunchConfiguration('max_speed')
}]
)
])
3.3 条件启动节点
根据布尔值或条件判断决定是否启动某个节点。
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_client',
name='turtle_patrol_client',
output='screen',
condition=IfCondition(LaunchConfiguration('enable_client'))
)
运行时通过参数控制是否启动:
ros2 launch turtle_patrol_cpp turtle_patrol_launch.py enable_client:=true
3.4 定时器(TimerAction)
延迟启动节点或动作,控制启动顺序或等待依赖节点初始化。TimerAction
可以让你延迟启动某个节点或动作。比如想等 turtlesim
完全启动后再启动客户端:
from launch.actions import TimerAction
from launch_ros.actions import Node
client_node = Node(
package='turtle_patrol_cpp',
executable='turtle_patrol_client',
name='turtle_patrol_client',
output='screen'
)
delayed_client_node = TimerAction(
period=3.0, # 延迟 3 秒启动
actions=[client_node]
)
3.5 包含其他 Launch 文件(IncludeLaunchDescription)
在一个 Launch 文件中调用其他 Launch 文件,实现模块化管理和统一调度。如果系统很大,建议把节点拆到多个 Launch 文件里,再在主 Launch 文件中统一调用。示例:
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python.packages import get_package_share_directory
import os
include_turtle_launch = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(get_package_share_directory('turtlesim'), 'launch', 'turtlesim.launch.py')
)
)
include_service_launch = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(get_package_share_directory('turtle_patrol_cpp'), 'launch', 'service_launch.py')
)
)
ld = LaunchDescription([
include_turtle_launch,
include_service_launch
])