
Getting Started with ROS2: Service and Parameter Communication

In ROS2, a Service is used for request-response communication and suits one-off interactions;
a Parameter is used for node configuration and state sharing, and its value can be read or updated at runtime.

  • Service server: provides the service interface and waits for client requests.
  • Service client: sends a request and receives the response.
  • Parameter: a queryable/updatable value stored in a node, which other nodes can read or modify.

This combination covers scenarios such as querying node state, adjusting configuration on the fly, and triggering actions; a minimal sketch of the pieces follows.
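
To make the three roles concrete, here is a minimal, hedged rclpy sketch (the node name, service name and parameter are made up for illustration; it uses the stock example_interfaces/srv/AddTwoInts type): one node serves a request-response call and exposes a runtime-tunable parameter.

python
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts  # stock demo service type shipped with ROS2


class DemoNode(Node):
    def __init__(self):
        super().__init__('demo_node')
        # parameter: declared with a default, readable/updatable at runtime
        self.declare_parameter('greeting', 'hello')
        # service server: waits for requests and fills in a response
        self.create_service(AddTwoInts, 'add_two_ints', self.on_request)

    def on_request(self, request, response):
        greeting = self.get_parameter('greeting').value
        self.get_logger().info(f'{greeting}: {request.a} + {request.b}')
        response.sum = request.a + request.b
        return response


def main():
    rclpy.init()
    rclpy.spin(DemoNode())
    rclpy.shutdown()


if __name__ == '__main__':
    main()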


I. Trying Out Service and Parameter Communication

Using the little turtle as our example, we use turtlesim to demonstrate calling a service and inspecting parameters. Start turtlesim:

bash
ros2 run turtlesim turtlesim_node

List the available services:

bash
ros2 service list

This lists every service the turtlesim node currently offers (screenshot omitted).

Check the type of a particular service, e.g. /spawn: ros2 service type /spawn, which outputs turtlesim/srv/Spawn

You can then inspect the detailed request/response structure:

bash
ros2 interface show turtlesim/srv/Spawn

The output is:

This output is the ROS2 service interface (srv) definition format, which describes the structure of the request and response messages. In detail:

float32 x
float32 y
float32 theta
string name
---
string name

The part above the --- is the request data the client sends to the server:

  • x: X coordinate of the new turtle
  • y: Y coordinate of the new turtle
  • theta: initial heading of the new turtle (radians)
  • name: name of the new turtle; if empty, one is generated automatically

The part below the --- is the data the server returns:

  • name: the turtle name assigned by the server

When calling the /spawn service, you provide these fields to tell the server where to spawn the new turtle and what to call it. In a new terminal run:

bash
ros2 service call /spawn turtlesim/srv/Spawn "{x: 5.0, y: 5.0, theta: 0.0, name: 'turtle2'}"

This spawns a new turtle named turtle2 on the canvas.
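
The same call can be made from Python instead of the CLI; a minimal sketch (run it while turtlesim_node is up; the node and turtle names here are arbitrary):

python
import rclpy
from rclpy.node import Node
from turtlesim.srv import Spawn


def main():
    rclpy.init()
    node = Node('spawn_client')
    client = node.create_client(Spawn, '/spawn')
    client.wait_for_service()

    req = Spawn.Request()
    req.x, req.y, req.theta, req.name = 3.0, 3.0, 0.0, 'turtle3'

    future = client.call_async(req)
    rclpy.spin_until_future_complete(node, future)
    node.get_logger().info(f"spawned: {future.result().name}")

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()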

The turtlesim node also exposes some configurable parameters, such as the background color:

ros2 param list /turtlesim

The output lists the node's parameters, including background_b, background_g and background_r (plus the built-in use_sim_time).

We can read a single parameter value:

ros2 param get /turtlesim background_r

We can also change a parameter value (the background color changes accordingly):

ros2 param set /turtlesim background_r 200

Parameters can also be saved to and loaded from files, or adjusted visually with the rqt tool; we won't go into that here.

II. Service and Parameter Communication in Practice (Python)

Next we build a Python-based service example: real-time face detection. We create a face detection service that takes an image and returns the number of faces and their locations.

The request carries the image data; the response carries the detection result and the time the detection took.

The service flow is:

  • The client sends image data.
  • The server processes the image and detects faces.
  • The server returns the result: face count, face locations and processing time.

1. Define the service interface

Assuming the workspace is ros2_ws:

bash
cd ~/ros2_ws/src
ros2 pkg create part4_interfaces --dependencies sensor_msgs rosidl_default_generators --license MIT

We use a CMake (ament_cmake) package here, because custom .srv files can only be generated in ament_cmake packages; since ament_cmake is the default build type, no --build-type flag is needed.

Create the service file:

In part4_interfaces/srv/, create a new service file, e.g. FaceDetect.srv:

# Request: the client sends the image
sensor_msgs/Image image
---
# Response: the server returns the detection result
int16 number
float32 use_time
int32[] top
int32[] right
int32[] bottom
int32[] left

Note: --- separates the request part from the response part.

Next, edit part4_interfaces/CMakeLists.txt and add:

cmake
# declare the srv files
set(srv_files
  "srv/FaceDetect.srv"
)

# generate the interface code
rosidl_generate_interfaces(${PROJECT_NAME}
  ${srv_files}
  DEPENDENCIES sensor_msgs
)

These interfaces rely on rosidl_default_generators to generate the language-specific code, so the dependencies must be declared in package.xml, and the package must be declared as an interface package:

xml
<depend>builtin_interfaces</depend>
<depend>rosidl_default_generators</depend>
<depend>rosidl_default_runtime</depend>

 <!-- must be declared as an interface package -->
<member_of_group>rosidl_interface_packages</member_of_group>

Build to generate the interfaces; run from the workspace root:

bash
cd ~/ros2_ws
colcon build 
source install/setup.bash
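
A quick way to confirm the interface was generated (assuming the workspace has been sourced) is to import it from Python and print the default request/response; ros2 interface show part4_interfaces/srv/FaceDetect works as well. The file name check_interface.py is just an example:

python
# check_interface.py — run with: python3 check_interface.py
from part4_interfaces.srv import FaceDetect

req = FaceDetect.Request()    # carries an 'image' field (sensor_msgs/Image)
resp = FaceDetect.Response()  # carries number, use_time, top, right, bottom, left
print(req)
print(resp)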

2. Implementing the face-detection server

We use the face_recognition library for detection. First, let's write a simple standalone script to try the library out.

Install the dependency:

bash
pip3 install face_recognition -i https://pypi.tuna.tsinghua.edu.cn/simple

Download a test image:

bash
wget https://github.com/ultralytics/yolov5/raw/master/data/images/zidane.jpg -O zidane.jpg

Write a simple test script:

python
#!/usr/bin/env python3
"""
Minimal face detection script
- uses only face_recognition
- prints the number of faces and their locations
"""

import face_recognition
import time


def detect_faces(image_path: str):
    start = time.time()

    # load the image
    image = face_recognition.load_image_file(image_path)

    # detect face locations
    face_locations = face_recognition.face_locations(image)

    elapsed = time.time() - start

    print(f"Detected {len(face_locations)} face(s) in {elapsed:.3f} s")
    for i, (top, right, bottom, left) in enumerate(face_locations):
        print(f"Face {i+1}: top={top}, right={right}, bottom={bottom}, left={left}")


if __name__ == "__main__":
    detect_faces("zidane.jpg")

Run it with python3 face_detect_basic.py; it prints the number of faces detected, the time taken and each bounding box.

Next we wrap this face detection script in a service. First create a package:

bash
cd ~/ros2_ws/src
ros2 pkg create demo_python_service --build-type ament_python --dependencies rclpy sensor_msgs cv_bridge part4_interfaces --license MIT

In demo_python_service/demo_python_service/face_service.py, write:

python
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import face_recognition
import time


class FaceDetectService(Node):
    def __init__(self):
        super().__init__('face_detect_service')
        self.bridge = CvBridge()
        # create the service
        self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
        self.get_logger().info("Face detection service started, waiting for requests...")

    def detect_callback(self, request, response):
        """Service callback: receive an image and return the detection result"""
        start = time.time()

        try:
            # convert the ROS Image to an OpenCV image
            cv_image = self.bridge.imgmsg_to_cv2(request.image, desired_encoding='bgr8')

            if cv_image is None or cv_image.size == 0:
                self.get_logger().error("Received an empty image!")
                response.number = 0
                response.use_time = 0.0
                response.top = []
                response.right = []
                response.bottom = []
                response.left = []
                return response

            # convert to the RGB layout that face_recognition expects
            rgb_image = cv_image[:, :, ::-1]

            # detect faces
            face_locations = face_recognition.face_locations(rgb_image)
            elapsed = time.time() - start

            # fill in the response
            response.number = len(face_locations)
            response.use_time = float(elapsed)
            response.top = [loc[0] for loc in face_locations]
            response.right = [loc[1] for loc in face_locations]
            response.bottom = [loc[2] for loc in face_locations]
            response.left = [loc[3] for loc in face_locations]

            self.get_logger().info(f"Detected {response.number} face(s) in {elapsed:.3f}s")

        except CvBridgeError as e:
            self.get_logger().error(f"CvBridge conversion error: {e}")
            response.number = 0
            response.use_time = 0.0
            response.top = []
            response.right = []
            response.bottom = []
            response.left = []

        except Exception as e:
            self.get_logger().error(f"Face detection exception: {e}")
            response.number = 0
            response.use_time = 0.0
            response.top = []
            response.right = []
            response.bottom = []
            response.left = []

        return response


def main(args=None):
    rclpy.init(args=args)
    node = FaceDetectService()
    rclpy.spin(node)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Register the node in setup.py:

python
    entry_points={
        'console_scripts': [
            'face_detect_service  = demo_python_service.face_service:main'
        ],
    },

Build the workspace:

bash
cd ~/ros2_ws
colcon build
source install/setup.bash

Start the server:

bash
ros2 run demo_python_service face_detect_service

List services with their types: ros2 service list -t

Call the service with an empty (default) request: ros2 service call /face_detect part4_interfaces/srv/FaceDetect

The call comes back with an all-zero response, which shows the service itself is wired up correctly. Now let's implement the client:

3. Implementing the face-detection client

First, put the image to be recognized into the package's demo_python_service/resource directory (the directory intended for resource files). We use the same image as before: https://github.com/ultralytics/yolov5/raw/master/data/images/zidane.jpg

setup.py needs to be modified so that the image is installed into the package's share directory; add an entry to data_files in setup():

python
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name+"/resource", ['resource/zidane.jpg']),
    ],

The line we added is:

python
 ('share/' + package_name+"/resource", ['resource/zidane.jpg']),

After rebuilding, the resource file is copied into the share directory, where our code can load it.
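
You can verify that the installed path resolves with a small check like the following (assuming the package and file names used above):

python
import os

from ament_index_python.packages import get_package_share_directory

share_dir = get_package_share_directory('demo_python_service')
image_path = os.path.join(share_dir, 'resource', 'zidane.jpg')
print(image_path, os.path.exists(image_path))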


Write the client code in demo_python_service/demo_python_service/face_client.py:

python
import os
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
from ament_index_python.packages import get_package_share_directory
import cv2


class FaceDetectClient(Node):
    def __init__(self):
        super().__init__('face_detect_client')
        self.cli = self.create_client(FaceDetect, 'face_detect')
        self.bridge = CvBridge()

        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

    def send_request(self, image_path):
        # read the image from the share directory
        if not os.path.exists(image_path):
            self.get_logger().error(f"Image path does not exist: {image_path}")
            return

        cv_image = cv2.imread(image_path)
        if cv_image is None:
            self.get_logger().error(f"Failed to read image: {image_path}")
            return

        # convert to a ROS Image message
        ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')

        # build the request
        req = FaceDetect.Request()
        req.image = ros_image

        # call the service asynchronously
        self.future = self.cli.call_async(req)


def main(args=None):
    rclpy.init(args=args)
    node = FaceDetectClient()

    # resolve the resource path under the package's share directory
    package_share_dir = get_package_share_directory('demo_python_service')
    image_path = os.path.join(package_share_dir, 'resource', 'zidane.jpg')

    node.send_request(image_path)

    # wait for the service response
    while rclpy.ok():
        rclpy.spin_once(node)
        if node.future.done():
            try:
                response = node.future.result()
            except Exception as e:
                node.get_logger().error(f'Service call failed: {e}')
            else:
                node.get_logger().info(f'Detected {response.number} face(s) in {response.use_time:.3f}s')

                # show the result
                cv_image = cv2.imread(image_path)
                for top, right, bottom, left in zip(
                    response.top, response.right, response.bottom, response.left
                ):
                    cv2.rectangle(cv_image, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.imshow("Face Detection Result", cv_image)
                cv2.waitKey(0)
                cv2.destroyAllWindows()
            break

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Register the client node in setup.py as well:

python
    entry_points={
        'console_scripts': [
            'face_detect_service  = demo_python_service.face_service:main',
            'face_detect_client   = demo_python_service.face_client:main'
        ],
    },

Build the workspace:

bash
cd ~/ros2_ws
colcon build
source install/setup.bash

Start the client:

ros2 run demo_python_service face_detect_client

The client logs the number of detected faces and the processing time, and opens a window showing the image with the detected faces boxed (screenshots omitted).

III. Service and Parameter Communication in Practice (C++)

Next we build a C++ service example: making the turtle patrol inside the simulator.

The rough logic is as follows:

  1. The server receives a target coordinate (x, y), drives the turtle toward that point, and reports whether the target was accepted.
  2. Server logic: subscribe to /turtle1/pose for the current pose and publish velocity commands on /turtle1/cmd_vel; the service returns success = true for a valid target, and driving continues in the pose callback until the distance to the target drops below a threshold (a Python sketch of this velocity law follows the list).
  3. Client logic: generate random (x, y) coordinates and call the service periodically to produce a patrol effect.
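
Here is a minimal Python sketch of that velocity law, mirroring what the C++ server below does in its pose callback (the gain k, the max_speed cap, the 0.1 m stop threshold and the 0.2 rad heading window are the same values used there):

python
import math


def compute_cmd(pose_x, pose_y, pose_theta, target_x, target_y, k=1.0, max_speed=1.0):
    """Proportional controller: turn toward the target, then drive; stop within 0.1 m."""
    dx, dy = target_x - pose_x, target_y - pose_y
    distance = math.hypot(dx, dy)
    angle_diff = math.atan2(dy, dx) - pose_theta
    angle_diff = math.atan2(math.sin(angle_diff), math.cos(angle_diff))  # normalize to [-pi, pi]

    if distance <= 0.1:
        return 0.0, 0.0  # target reached: stop
    linear = min(k * distance, max_speed) if abs(angle_diff) < 0.2 else 0.0
    return linear, 4.0 * angle_diff  # (linear.x, angular.z)


print(compute_cmd(5.5, 5.5, 0.0, 8.0, 8.0))  # still turning toward the goal: (0.0, ~3.14)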

1. Define a custom service interface

In part4_interfaces/srv/, create a new service file, TurtlePatrol.srv:

float64 x
float64 y
---
bool success

Modify CMakeLists.txt:

# declare the srv files
set(srv_files
  "srv/FaceDetect.srv"
  "srv/TurtlePatrol.srv"
)

# generate the interface code
rosidl_generate_interfaces(${PROJECT_NAME}
  ${srv_files}
  DEPENDENCIES sensor_msgs
)

Build to generate the interface:

cd ~/ros2_ws
colcon build
source install/setup.bash
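
As with FaceDetect, you can sanity-check the generated request/response fields from Python before writing the C++ code (assuming the workspace is sourced):

python
from part4_interfaces.srv import TurtlePatrol

req = TurtlePatrol.Request()
req.x, req.y = 3.0, 4.0
print(req)                      # x=3.0, y=4.0
print(TurtlePatrol.Response())  # success=False by default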

2. Server implementation

First create a new package:

cd ~/ros2_ws/src
ros2 pkg create turtle_patrol_cpp --build-type ament_cmake --dependencies rclcpp geometry_msgs turtlesim part4_interfaces

Server code in src/turtle_patrol_service.cpp (note that it uses the TurtlePatrol interface we just defined):

cpp
#include "rclcpp/rclcpp.hpp"
#include "geometry_msgs/msg/twist.hpp"
#include "turtlesim/msg/pose.hpp"
#include "part4_interfaces/srv/turtle_patrol.hpp"
#include "rcl_interfaces/msg/set_parameters_result.hpp"
#include <cmath>

using TurtlePatrol = part4_interfaces::srv::TurtlePatrol;
using SetParametersResult = rcl_interfaces::msg::SetParametersResult;

class TurtlePatrolService : public rclcpp::Node
{
public:
    TurtlePatrolService() : Node("turtle_patrol_service")
    {
        // publisher for velocity commands
        velocity_pub_ = this->create_publisher<geometry_msgs::msg::Twist>("/turtle1/cmd_vel", 10);

        // subscribe to the turtle's pose
        pose_sub_ = this->create_subscription<turtlesim::msg::Pose>(
            "/turtle1/pose", 10,
            std::bind(&TurtlePatrolService::on_pose_received, this, std::placeholders::_1));

        // create the service
        patrol_server_ = this->create_service<TurtlePatrol>(
            "turtle_patrol",
            std::bind(&TurtlePatrolService::handle_patrol, this, std::placeholders::_1, std::placeholders::_2));

        // declare parameters and read their initial values
        this->declare_parameter("k", 1.0);
        this->declare_parameter("max_speed", 1.0);
        this->get_parameter("k", k_);
        this->get_parameter("max_speed", max_speed_);

        // register a parameter-set callback
        parameters_callback_handle_ = this->add_on_set_parameters_callback(
            [this](const std::vector<rclcpp::Parameter> &params) -> SetParametersResult
            {
                for (auto param : params) {
                    if (param.get_name() == "k") {
                        k_ = param.as_double();
                        RCLCPP_INFO(this->get_logger(), "Updated k: %f", k_);
                    } else if (param.get_name() == "max_speed") {
                        max_speed_ = param.as_double();
                        RCLCPP_INFO(this->get_logger(), "Updated max_speed: %f", max_speed_);
                    }
                }
                SetParametersResult result;
                result.successful = true;
                return result;
            });

        RCLCPP_INFO(this->get_logger(), "Turtle Patrol Service is ready!");
    }

private:
    // publisher / subscriber / service handles
    rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr velocity_pub_;
    rclcpp::Subscription<turtlesim::msg::Pose>::SharedPtr pose_sub_;
    rclcpp::Service<TurtlePatrol>::SharedPtr patrol_server_;

    // current target
    double target_x_{1.0};
    double target_y_{1.0};

    // control parameters
    double k_{1.0};
    double max_speed_{1.0};

    // parameter callback handle
    rclcpp::node_interfaces::OnSetParametersCallbackHandle::SharedPtr parameters_callback_handle_;

    // current turtle pose
    turtlesim::msg::Pose current_pose_;
    bool has_new_target_{false};

    void on_pose_received(const turtlesim::msg::Pose::SharedPtr pose)
    {
        current_pose_ = *pose;

        if (!has_new_target_) return;

        double dx = target_x_ - current_pose_.x;
        double dy = target_y_ - current_pose_.y;
        double distance = std::sqrt(dx * dx + dy * dy);
        double angle_to_goal = std::atan2(dy, dx);
        double angle_diff = angle_to_goal - current_pose_.theta;

        // normalize to [-pi, pi]
        while (angle_diff > M_PI) angle_diff -= 2 * M_PI;
        while (angle_diff < -M_PI) angle_diff += 2 * M_PI;

        geometry_msgs::msg::Twist cmd;
        double linear_speed = 0.0;
        double angular_speed = 4.0 * angle_diff;

        if (distance > 0.1) {
            if (std::fabs(angle_diff) < 0.2) {
                linear_speed = k_ * distance;
                if (linear_speed > max_speed_) linear_speed = max_speed_;
            }
        } else {
            // target reached
            linear_speed = 0.0;
            angular_speed = 0.0;
            has_new_target_ = false;
        }

        cmd.linear.x = linear_speed;
        cmd.angular.z = angular_speed;
        velocity_pub_->publish(cmd);
    }

    void handle_patrol(
        const std::shared_ptr<TurtlePatrol::Request> request,
        std::shared_ptr<TurtlePatrol::Response> response)
    {
        // reject targets outside the turtlesim canvas
        if (request->x <= 0.0 || request->x >= 12.0 ||
            request->y <= 0.0 || request->y >= 12.0) {
            response->success = false;
            return;
        }

        target_x_ = request->x;
        target_y_ = request->y;
        has_new_target_ = true;

        RCLCPP_INFO(this->get_logger(), "New patrol target: (%.2f, %.2f)", target_x_, target_y_);
        response->success = true;
    }
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<TurtlePatrolService>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

Modify CMakeLists.txt, adding:

# server executable
add_executable(turtle_patrol_service src/turtle_patrol_service.cpp)
ament_target_dependencies(turtle_patrol_service rclcpp geometry_msgs turtlesim part4_interfaces)


install(TARGETS
  turtle_patrol_service
  DESTINATION lib/${PROJECT_NAME})

Build:

colcon build 
source install/setup.bash

Run:

ros2 run turtle_patrol_cpp turtle_patrol_service

3. Client implementation

Edit src/turtle_patrol_client.cpp:

cpp
#include <chrono>
#include <cstdlib>
#include <ctime>
#include "rclcpp/rclcpp.hpp"
#include "part4_interfaces/srv/turtle_patrol.hpp"

using namespace std::chrono_literals;
using TurtlePatrol = part4_interfaces::srv::TurtlePatrol;

class TurtlePatrolClient : public rclcpp::Node
{
public:
    TurtlePatrolClient() : Node("turtle_patrol_client")
    {
        client_ = this->create_client<TurtlePatrol>("turtle_patrol");
        srand(time(nullptr));

        // timer: send a random target every 5 seconds
        timer_ = this->create_wall_timer(5s, std::bind(&TurtlePatrolClient::timer_callback, this));

        RCLCPP_INFO(this->get_logger(), "Turtle Patrol Client started.");
    }

private:
    rclcpp::Client<TurtlePatrol>::SharedPtr client_;
    rclcpp::TimerBase::SharedPtr timer_;

    void timer_callback()
    {
        // wait for the service to come up
        while (!client_->wait_for_service(1s)) {
            if (!rclcpp::ok()) {
                RCLCPP_ERROR(this->get_logger(), "Interrupted while waiting for the service");
                return;
            }
            RCLCPP_INFO(this->get_logger(), "Waiting for the turtle_patrol service...");
        }

        // build the request
        auto request = std::make_shared<TurtlePatrol::Request>();
        request->x = 1.0 + static_cast<double>(rand()) / RAND_MAX * 9.0;
        request->y = 1.0 + static_cast<double>(rand()) / RAND_MAX * 9.0;

        RCLCPP_INFO(this->get_logger(), "Requesting patrol to: (%.2f, %.2f)", request->x, request->y);

        // send the request asynchronously
        client_->async_send_request(request,
            [this](rclcpp::Client<TurtlePatrol>::SharedFuture future) {
                auto response = future.get();
                if (response->success) {
                    RCLCPP_INFO(this->get_logger(), "Patrol target accepted");
                } else {
                    RCLCPP_WARN(this->get_logger(), "Patrol target rejected (out of range)");
                }
            });
    }
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<TurtlePatrolClient>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

Modify CMakeLists.txt: add the client target and update the install rule so both executables are installed (replace the earlier install(TARGETS ...) block with the one below):

# client executable
add_executable(turtle_patrol_client src/turtle_patrol_client.cpp)
ament_target_dependencies(turtle_patrol_client rclcpp part4_interfaces)

install(TARGETS
  turtle_patrol_service
  turtle_patrol_client
  DESTINATION lib/${PROJECT_NAME})

Build:

bash
colcon build 
source install/setup.bash

Run:

bash
ros2 run turtle_patrol_cpp turtle_patrol_client

Finally, start turtlesim:

bash
ros2 run turtlesim turtlesim_node

Result: the turtle patrols to random points inside the window. (You can also trigger a single patrol by hand with ros2 service call /turtle_patrol part4_interfaces/srv/TurtlePatrol "{x: 3.0, y: 3.0}".)

IV. Using Parameters in a Python Node

Building on Part II, we now add parameters to the service. face_recognition.face_locations(image) accepts extra arguments, such as which detection model to use (hog or cnn) and how aggressively to search. With ROS2 parameters, the node can switch these settings at runtime without code changes.

The relevant arguments of face_recognition.face_locations:

python
face_recognition.face_locations(img, number_of_times_to_upsample=1, model="hog")

  • number_of_times_to_upsample: how many times to upsample the image before detecting; higher values find smaller faces but are slower.
  • model: detection model, "hog" (fast on CPU, moderate accuracy) or "cnn" (more accurate, but slow without a GPU); see the comparison sketch below.
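
To get a feel for the trade-off, here is a small standalone comparison sketch (it assumes the zidane.jpg downloaded earlier is in the current directory; absolute timings depend on your machine, and cnn is very slow without a GPU-enabled dlib):

python
import time

import face_recognition

image = face_recognition.load_image_file("zidane.jpg")
for model in ("hog", "cnn"):
    start = time.time()
    locations = face_recognition.face_locations(image, number_of_times_to_upsample=1, model=model)
    print(f"model={model}: {len(locations)} face(s) in {time.time() - start:.2f}s")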

1. Modify the server to add parameters

Declare and read the parameters in the service node and pass them to face_recognition.face_locations:

python
import rclpy
from rclpy.node import Node
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import face_recognition
import time


class FaceDetectService(Node):
    def __init__(self):
        super().__init__('face_detect_service')
        self.bridge = CvBridge()

        # declare parameters
        self.declare_parameter('model', 'hog')  # model: hog or cnn
        self.declare_parameter('upsample_times', 1)  # upsampling count

        # create the service
        self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
        self.get_logger().info("Face detection service started, waiting for requests...")

    def detect_callback(self, request, response):
        """Service callback: receive an image and return the detection result"""
        start = time.time()

        # check that the Image message is valid
        if (request.image is None or
            request.image.data is None or len(request.image.data) == 0 or
            request.image.width == 0 or request.image.height == 0 or
            request.image.encoding == ""):
            self.get_logger().error("Received an invalid image request!")
            response.number = 0
            response.use_time = 0.0
            response.top = []
            response.right = []
            response.bottom = []
            response.left = []
            return response

        try:
            # convert the ROS Image to an OpenCV image
            cv_image = self.bridge.imgmsg_to_cv2(request.image)

            if cv_image is None or cv_image.size == 0:
                self.get_logger().error("Image is empty after CvBridge conversion!")
                response.number = 0
                response.use_time = 0.0
                response.top = []
                response.right = []
                response.bottom = []
                response.left = []
                return response

            # read the parameters
            model = self.get_parameter('model').get_parameter_value().string_value
            upsample_times = self.get_parameter('upsample_times').get_parameter_value().integer_value

            # detect faces
            face_locations = face_recognition.face_locations(
                cv_image,
                number_of_times_to_upsample=upsample_times,
                model=model
            )
            elapsed = time.time() - start

            # fill in the response
            response.number = len(face_locations)
            response.use_time = float(elapsed)
            response.top = [loc[0] for loc in face_locations]
            response.right = [loc[1] for loc in face_locations]
            response.bottom = [loc[2] for loc in face_locations]
            response.left = [loc[3] for loc in face_locations]

            self.get_logger().info(
                f"Detected {response.number} face(s), model: {model}, upsample: {upsample_times}, took {elapsed:.3f}s"
            )

        except CvBridgeError as e:
            self.get_logger().error(f"CvBridge conversion error: {e}")
            response.number = 0
            response.use_time = 0.0
            response.top = []
            response.right = []
            response.bottom = []
            response.left = []

        except Exception as e:
            self.get_logger().error(f"Face detection exception: {e}")
            response.number = 0
            response.use_time = 0.0
            response.top = []
            response.right = []
            response.bottom = []
            response.left = []

        return response


def main(args=None):
    rclpy.init(args=args)
    node = FaceDetectService()
    rclpy.spin(node)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

A few points about the code above:

At node initialization, declare each parameter's name and default value with self.declare_parameter:

python
self.declare_parameter('model', 'hog')        # default model: hog
self.declare_parameter('upsample_times', 1)   # default upsampling count: 1

In the service callback (or anywhere else), read the values with self.get_parameter():

python
model = self.get_parameter('model').get_parameter_value().string_value
upsample_times = self.get_parameter('upsample_times').get_parameter_value().integer_value

Here .string_value and .integer_value extract the string and the integer value respectively.
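
rclpy also offers a type-inferring shortcut: the Parameter object returned by get_parameter() has a .value property, so inside the node self.get_parameter('model').value would work as well. A standalone illustration of that property:

python
# Standalone demo of the Parameter.value property; inside the node the
# equivalent shortcut is: self.get_parameter('model').value
from rclpy.parameter import Parameter

p = Parameter('model', Parameter.Type.STRING, 'hog')
print(p.name, p.value)  # -> model hog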

A node can also be notified when its parameters change: register a callback with add_on_set_parameters_callback, which runs whenever someone attempts to set a parameter and can accept or reject the change:

python
from rcl_interfaces.msg import SetParametersResult

class FaceDetectService(Node):
    def __init__(self):
        super().__init__('face_detect_service')
        self.bridge = CvBridge()

        # declare parameters
        self.declare_parameter('model', 'hog')
        self.declare_parameter('upsample_times', 1)

        # register the parameter callback
        self.add_on_set_parameters_callback(self.param_callback)

        self.srv = self.create_service(FaceDetect, 'face_detect', self.detect_callback)
        self.get_logger().info("Face detection service started, waiting for requests...")

    def param_callback(self, params):
        """Parameter update callback"""
        for param in params:
            if param.name == "model":
                if param.value not in ["hog", "cnn"]:
                    self.get_logger().warn(f"Invalid model parameter: {param.value}, rejecting the change")
                    return SetParametersResult(successful=False)
            if param.name == "upsample_times":
                if param.value < 0:
                    self.get_logger().warn("upsample_times must not be negative")
                    return SetParametersResult(successful=False)

        self.get_logger().info(f"Parameters updated: {[f'{p.name}={p.value}' for p in params]}")
        return SetParametersResult(successful=True)

Remember to rebuild the project after changing the code.

Build:

bash
cd ~/ros2_ws
colcon build
source install/setup.bash

Start the server:

bash
ros2 run demo_python_service face_detect_service

2. Using parameters from the command line

List parameters:

ros2 param list

The two parameters we declared appear in the list (note: use_sim_time is a built-in parameter that ROS2 adds to every node automatically).

Get parameter values:

ros2 param get /face_detect_service model
ros2 param get /face_detect_service upsample_times

Change parameters at runtime (the next request will use the new values):

ros2 param set /face_detect_service model cnn
ros2 param set /face_detect_service upsample_times 2

Set parameters at startup:

We can set parameters directly when launching the node via the --ros-args -p options, instead of running ros2 param set afterwards:

bash
ros2 run demo_python_service face_detect_service \
  --ros-args \
  -p model:=cnn \
  -p upsample_times:=2

Or use a parameter file:

# face_detect_params.yaml
face_detect_service:
  ros__parameters:
    model: "cnn"
    upsample_times: 2

Load it:

ros2 run demo_python_service face_detect_service --ros-args --params-file face_detect_params.yaml

3. Setting parameters from the client via a service call

By calling the remote node's parameter service /face_detect_service/set_parameters, the client can change the server's model and upsample_times parameters dynamically, without restarting the server or modifying its code.

python
import rclpy
from rclpy.node import Node
from rcl_interfaces.srv import SetParameters
from rcl_interfaces.msg import Parameter, ParameterValue
from part4_interfaces.srv import FaceDetect
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import os
import cv2
from ament_index_python.packages import get_package_share_directory

class FaceDetectClient(Node):
    def __init__(self):
        super().__init__('face_detect_client')
        self.cli = self.create_client(FaceDetect, 'face_detect')
        self.bridge = CvBridge()

        # wait for the face detection service
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Face detection service not available, waiting...')

        # create a client for the remote parameter service
        self.param_cli = self.create_client(SetParameters, '/face_detect_service/set_parameters')
        while not self.param_cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for the remote parameter service...')

    def set_remote_parameters(self, model="hog", upsample_times=1):
        """Change the remote node's parameters via a service call"""
        param_list = []

        # the model parameter
        p_model = Parameter()
        p_model.name = 'model'
        p_model.value = ParameterValue()
        p_model.value.type = 4   # STRING
        p_model.value.string_value = model
        param_list.append(p_model)

        # the upsample_times parameter
        p_up = Parameter()
        p_up.name = 'upsample_times'
        p_up.value = ParameterValue()
        p_up.value.type = 2      # INTEGER
        p_up.value.integer_value = upsample_times
        param_list.append(p_up)

        # call the remote parameter service
        req = SetParameters.Request()
        req.parameters = param_list
        future = self.param_cli.call_async(req)
        rclpy.spin_until_future_complete(self, future)
        result = future.result()
        # result.results is a list with one entry per parameter that was set
        if all(r.successful for r in result.results):
            self.get_logger().info("Parameters set successfully")
        else:
            self.get_logger().error("Failed to set parameters")

    def send_request(self, image_path):
        """Send a face detection request"""
        if not os.path.exists(image_path):
            self.get_logger().error(f"Image path does not exist: {image_path}")
            return None

        cv_image = cv2.imread(image_path)
        if cv_image is None:
            self.get_logger().error(f"Failed to read image: {image_path}")
            return None

        ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
        req = FaceDetect.Request()
        req.image = ros_image
        future = self.cli.call_async(req)
        rclpy.spin_until_future_complete(self, future)
        return future.result()

def main(args=None):
    rclpy.init(args=args)
    node = FaceDetectClient()

    node.set_remote_parameters(model="hog", upsample_times=2)

    package_share_dir = get_package_share_directory('demo_python_service')
    image_path = os.path.join(package_share_dir, 'resource', 'zidane.jpg')
    response = node.send_request(image_path)

    if response:
        node.get_logger().info(f'Detected {response.number} face(s) in {response.use_time:.3f}s')
        cv_image = cv2.imread(image_path)
        for top, right, bottom, left in zip(response.top, response.right, response.bottom, response.left):
            cv2.rectangle(cv_image, (left, top), (right, bottom), (0, 255, 0), 2)
        cv2.imshow("Face Detection Result", cv_image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
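
As an aside, instead of filling in the numeric type codes by hand in set_remote_parameters, rclpy's Parameter helper can build the rcl_interfaces/msg/Parameter messages for you, provided your rclpy version has Parameter.to_parameter_msg(); a hedged sketch, not a tested drop-in replacement:

python
from rcl_interfaces.srv import SetParameters
from rclpy.parameter import Parameter

req = SetParameters.Request()
req.parameters = [
    Parameter('model', Parameter.Type.STRING, 'cnn').to_parameter_msg(),
    Parameter('upsample_times', Parameter.Type.INTEGER, 2).to_parameter_msg(),
]
# then, as before: future = self.param_cli.call_async(req)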

Build:

bash
cd ~/ros2_ws
colcon build
source install/setup.bash

Start the client:

ros2 run demo_python_service face_detect_client

The client first updates the remote parameters and then sends the detection request; the server log shows the new model and upsample settings being used.

V. Using Parameters in a C++ Node

Omitted here; see: https://www.bilibili.com/video/BV19k4UeXEvZ

VI. Extra: Using Launch Files

In ROS2, the recommended way to start nodes and pass parameters is a launch file: it makes complex systems easier to manage and can start several nodes and load parameter files in one go. Taking Part III as the example, a launch file can start the turtlesim node, the server node and the client node together, and set parameters or output options for them. A complete example follows.

1. Create a launch file in the turtle_patrol_cpp package

Create a launch/ directory in the package and add turtle_patrol_launch.py:

python
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        # start the turtlesim node
        Node(
            package='turtlesim',
            executable='turtlesim_node',
            name='turtlesim',
            output='screen'
        ),

        # start the server node
        Node(
            package='turtle_patrol_cpp',
            executable='turtle_patrol_service',
            name='turtle_patrol_service',
            output='screen'
        ),

        # start the client node
        Node(
            package='turtle_patrol_cpp',
            executable='turtle_patrol_client',
            name='turtle_patrol_client',
            output='screen'
        )
    ])

Our server declares two parameters, k and max_speed; to set them from the launch file, pass a parameters list to the Node action:

python
	# start the server node with parameters
	Node(
		package='turtle_patrol_cpp',
		executable='turtle_patrol_service',
		name='turtle_patrol_service',
		output='screen',
		parameters=[{
			'k': 1.0,
			'max_speed': 1.0
		}]
	),

Modify CMakeLists.txt to install the launch files:

# install launch files
install(DIRECTORY launch
  DESTINATION share/${PROJECT_NAME}
)

2. Start everything with launch

Run in a terminal:

bash
ros2 launch turtle_patrol_cpp turtle_patrol_launch.py

After launching:

  1. turtlesim_node opens the window showing the turtle.
  2. turtle_patrol_service starts the server and listens for client requests.
  3. turtle_patrol_client generates a random target every 5 seconds and calls the service, making the turtle patrol.

3. Going further with launch

ROS2 launch files are quite powerful; besides starting several nodes at once, they can also do the following:

3.1 Passing a parameter file

Parameter values can be written to a YAML file and loaded into a node from the launch file (remember to add an install rule for the config directory in CMakeLists.txt, just like the one for the launch directory).

config/params.yaml

turtle_patrol_service:
  ros__parameters:
    k: 1.5
    max_speed: 2.0

Load the parameter file in the launch file:

python
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os

def generate_launch_description():
    pkg_share = get_package_share_directory('turtle_patrol_cpp')
    param_file = os.path.join(pkg_share, 'config', 'params.yaml')

    return LaunchDescription([
        Node(
            package='turtlesim',
            executable='turtlesim_node',
            name='turtlesim',
            output='screen'
        ),
        Node(
            package='turtle_patrol_cpp',
            executable='turtle_patrol_service',
            name='turtle_patrol_service',
            output='screen',
            parameters=[param_file]
        ),
        Node(
            package='turtle_patrol_cpp',
            executable='turtle_patrol_client',
            name='turtle_patrol_client',
            output='screen'
        )
    ])

3.2 Using launch arguments

Launch arguments let you control node startup behavior or parameter values at launch time; once declared, they can be overridden on the command line with name:=value.

python
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    k_arg = DeclareLaunchArgument('k', default_value='1.0', description='proportional gain')
    max_speed_arg = DeclareLaunchArgument('max_speed', default_value='1.0', description='maximum linear speed')

    return LaunchDescription([
        k_arg,
        max_speed_arg,
        Node(
            package='turtle_patrol_cpp',
            executable='turtle_patrol_service',
            name='turtle_patrol_service',
            output='screen',
            parameters=[{
                'k': LaunchConfiguration('k'),
                'max_speed': LaunchConfiguration('max_speed')
            }]
        )
    ])

3.3 Conditionally starting a node

A node can be started or skipped based on a boolean condition. The enable_client value must also be declared as a launch argument in the same launch file, e.g. DeclareLaunchArgument('enable_client', default_value='false').

python
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration

Node(
    package='turtle_patrol_cpp',
    executable='turtle_patrol_client',
    name='turtle_patrol_client',
    output='screen',
    condition=IfCondition(LaunchConfiguration('enable_client'))
)

At run time, the argument controls whether the node starts:

ros2 launch turtle_patrol_cpp turtle_patrol_launch.py enable_client:=true

3.4 Timers (TimerAction)

TimerAction delays the start of a node or action, which helps control startup order or wait for a dependency to initialize. For example, to start the client only after turtlesim has fully come up:

python
from launch.actions import TimerAction
from launch_ros.actions import Node

client_node = Node(
    package='turtle_patrol_cpp',
    executable='turtle_patrol_client',
    name='turtle_patrol_client',
    output='screen'
)

delayed_client_node = TimerAction(
    period=3.0,  # start 3 seconds later
    actions=[client_node]
)
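
Put together, a variant of the earlier launch file that delays only the client could look like this (a sketch using the same package and executable names as above):

python
from launch import LaunchDescription
from launch.actions import TimerAction
from launch_ros.actions import Node


def generate_launch_description():
    turtlesim = Node(package='turtlesim', executable='turtlesim_node', output='screen')
    service = Node(package='turtle_patrol_cpp', executable='turtle_patrol_service', output='screen')
    client = Node(package='turtle_patrol_cpp', executable='turtle_patrol_client', output='screen')

    # start the client 3 seconds after the others
    return LaunchDescription([turtlesim, service, TimerAction(period=3.0, actions=[client])])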

3.5 Including other launch files (IncludeLaunchDescription)

One launch file can include others, which keeps large systems modular and lets a top-level launch file orchestrate everything. If the system grows, split the nodes across several launch files and include them from a main one. An illustrative example (the included paths must point to launch files that actually exist in your packages, so adjust them accordingly):

python
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python.packages import get_package_share_directory
import os

include_turtle_launch = IncludeLaunchDescription(
    PythonLaunchDescriptionSource(
        os.path.join(get_package_share_directory('turtlesim'), 'launch', 'turtlesim.launch.py')
    )
)

include_service_launch = IncludeLaunchDescription(
    PythonLaunchDescriptionSource(
        os.path.join(get_package_share_directory('turtle_patrol_cpp'), 'launch', 'service_launch.py')
    )
)

ld = LaunchDescription([
    include_turtle_launch,
    include_service_launch
])