10 ROS2 服务与动作通信原理与选型

ROS2 服务与动作通信原理与选型

关联:索引

  1. 场景提问:你对分拣设备下发“启动”指令,设备真的启动了吗?如果没启动,你怎么第一时间知道?

1. 服务通信解决什么问题

2. 请求/响应交互逻辑与关键概念

3. 工业控制指令中的安全性要求(课程思政融入点)

1. 业务拆解:把“设备控制”拆成服务接口

分拣设备控制类需求示例:

2. 请求/响应时序(统一画法)

Client                 Server
  | -- Request(cmd) -->  |
  | <--- Response(ok) -- |

讨论要点(用业务语言回答):

3. 典型代码结构(用于识别框架,不要求当堂跑通)

业务接口草案(自定义 .srv 的字段示例):

Service: /sorting/device_control
Request: command, value, device_id, request_id
Response: ok, message, device_state, error_code

工作空间与功能包创建(服务示例):

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_service_demo --build-type ament_python --dependencies rclpy std_srvs

建议文件路径:

入口点配置(~/ros2_ws/src/sorting_service_demo/setup.py):

entry_points={
    'console_scripts': [
        'device_start_stop_service = sorting_service_demo.device_start_stop_service:main',
        'device_start_stop_client = sorting_service_demo.device_start_stop_client:main',
    ],
},

可运行示例:std_srvs/srv/SetBool(适合“启停”类指令,ROS2 Humble + Python3.10 可直接跑):

文件:device_start_stop_service.py

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool

class DeviceStartStopService(Node):
    def __init__(self):
        super().__init__('device_start_stop_srv')
        self._srv = self.create_service(SetBool, '/sorting/device/start_stop', self.handle_request)

    def handle_request(self, request, response):
        if request.data:
            response.success = True
            response.message = 'started'
        else:
            response.success = True
            response.message = 'stopped'
        return response

def main(args=None):
    rclpy.init(args=args)
    node = DeviceStartStopService()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

说明:SetBool 的 success 表示“请求是否被服务端成功处理”,不等同于“设备处于启动状态”。因此示例中 stop 分支也可返回 success=True,表示“已成功执行停止动作/或已处于停止状态”。

可运行示例:服务客户端(带超时与证据输出):

文件:device_start_stop_client.py

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool

class DeviceStartStopClient(Node):
    def __init__(self):
        super().__init__('device_start_stop_cli')
        self._cli = self.create_client(SetBool, '/sorting/device/start_stop')

    def call_once(self, start: bool, timeout_sec: float = 2.0):
        if not self._cli.wait_for_service(timeout_sec=timeout_sec):
            self.get_logger().error('service not available')
            return

        req = SetBool.Request()
        req.data = start
        future = self._cli.call_async(req)
        rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)

        if future.done() and future.result() is not None:
            resp = future.result()
            self.get_logger().info(f'ok={resp.success}, msg={resp.message}')
        else:
            self.get_logger().error('service call timeout or failed')

def main(args=None):
    rclpy.init(args=args)
    node = DeviceStartStopClient()
    node.call_once(start=True)
    node.destroy_node()
    rclpy.shutdown()

编译与运行(服务示例):

cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash

终端 A(先启动服务端):

ros2 run sorting_service_demo device_start_stop_service

终端 B(再运行客户端):

ros2 run sorting_service_demo device_start_stop_client

4. 小组产出(当堂提交)

  1. 快问快答:服务通信最关键的特点是什么?为什么“设备启动”更像服务而不是话题?

1. 动作通信解决什么问题

3. 分拣机械臂任务的建模示例

机械臂“抓取并放置”任务(动作)可拆字段:

1. 动作时序(统一画法)

Client                             Server
  | -- Send Goal ------------------> |
  | <--- Goal Accepted ------------- |
  | <--- Feedback (0..100%) -------  |
  | <--- Feedback (step updates) --  |
  | -- Cancel Goal (optional) -----> |
  | <--- Result (ok/canceled) -----  |

讨论要点(用业务语言回答):

2. 典型代码结构(用于识别框架,不要求当堂跑通)

业务接口草案(自定义 .action 的字段示例):

Action: /sorting/arm_pick_place
Goal: target_id, pick_pose, place_pose, speed, timeout_sec
Feedback: progress, current_step, gripper_state
Result: ok, final_pose, error_code, message

可运行示例:example_interfaces/action/Fibonacci(用于理解反馈与取消,ROS2 Humble + Python3.10 可直接跑):

工作空间与功能包创建(动作示例,可复用上面的工作空间):

source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_action_demo --build-type ament_python --dependencies rclpy example_interfaces

建议文件路径:

入口点配置(~/ros2_ws/src/sorting_action_demo/setup.py):

entry_points={
    'console_scripts': [
        'fibonacci_action_server = sorting_action_demo.fibonacci_action_server:main',
        'fibonacci_action_client = sorting_action_demo.fibonacci_action_client:main',
    ],
},

文件:fibonacci_action_server.py

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from example_interfaces.action import Fibonacci
import time

class FibonacciActionServer(Node):
    def __init__(self):
        super().__init__('fibonacci_action_server')
        self._action_server = ActionServer(self, Fibonacci, '/demo/fibonacci', self.execute_callback)

    def execute_callback(self, goal_handle):
        feedback = Fibonacci.Feedback()
        feedback.sequence = [0, 1]

        for i in range(1, goal_handle.request.order):
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = Fibonacci.Result()
                result.sequence = feedback.sequence
                return result

            feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i - 1])
            goal_handle.publish_feedback(feedback)
            time.sleep(0.2)

        goal_handle.succeed()
        result = Fibonacci.Result()
        result.sequence = feedback.sequence
        return result

