12 ROS2 实践课-动作通信节点开发与调试
ROS2 动作通信节点开发与调试
关联:索引
- 场景提问:机械臂执行一条分拣路径要 15 秒,你如何知道它执行到哪一步?出现风险时你如何让它停下?
1. action 文件的基本结构
# Goal(目标)
<字段类型> <字段名>
...
---
# Result(结果)
<字段类型> <字段名>
...
---
# Feedback(反馈)
<字段类型> <字段名>
...
- goal 要可执行:必须包含足够信息让服务端开始任务(目标点/路径/速度/超时/任务标识)。
- feedback 要可监控:能表达“进度/阶段/当前动作状态/异常征兆”,不要只回一串文本。
- 取消语义要一致:取消后 result 要能体现“已取消”而不是“成功但停了”。
动作名建议:/sorting/arm/execute_path
# Goal
string task_id
string arm_id
string path_id
float32 speed
float32 timeout_sec
---
# Result
bool ok
int32 error_code
string message
string final_state
---
# Feedback
float32 progress
string current_step
string safety_state
1. 创建工作空间与接口包(推荐接口独立成包)
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_interfaces --build-type ament_cmake
# 放自定义 action 的目录
mkdir -p sorting_interfaces/action
说明:接口包选用 ament_cmake 是 ROS2 里生成 .srv/.msg/.action 的常见做法,不代表必须写 C++;动作服务器/客户端节点依然可以用 ament_python 开发。
建议目录:
~/ros2_ws/src/sorting_interfaces/action/ArmExecutePath.action
# Goal
string task_id
string arm_id
string path_id
float32 speed
float32 timeout_sec
---
# Result
bool ok
int32 error_code
string message
string final_state
---
# Feedback
float32 progress
string current_step
string safety_state
package.xml:声明接口生成相关依赖(rosidl_default_generators / rosidl_default_runtime)。CMakeLists.txt:调用rosidl_generate_interfaces(...)并安装导出依赖。
package.xml(接口包最小可用模板,可直接替换包名/描述/维护者信息):
<?xml version="1.0"?>
<package format="3">
<name>sorting_interfaces</name>
<version>0.0.0</version>
<description>Interface definitions for sorting robot</description>
<maintainer email="[email protected]">teacher</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
CMakeLists.txt(接口包最小可用模板,把 action 文件名填进去):
cmake_minimum_required(VERSION 3.8)
project(sorting_interfaces)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(action_msgs REQUIRED)
set(action_files
"action/ArmExecutePath.action"
)
rosidl_generate_interfaces(${PROJECT_NAME}
${action_files}
DEPENDENCIES action_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
扩展提示:如果 action 里使用了 builtin_interfaces/Time 等外部类型,需要在 package.xml 加 <depend>builtin_interfaces</depend>,并在 CMakeLists.txt 里 find_package(builtin_interfaces REQUIRED),同时在 rosidl_generate_interfaces(...) 里增加 DEPENDENCIES builtin_interfaces。
3. 编译与环境刷新
cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash
4. 生成结果验证(必须能用命令看到接口)
查看接口是否已注册:
ros2 interface list | grep sorting_interfaces
查看 action 具体字段:
ros2 interface show sorting_interfaces/action/ArmExecutePath
5. 小组产出(当堂提交)
- 一张“action 接口设计卡”(动作名 + goal/result/feedback 字段 + 每个字段的业务含义)
- 快问快答:动作通信相比服务通信,多了哪两类关键信息?
1. 创建动作功能包(Python 示例)
source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_action_practice --build-type ament_python --dependencies rclpy sorting_interfaces
建议文件路径:
~/ros2_ws/src/sorting_action_practice/sorting_action_practice/execute_path_action_server.py~/ros2_ws/src/sorting_action_practice/sorting_action_practice/execute_path_action_client.py
入口点配置(setup.py)示例结构:
entry_points={
'console_scripts': [
'execute_path_action_server = sorting_action_practice.execute_path_action_server:main',
'execute_path_action_client = sorting_action_practice.execute_path_action_client:main',
],
},
实现代码(可直接复制):
动作服务器:execute_path_action_server.py
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.action import CancelResponse
from rclpy.action import GoalResponse
from rclpy.node import Node
from sorting_interfaces.action import ArmExecutePath
class ExecutePathActionServer(Node):
def __init__(self) -> None:
super().__init__('execute_path_action_server')
self._action_server = ActionServer(
self,
ArmExecutePath,
'/sorting/arm/execute_path',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
)
def goal_callback(self, goal_request: ArmExecutePath.Goal) -> GoalResponse:
if not goal_request.task_id or not goal_request.arm_id:
return GoalResponse.REJECT
if goal_request.timeout_sec <= 0.0:
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle) -> CancelResponse:
return CancelResponse.ACCEPT
def execute_callback(self, goal_handle):
feedback = ArmExecutePath.Feedback()
total_steps = 20
start_time = time.time()
for i in range(total_steps):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = ArmExecutePath.Result()
result.ok = False
result.error_code = 2002
result.message = 'canceled by request'
result.final_state = 'CANCELED'
return result
elapsed = time.time() - start_time
if elapsed > float(goal_handle.request.timeout_sec):
goal_handle.abort()
result = ArmExecutePath.Result()
result.ok = False
result.error_code = 2003
result.message = 'timeout on server'
result.final_state = 'TIMEOUT'
return result
feedback.progress = float(i + 1) / float(total_steps)
feedback.current_step = f'step_{i + 1}'
feedback.safety_state = 'OK'
goal_handle.publish_feedback(feedback)
time.sleep(0.2)
goal_handle.succeed()
result = ArmExecutePath.Result()
result.ok = True
result.error_code = 0
result.message = 'completed'
result.final_state = 'SUCCEEDED'
return result
def main(args=None) -> None:
rclpy.init(args=args)
node = ExecutePathActionServer()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
动作客户端:execute_path_action_client.py
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from sorting_interfaces.action import ArmExecutePath
class ExecutePathActionClient(Node):
def __init__(self) -> None:
super().__init__('execute_path_action_client')
self._client = ActionClient(self, ArmExecutePath, '/sorting/arm/execute_path')
def feedback_callback(self, feedback_msg) -> None:
feedback: ArmExecutePath.Feedback = feedback_msg.feedback
self.get_logger().info(
f'feedback: progress={feedback.progress:.2f}, step={feedback.current_step}, safety={feedback.safety_state}'
)
def run(self) -> None:
if not self._client.wait_for_server(timeout_sec=3.0):
self.get_logger().error('action server not available: /sorting/arm/execute_path')
return
goal = ArmExecutePath.Goal()
goal.task_id = 't001'
goal.arm_id = 'arm_1'
goal.path_id = 'p01'
goal.speed = 0.5
goal.timeout_sec = 10.0
send_goal_future = self._client.send_goal_async(goal, feedback_callback=self.feedback_callback)
rclpy.spin_until_future_complete(self, send_goal_future, timeout_sec=3.0)
if not send_goal_future.done() or send_goal_future.exception() is not None:
self.get_logger().error('failed to send goal')
return
goal_handle = send_goal_future.result()
if not goal_handle.accepted:
self.get_logger().error('goal rejected')
return
self.get_logger().info('goal accepted')
get_result_future = goal_handle.get_result_async()
rclpy.spin_until_future_complete(self, get_result_future, timeout_sec=15.0)
if not get_result_future.done() or get_result_future.exception() is not None:
self.get_logger().error('failed to get result')
return
result = get_result_future.result().result
self.get_logger().info(
f'result: ok={result.ok}, error_code={result.error_code}, final_state={result.final_state}, message={result.message}'
)
def main(args=None) -> None:
rclpy.init(args=args)
node = ExecutePathActionClient()
try:
node.run()
finally:
node.destroy_node()
rclpy.shutdown()
- Server:收到 goal 后要“明确接受/拒绝”;执行过程中要周期发布 feedback;结束时返回 result;需要处理 cancel 请求并给出一致的 result。
- 反馈与取消:一定要用业务语言定义“什么时候发反馈、何时允许取消、取消后结果应该是什么”。
目标:在“路径执行”动作跑通后,做一个更贴近生产的“抓取任务”动作,体会阶段反馈与取消的价值。
- 任务建议:新增 action(如
ArmPick.action),goal 包含target_id/pick_pose/place_pose/timeout_sec,feedback 包含progress/current_step/gripper_state,result 包含ok/error_code/message/final_state。 - 提交物:命令清单 + 关键输出截图/粘贴 + 字段解释卡 + 取消语义说明。
1. 编译与运行(先服务端后客户端)
cd ~/ros2_ws
colcon build --symlink-install
# 每个新终端都必须 source,否则看不到接口与可执行入口
source /opt/ros/humble/setup.bash
source install/setup.bash
终端 A(启动动作服务器):
ros2 run sorting_action_practice execute_path_action_server
终端 B(查看动作列表与类型):
ros2 action list
ros2 action info /sorting/arm/execute_path
ros2 interface show sorting_interfaces/action/ArmExecutePath
成功调用示例(期望:Goal accepted,持续看到 feedback,最终 result=SUCCEEDED):
ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't001', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 10.0}" --feedback
失败调用示例(Goal 被拒绝,期望:Goal rejected;理由:timeout_sec<=0):
ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't002', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 0.0}" --feedback
失败调用示例(服务端超时中止,期望:result=TIMEOUT):
ros2 action send_goal /sorting/arm/execute_path sorting_interfaces/action/ArmExecutePath "{task_id: 't003', arm_id: 'arm_1', path_id: 'p01', speed: 0.5, timeout_sec: 0.1}" --feedback
- 目标是否被接收(Goal accepted/开始执行的日志证据)
- 是否持续收到 feedback(频率合理、字段有意义)
- 是否返回 result(ok/error_code/message 能解释结果)
先发送一个会执行更久的目标(例如把 timeout_sec 设得更大),然后在另一个终端取消:
ros2 action list
ros2 action cancel /sorting/arm/execute_path
1. 接口定义/生成相关(编译时报错)
- 现象:
ros2 interface show找不到自定义 action - 排查顺序:
- 确认
action/*.action文件格式正确(必须有两条---分隔) - 确认
colcon build无报错 - 确认每个终端执行过
source install/setup.bash
2. Action Server 不可用(运行时最常见)
- 现象:客户端日志提示 action server not available;或
ros2 action send_goal直接失败 - 排查顺序:
ros2 node list确认服务端节点是否存在ros2 action list确认动作名是否出现(应包含/sorting/arm/execute_path)ros2 action info /sorting/arm/execute_path确认类型是否为sorting_interfaces/action/ArmExecutePath- 确认两端
ROS_DOMAIN_ID一致,且都 source 了同一个工作空间 install
2. 反馈无数据
- 现象:
ros2 action send_goal ... --feedback没有任何 feedback 输出 - 排查顺序:
- 确认服务端确实在执行循环里发布 feedback(不是只算完一次性返回)
- 确认 feedback 发布频率与执行节奏匹配(不要 sleep 太久或根本没 publish)
- 确认客户端/CLI 使用了
--feedback,且动作名/类型一致
3. 任务取消失败
- 现象:
ros2 action cancel显示请求已发出,但任务仍继续执行或 result 不体现取消 - 排查顺序:
- 服务端是否实现 cancel 回调与取消分支(检查是否判断 cancel 请求并退出执行循环)
- 取消后是否返回一致的 result(final_state/error_code/message)
- 执行逻辑是否存在不可中断的阻塞段(长时间计算/外部调用未设置超时)
- AI 可以用来生成 action 模板、节点骨架、分析报错,但必须提交:
- 你向 AI 提问的关键提示词
- AI 的关键输出(保留原文)
- 你的人工审计与修改点(至少 3 条)
- 你用 ros2 CLI 给出的验证证据(命令 + 关键输出)
提示词模板(可直接复制):
你是 ROS2 Humble 的动作通信开发与调试专家。请基于以下需求输出可运行的实现与自测方法。
需求:机械臂分拣任务动作(长时任务)
- 接口规范:请先用 5 条要点总结 action 接口设计规范(goal/feedback/result/取消语义/验收证据)
- 自定义 action:请给出 .action 内容(字段要可追溯、可验收)
- Python rclpy:请给出动作服务器与客户端完整代码(关键逻辑处添加简短注释:发布反馈、处理取消、结果字段含义)
- 约束:必须周期发布 feedback;必须支持 cancel;必须区分 success/failed/canceled 三类 result
- 自测:给出 ros2 interface show、ros2 action list/info/send_goal/cancel 的命令与期望关键输出(至少包含 1 次成功与 1 次取消)
- 排错:如果出现“反馈无数据 / 任务取消失败 / colcon 编译失败”,请按“现象→原因→定位命令→修复步骤→复验命令”输出
注意:不要假设任何文件已经存在;请给出推荐的包划分与文件路径(interfaces 包与节点包);涉及依赖必须在 package.xml / CMakeLists.txt 或 setup.py 中体现;不要编造不存在的依赖/包名(例如把 action API 当作独立的 Python 依赖包)。