10 ROS2 服务与动作通信原理与选型
ROS2 服务与动作通信原理与选型
关联:索引
- 场景提问:你对分拣设备下发“启动”指令,设备真的启动了吗?如果没启动,你怎么第一时间知道?
1. 服务通信解决什么问题
- 目标:完成一次性、可确认的“问答式”交互(Request → Response)
- 核心价值:请求方得到“明确的返回结果”(成功/失败/原因/状态/数据)
- 典型适用:设备控制指令、参数设置、状态查询(需要确定性回应)
2. 请求/响应交互逻辑与关键概念
- 服务端(Server):提供服务能力,接收请求并返回响应
- 客户端(Client):发起请求并等待响应(通常以 future 形式等待;可以阻塞等,也可以用回调方式处理)
- 服务类型(.srv):定义请求与响应字段(接口约定,双方必须一致)
- 超时与重试:控制“等多久、失败怎么办”,避免一直卡住
3. 工业控制指令中的安全性要求(课程思政融入点)
- 指令必须“可确认”:不能只看“我发了”,必须确认“对方收了/做了/没做并给原因”
- 指令必须“可追溯”:请求内容、时间、响应结果要能复盘(责任闭环)
- 风险意识:控制指令丢失、重复、误执行会带来安全事故,工程上要有严格性
1. 业务拆解:把“设备控制”拆成服务接口
分拣设备控制类需求示例:
-
启动/停止:
start、stop -
参数调整:速度
speed、皮带阈值threshold -
故障复位:
reset_fault -
服务名:
/sorting/device_control -
请求字段:
command、value、device_id、request_id -
响应字段:
ok、message、device_state、error_code
2. 请求/响应时序(统一画法)
Client Server
| -- Request(cmd) --> |
| <--- Response(ok) -- |
讨论要点(用业务语言回答):
- 什么时候算“成功”:收到响应就算?还是设备状态改变才算?
- 失败怎么表达:用 ok=false + error_code?还是 message 文本?
- 如何避免重复执行:request_id 是否需要?服务端如何做“幂等”(重复请求不造成重复动作)?
3. 典型代码结构(用于识别框架,不要求当堂跑通)
业务接口草案(自定义 .srv 的字段示例):
Service: /sorting/device_control
Request: command, value, device_id, request_id
Response: ok, message, device_state, error_code
工作空间与功能包创建(服务示例):
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_service_demo --build-type ament_python --dependencies rclpy std_srvs
建议文件路径:
~/ros2_ws/src/sorting_service_demo/sorting_service_demo/device_start_stop_service.py~/ros2_ws/src/sorting_service_demo/sorting_service_demo/device_start_stop_client.py
入口点配置(~/ros2_ws/src/sorting_service_demo/setup.py):
entry_points={
'console_scripts': [
'device_start_stop_service = sorting_service_demo.device_start_stop_service:main',
'device_start_stop_client = sorting_service_demo.device_start_stop_client:main',
],
},
可运行示例:std_srvs/srv/SetBool(适合“启停”类指令,ROS2 Humble + Python3.10 可直接跑):
文件:device_start_stop_service.py
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
class DeviceStartStopService(Node):
def __init__(self):
super().__init__('device_start_stop_srv')
self._srv = self.create_service(SetBool, '/sorting/device/start_stop', self.handle_request)
def handle_request(self, request, response):
if request.data:
response.success = True
response.message = 'started'
else:
response.success = True
response.message = 'stopped'
return response
def main(args=None):
rclpy.init(args=args)
node = DeviceStartStopService()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
说明:SetBool 的 success 表示“请求是否被服务端成功处理”,不等同于“设备处于启动状态”。因此示例中 stop 分支也可返回 success=True,表示“已成功执行停止动作/或已处于停止状态”。
可运行示例:服务客户端(带超时与证据输出):
文件:device_start_stop_client.py
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
class DeviceStartStopClient(Node):
def __init__(self):
super().__init__('device_start_stop_cli')
self._cli = self.create_client(SetBool, '/sorting/device/start_stop')
def call_once(self, start: bool, timeout_sec: float = 2.0):
if not self._cli.wait_for_service(timeout_sec=timeout_sec):
self.get_logger().error('service not available')
return
req = SetBool.Request()
req.data = start
future = self._cli.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec)
if future.done() and future.result() is not None:
resp = future.result()
self.get_logger().info(f'ok={resp.success}, msg={resp.message}')
else:
self.get_logger().error('service call timeout or failed')
def main(args=None):
rclpy.init(args=args)
node = DeviceStartStopClient()
node.call_once(start=True)
node.destroy_node()
rclpy.shutdown()
编译与运行(服务示例):
cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash
终端 A(先启动服务端):
ros2 run sorting_service_demo device_start_stop_service
终端 B(再运行客户端):
ros2 run sorting_service_demo device_start_stop_client
4. 小组产出(当堂提交)
- 一张“服务接口设计卡”(服务名 + 请求字段 + 响应字段 + 成功/失败判据)
- 一段“安全性说明”(不少于 3 条,必须包含:超时/失败处理/可追溯)
- 快问快答:服务通信最关键的特点是什么?为什么“设备启动”更像服务而不是话题?
1. 动作通信解决什么问题
-
目标:管理长时间运行的任务(Goal → Feedback(可多次) → Result),并支持取消(Cancel)
-
核心价值:任务过程“可观察、可控制、可终止”,避免“发了就不管”的黑盒执行
-
典型适用:机械臂抓取、路径执行、视觉-规划-执行流水线等长时任务
-
动作服务端(Action Server):接收 goal,周期发布 feedback,完成后返回 result
-
动作客户端(Action Client):发送 goal,接收 feedback,必要时发 cancel
-
动作类型(.action):定义 goal/feedback/result 字段(接口约定,双方必须一致)
-
取消与状态:任务要有状态机思维(执行中/成功/失败/取消)
3. 分拣机械臂任务的建模示例
机械臂“抓取并放置”任务(动作)可拆字段:
- goal:
target_id、pick_pose、place_pose、speed、timeout_sec - feedback:
progress、current_step、gripper_state - result:
ok、final_pose、error_code、message
1. 动作时序(统一画法)
Client Server
| -- Send Goal ------------------> |
| <--- Goal Accepted ------------- |
| <--- Feedback (0..100%) ------- |
| <--- Feedback (step updates) -- |
| -- Cancel Goal (optional) -----> |
| <--- Result (ok/canceled) ----- |
讨论要点(用业务语言回答):
- 哪些情况下必须支持取消:安全停止、超时、上游任务终止、人工介入
- 反馈频率怎么选:过高浪费资源,过低无法监控风险
- 结果如何表示失败:区分“可重试失败”与“不可恢复失败”
2. 典型代码结构(用于识别框架,不要求当堂跑通)
业务接口草案(自定义 .action 的字段示例):
Action: /sorting/arm_pick_place
Goal: target_id, pick_pose, place_pose, speed, timeout_sec
Feedback: progress, current_step, gripper_state
Result: ok, final_pose, error_code, message
可运行示例:example_interfaces/action/Fibonacci(用于理解反馈与取消,ROS2 Humble + Python3.10 可直接跑):
工作空间与功能包创建(动作示例,可复用上面的工作空间):
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_action_demo --build-type ament_python --dependencies rclpy example_interfaces
建议文件路径:
~/ros2_ws/src/sorting_action_demo/sorting_action_demo/fibonacci_action_server.py~/ros2_ws/src/sorting_action_demo/sorting_action_demo/fibonacci_action_client.py
入口点配置(~/ros2_ws/src/sorting_action_demo/setup.py):
entry_points={
'console_scripts': [
'fibonacci_action_server = sorting_action_demo.fibonacci_action_server:main',
'fibonacci_action_client = sorting_action_demo.fibonacci_action_client:main',
],
},
文件:fibonacci_action_server.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from example_interfaces.action import Fibonacci
import time
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(self, Fibonacci, '/demo/fibonacci', self.execute_callback)
def execute_callback(self, goal_handle):
feedback = Fibonacci.Feedback()
feedback.sequence = [0, 1]
for i in range(1, goal_handle.request.order):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = Fibonacci.Result()
result.sequence = feedback.sequence
return result
feedback.sequence.append(feedback.sequence[i] + feedback.sequence[i - 1])
goal_handle.publish_feedback(feedback)
time.sleep(0.2)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback.sequence
return result
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
可运行示例:动作客户端(接收反馈,可扩展取消):
文件:fibonacci_action_client.py
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from example_interfaces.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._client = ActionClient(self, Fibonacci, '/demo/fibonacci')
self._goal_handle = None
def send_goal(self, order: int = 10):
if not self._client.wait_for_server(timeout_sec=2.0):
self.get_logger().error('action server not available')
rclpy.shutdown()
return
goal = Fibonacci.Goal()
goal.order = order
send_future = self._client.send_goal_async(goal, feedback_callback=self.feedback_callback)
send_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
self.get_logger().info(f'feedback: {feedback_msg.feedback.sequence}')
def goal_response_callback(self, future):
self._goal_handle = future.result()
if not self._goal_handle.accepted:
self.get_logger().error('goal rejected')
rclpy.shutdown()
return
self.get_logger().info('goal accepted')
result_future = self._goal_handle.get_result_async()
result_future.add_done_callback(self.result_callback)
def result_callback(self, future):
result = future.result().result
self.get_logger().info(f'result: {result.sequence}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionClient()
node.send_goal(order=10)
rclpy.spin(node)
node.destroy_node()
编译与运行(动作示例):
cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash
终端 A(先启动动作服务端):
ros2 run sorting_action_demo fibonacci_action_server
终端 B(再运行动作客户端):
ros2 run sorting_action_demo fibonacci_action_client
3. 小组产出(当堂提交)
- 一张“动作接口设计卡”(动作名 + goal/feedback/result 字段 + 取消触发条件)
- 一段“可控性说明”(不少于 3 条,必须包含:反馈意义/取消策略/异常结果表达)
| 需求特征 | 话题 Topic | 服务 Service | 动作 Action |
|---|---|---|---|
| 交互模式 | 持续数据流(发布/订阅) | 一次性交互(请求/响应) | 长时任务(目标/反馈/结果) |
| 是否需要确认 | 不强制(可自行设计确认话题) | 强(响应即确认载体) | 强(goal 接受 + result) |
| 是否需要过程反馈 | 不擅长(需另开话题) | 一般不需要 | 原生支持(feedback) |
| 是否支持取消 | 需自定义协议 | 可用“停止服务”模拟,但不自然 | 原生支持(cancel) |
| 典型场景 | 传感器、状态、检测结果 | 启停、参数设置、一次性查询 | 机械臂执行、导航、规划执行 |
| 风险提示 | 丢消息/时序难控 | 阻塞/超时/并发处理 | 状态机复杂/资源占用/取消一致性 |
- “要持续流动的数据”优先话题;“要明确确认的一次性指令”优先服务;“要可监控可取消的长任务”优先动作。
初学者快速决策(三问):
1)这是“持续数据流”吗?是 → Topic
2)不是持续流,是“一次性指令/查询”且必须确认吗?是 → Service
3)不是一次性,是“要跑一段时间”且要进度/可取消吗?是 → Action
补充提醒:
- 自定义 .srv/.action 需要创建接口包并生成代码(本先以可运行的内置示例帮助理解机制)。
1. 业务拆解:AGV 导航到底在“交互”什么
- 下发目标:把车从 A 点开到 B 点(一次目标,但执行时间较长)
- 执行过程:需要随时知道“现在进度/是否卡住/是否绕路/是否需要人工介入”
- 任务中止:遇到急停、避障失败、路线封锁,需要立刻取消
结论:AGV 导航是典型的动作通信场景。
2. 在 ROS2 中怎么落到三种通信(以 Nav2 为例)
-
Action(长任务、可反馈、可取消):导航到目标点(常见动作名为
/navigate_to_pose,类型常见为nav2_msgs/action/NavigateToPose) -
Topic(持续数据流):定位与传感器状态(如
/tf、/odom、/scan、/amcl_pose、电量/急停状态等) -
Service(一次性确认指令):恢复/清图等“必须确认”的运维类操作(如清理代价地图、重置局部规划等,具体服务名随栈实现而变)
-
动作名:
/sorting/agv/navigate_to_station -
goal:
station_id、target_pose、timeout_sec、max_speed -
feedback:
distance_remaining、current_state、recovery_count -
result:
ok、final_pose、error_code、message -
取消触发:急停、超时、上游任务取消、避障失败超过阈值
4. 与分拣业务协同:一个典型流程
1)分拣系统下发“去取货点”导航动作(Action)
2)导航过程中持续读取反馈(Feedback)判断是否需要重规划/人工介入
3)到站后再下发“对接/开门/开始上料”等一次性确认指令(Service)
4)全程用话题持续监控 AGV 位姿、速度、电量、急停等状态(Topic)
- 代码抄袭与 AI 直接照搬的风险:责任不可追溯、漏洞不可控、接口不匹配、维护成本高
AI 提示词模板(用于生成可审计的回答):
你是 ROS2 Humble 的通信设计评审专家。请针对“分拣系统”完成通信模式选型并给出接口草案。
1)我的需求:<实时数据/控制指令/长时任务,任选或组合>
2)场景约束:<安全性要求、超时、并发、取消、证据输出>
3)请输出:
- 选型结论(Topic/Service/Action)
- 风险点(至少 3 条)与规避策略
- 接口字段草案(request/response 或 goal/feedback/result)
- 自测清单(如何证明它正确,给出命令 + 期望关键输出)
- 明确假设与不确定点(不允许臆测;不确定就写“我假设…”)
回答必须按“需求→候选→风险→证据→结论”结构。
提示词模板使用示例(可直接复制给大模型):
你是 ROS2 Humble 的通信设计评审专家。请针对“分拣系统”完成通信模式选型并给出接口草案。
1)我的需求:
- AGV 从“待命区”导航到“上料站”(长时任务)
- 到站后控制上料机构“启动/停止”(控制指令)
- 全程需要实时监控 AGV 位姿、电量、急停状态(实时数据)
2)场景约束:
- 安全性:急停必须立即生效;任何控制指令必须有明确响应
- 可靠性:网络可能偶发丢包;不能靠“看终端打印”判断成功
- 可控性:导航过程中要能看到进度,并支持人工取消
- 证据输出:要求给出可复测的命令/日志作为验收证据
3)请输出:
- 选型结论(Topic/Service/Action)并说明每一类需求用哪一种
- 风险点(至少 3 条)与规避策略
- 接口字段草案:
- 导航:goal/feedback/result 字段
- 上料控制:request/response 字段
- 自测清单(如何证明它正确):给出具体 ros2 命令或日志检查项
回答必须按“需求→候选→风险→证据→结论”结构。