def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

可运行示例:动作客户端(接收反馈,可扩展取消):

文件:fibonacci_action_client.py

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from example_interfaces.action import Fibonacci

class FibonacciActionClient(Node):
    def __init__(self):
        super().__init__('fibonacci_action_client')
        self._client = ActionClient(self, Fibonacci, '/demo/fibonacci')
        self._goal_handle = None

    def send_goal(self, order: int = 10):
        if not self._client.wait_for_server(timeout_sec=2.0):
            self.get_logger().error('action server not available')
            rclpy.shutdown()
            return

        goal = Fibonacci.Goal()
        goal.order = order
        send_future = self._client.send_goal_async(goal, feedback_callback=self.feedback_callback)
        send_future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_msg):
        self.get_logger().info(f'feedback: {feedback_msg.feedback.sequence}')

    def goal_response_callback(self, future):
        self._goal_handle = future.result()
        if not self._goal_handle.accepted:
            self.get_logger().error('goal rejected')
            rclpy.shutdown()
            return
        self.get_logger().info('goal accepted')
        result_future = self._goal_handle.get_result_async()
        result_future.add_done_callback(self.result_callback)

    def result_callback(self, future):
        result = future.result().result
        self.get_logger().info(f'result: {result.sequence}')
        rclpy.shutdown()

def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionClient()
    node.send_goal(order=10)
    rclpy.spin(node)
    node.destroy_node()

编译与运行(动作示例):

cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash

终端 A(先启动动作服务端):

ros2 run sorting_action_demo fibonacci_action_server

终端 B(再运行动作客户端):

ros2 run sorting_action_demo fibonacci_action_client

3. 小组产出(当堂提交)

需求特征 话题 Topic 服务 Service 动作 Action
交互模式 持续数据流(发布/订阅) 一次性交互(请求/响应) 长时任务(目标/反馈/结果)
是否需要确认 不强制(可自行设计确认话题) 强(响应即确认载体) 强(goal 接受 + result)
是否需要过程反馈 不擅长(需另开话题) 一般不需要 原生支持(feedback)
是否支持取消 需自定义协议 可用“停止服务”模拟,但不自然 原生支持(cancel)
典型场景 传感器、状态、检测结果 启停、参数设置、一次性查询 机械臂执行、导航、规划执行
风险提示 丢消息/时序难控 阻塞/超时/并发处理 状态机复杂/资源占用/取消一致性

初学者快速决策(三问):

1)这是“持续数据流”吗?是 → Topic
2)不是持续流,是“一次性指令/查询”且必须确认吗?是 → Service
3)不是一次性,是“要跑一段时间”且要进度/可取消吗?是 → Action

补充提醒:

1. 业务拆解:AGV 导航到底在“交互”什么

结论:AGV 导航是典型的动作通信场景。

2. 在 ROS2 中怎么落到三种通信(以 Nav2 为例)

4. 与分拣业务协同:一个典型流程

1)分拣系统下发“去取货点”导航动作(Action)
2)导航过程中持续读取反馈(Feedback)判断是否需要重规划/人工介入
3)到站后再下发“对接/开门/开始上料”等一次性确认指令(Service)
4)全程用话题持续监控 AGV 位姿、速度、电量、急停等状态(Topic)

AI 提示词模板(用于生成可审计的回答):

你是 ROS2 Humble 的通信设计评审专家。请针对“分拣系统”完成通信模式选型并给出接口草案。
1)我的需求:<实时数据/控制指令/长时任务,任选或组合>
2)场景约束:<安全性要求、超时、并发、取消、证据输出>
3)请输出:
 - 选型结论(Topic/Service/Action)
 - 风险点(至少 3 条)与规避策略
 - 接口字段草案(request/response 或 goal/feedback/result)
 - 自测清单(如何证明它正确,给出命令 + 期望关键输出)
 - 明确假设与不确定点(不允许臆测;不确定就写“我假设…”)
回答必须按“需求→候选→风险→证据→结论”结构。

提示词模板使用示例(可直接复制给大模型):

你是 ROS2 Humble 的通信设计评审专家。请针对“分拣系统”完成通信模式选型并给出接口草案。
1)我的需求:
- AGV 从“待命区”导航到“上料站”(长时任务)
- 到站后控制上料机构“启动/停止”(控制指令)
- 全程需要实时监控 AGV 位姿、电量、急停状态(实时数据)
2)场景约束:
- 安全性:急停必须立即生效;任何控制指令必须有明确响应
- 可靠性:网络可能偶发丢包;不能靠“看终端打印”判断成功
- 可控性:导航过程中要能看到进度,并支持人工取消
- 证据输出:要求给出可复测的命令/日志作为验收证据
3)请输出:
- 选型结论(Topic/Service/Action)并说明每一类需求用哪一种
- 风险点(至少 3 条)与规避策略
- 接口字段草案:
  - 导航:goal/feedback/result 字段
  - 上料控制:request/response 字段
- 自测清单(如何证明它正确):给出具体 ros2 命令或日志检查项
回答必须按“需求→候选→风险→证据→结论”结构。