13 ROS2 理论课-自定义接口设计规范与通信机制选型

ROS2 自定义接口设计规范与通信机制选型

关联:索引

  1. 场景提问:分拣系统里“检测结果”“分拣指令”“机械臂执行状态”分别属于哪类交互?为什么不能都用同一种机制?

  2. 项目场景(对照项目文档:西北勒苹果智能分拣)

3. 数据类型选择(务实规则)

4. 三类接口的“字段模板”(快速套用)

Topic(状态/数据流)推荐最小集合

# <Xxx>.msg
# 若与时间强相关,建议加 stamp;若与坐标相关,建议加 frame_id
# stamp:消息产生时刻(用于时间对齐/回放/排障),通常由发布端填当前 ROS 时间
builtin_interfaces/Time stamp
# task_id:链路关联 ID(检测→分配→规划→执行);建议由调度/上层生成并贯穿
string task_id
# frame_id:坐标系名称(如 map/base_link/camera_link);无空间含义可留空但要统一约定
string frame_id
# source_id:数据来源(传感器/节点/设备编号),便于追溯与多源融合
string source_id
# status:状态/质量码(必须配码表,例如 0=OK/1=STALE/2=INVALID),不要用自由字符串替代
uint8 status
# message:人类可读补充信息(可为空);不能替代结构化字段
string message

Service(短交互)推荐结构

# <Xxx>.srv
# Request
# request_id:一次调用的追溯 ID(建议 UUID/时间戳+序号),用于日志关联与幂等设计
string request_id
# task_id:业务任务 ID(可为空;若为空由服务端/上层生成后在 Response 回填)
string task_id
---
# Response
# ok:本次调用是否成功(仅表示“服务端已给出确定结论”,不等价于业务一定达成)
bool ok
# error_code:失败原因码(0 表示 OK;建议按模块分段 1xxx/2xxx/...),用于自动化处理与复盘
int32 error_code
# message:对 error_code 的文字解释 + 建议动作(可直接用于 UI/日志)
string message

Action(长任务)推荐结构

# <Xxx>.action
# Goal
# task_id:任务 ID(必须可追溯;多节点联动的主键)
string task_id
# timeout_sec:任务级超时(秒);到期后服务端应停止/取消并给出明确 final_state
float32 timeout_sec
---
# Result
# ok:任务最终是否成功(成功=达到业务验收条件;失败/取消/超时均为 false)
bool ok
# error_code:失败原因码(成功为 0;失败时必须可定位模块/原因)
int32 error_code
# message:对最终结果的解释(失败原因、取消原因、超时原因等)
string message
# final_state:最终状态码(建议对齐 action_msgs/msg/GoalStatus 的 STATUS_*,或在项目内固定码表)
uint8 final_state
---
# Feedback
# progress:进度(建议 0.0~1.0;若用百分比需说明单位),发布频率要控(避免刷屏)
float32 progress
# current_step:当前阶段(建议从有限集合取值,如 DETECTING/PLANNING/EXECUTING/RECOVERING)
string current_step
# safety_state:安全态码(配码表;用于上层策略:暂停/降速/取消)
uint8 safety_state

项目真实消息长什么样(摘自你的 ROS2 节点设计文档,方便对照理解)

# greensort_msgs/msg/DetectionResult.msg(项目中:检测结果 Topic /detection/result,30Hz)
std_msgs/Header header
string fruit_id
string grade
geometry_msgs/Point position
float32 confidence
int32 diameter_mm
float32 color_score
bool has_defect

6. 接口卡模板(当堂提交统一格式)

【接口卡】
接口包:ros2_comm_design_interfaces
通信名称:<topic/service/action 名称,例如 /sorting/arm/execute_path>
接口类型:msg / srv / action
接口名:<例如 ExecuteArmPath.action>

字段清单(逐项写业务含义与验收点):
- task_id:<用于追溯链路;如何生成/如何贯穿>
- stamp:<时间对齐/回放;是否必填>
- state/status:<码表定义(列出每个值含义)>
- ok/error_code/message:<成功/失败如何判断;失败如何定位模块>
- final_state(action):<最终状态码表与语义(注明是否对齐 action_msgs/msg/GoalStatus)>
- feedback(action):<progress/current_step/safety_state 的含义与发布频率>

验收证据(至少 2 条命令 + 关键输出期待):
1) ros2 interface show ...
2) ros2 topic echo / ros2 service call / ros2 action send_goal ... --feedback / ros2 action cancel ...

7. 学生最终产物(提交清单 + 评分点)

提交清单(建议四件套):

0. 创建接口包(命令)

# 以工作空间为例:~/ros2_ws
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src

# 创建接口包(仅放 msg/srv/action,不放可执行节点)
ros2 pkg create ros2_comm_design_interfaces --build-type ament_cmake --license Apache-2.0

# 创建接口目录
cd ros2_comm_design_interfaces
mkdir -p msg srv action

1. 最小闭环:定义 → 生成 → 验证

  1. 在接口包中创建目录:msg/srv/action/
  2. 填写 package.xmlCMakeLists.txt(关键是 rosidl_default_generatorsrosidl_generate_interfaces(...))。
  3. colcon build 编译生成。
  4. 每个终端 source install/setup.bash 后,用命令验证接口可见:
# 列出所有已生成的接口,便于快速检索
ros2 interface list | grep <your_interfaces_pkg>
# 展示消息结构(字段名与类型)
ros2 interface show <your_interfaces_pkg>/msg/<MsgName>
# 展示服务结构(Request/Response 字段)
ros2 interface show <your_interfaces_pkg>/srv/<SrvName>
# 展示动作结构(Goal/Feedback/Result 字段)
ros2 interface show <your_interfaces_pkg>/action/<ActionName>

2. 依赖配置“你必须知道自己少了哪一行”

3. 最小可用模板(完整内容)

【package.xml】

<?xml version="1.0"?>
<package format="3">
  <!-- 接口包:仅包含 msg/srv/action 定义与生成配置,不含可执行节点 -->
  <name>ros2_comm_design_interfaces</name>
  <version>0.0.1</version>
  <description>Interfaces for the sorting system (msg/srv/action)</description>
  <maintainer email="[email protected]">Dev Team</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <!-- 生成代码所需:rosidl 默认生成器 + 被引用的外部类型 -->
  <build_depend>rosidl_default_generators</build_depend>
  <depend>builtin_interfaces</depend>
  <depend>std_msgs</depend>
  <depend>geometry_msgs</depend>

  <!-- 运行期依赖:让生成的类型在运行时可用 -->
  <exec_depend>rosidl_default_runtime</exec_depend>

  <!-- 标记为 rosidl 接口包(规范建议) -->
  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <!-- 使用 ament_cmake 构建 -->
    <build_type>ament_cmake</build_type>
  </export>
  </package>

【CMakeLists.txt】

cmake_minimum_required(VERSION 3.8)
project(ros2_comm_design_interfaces)

# 构建工具与接口生成器
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

# 被接口字段引用到的外部类型(必须声明)
find_package(builtin_interfaces REQUIRED)
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)

# 声明需要生成的接口文件;DEPENDENCIES 必须覆盖到所有外部类型
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/DetectedItem.msg"
  "msg/DetectedItemArray.msg"
  "msg/TaskStatus.msg"
  "msg/SafetyState.msg"
  "srv/AssignTask.srv"
  "srv/PlanPath.srv"
  "srv/CancelTask.srv"
  "action/ExecuteArmPath.action"
  DEPENDENCIES builtin_interfaces std_msgs geometry_msgs
)

# 导出运行期依赖,供其他包使用这些类型
ament_export_dependencies(rosidl_default_runtime)
ament_package()

4. 接口文件最小示例(与模板一致,可直接复制)

msg/DetectedItem.msg

# 检测到的单个目标(含位姿)
# 说明:
# - 坐标系:由上层消息(DetectedItemArray.msg)的 frame_id 统一约定,避免在每个 item 内重复写 frame_id
# - 约定:所有数值字段都必须可验收(单位/取值范围/含义),写在“接口卡”里
# stamp:检测产生时刻(ROS 时间),用于时间对齐/回放/延迟统计
builtin_interfaces/Time stamp
# task_id:任务关联 ID(可为空;若为空表示“尚未绑定到具体任务”,由调度/上层补齐)
string task_id
# class_id:类别标识(建议用稳定 ID/枚举字符串,如 box/bag;不要用随意中文描述当主键)
string class_id
# score:置信度(建议 0.0~1.0;若模型输出不是该范围需说明归一化规则)
float32 score
# pose:目标位姿(position 单位 m;orientation 为四元数 x/y/z/w,需为单位四元数)
geometry_msgs/Pose pose

msg/DetectedItemArray.msg

# 一帧检测结果集合
# 说明:
# - frame_id:坐标系,用于空间对齐
# - items:按需裁剪(例如 topK)
# stamp:这一帧检测结果的时间戳(建议与 items 内的 stamp 一致;或约定以此为准)
builtin_interfaces/Time stamp
# task_id:同一帧/同一批结果关联的任务 ID(可为空;用于把感知结果归属到某个任务)
string task_id
# frame_id:坐标系(如 camera_link/map);items[].pose 的坐标都在该 frame_id 下
string frame_id
# items:检测目标列表;建议对数量做上限约束(例如 topK),避免无边界大数组
DetectedItem[] items

msg/TaskStatus.msg

# 通用任务状态(执行侧或调度汇总均可复用)
# 说明:
# - state:码表字段,见“状态码示例”
# - error_code/message:失败定位与解释
# stamp:状态产生时刻(用于状态回放与链路对齐)
builtin_interfaces/Time stamp
# task_id:任务 ID(状态主键)
string task_id
# state:任务状态码(必须配码表,例如 1=RUNNING/2=SUCCEEDED/...,见“状态码示例”)
uint8 state
# error_code:错误码(成功为 0;失败时用于定位模块与原因)
int32 error_code
# message:对当前状态/错误码的解释(面向日志/UI)
string message

msg/SafetyState.msg

# 系统安全状态广播
# 说明:
# - e_stop:急停信号
# - zone_state:区域安全等级(码表)
# stamp:安全状态产生时刻(用于追溯“何时进入危险区/何时急停”)
builtin_interfaces/Time stamp
# task_id:关联任务(可为空;表示系统级安全态,不绑定具体任务)
string task_id
# e_stop:急停是否触发(true 表示必须立刻进入安全策略)
bool e_stop
# zone_state:安全区域等级码(见“状态码示例”;用于降速/暂停/禁止执行)
uint8 zone_state
# message:安全态说明(例如触发来源:门禁/光栅/区域传感器)
string message

srv/AssignTask.srv

# 调度为目标类别分配任务,返回统一的 task_id
# Request
# request_id:一次请求的追溯 ID(建议 UUID;用于幂等与日志关联)
string request_id
# task_id:可为空;如上层已预分配任务 ID 则传入,否则由调度生成并在 Response 回填
string task_id
# target_class:目标类别(与感知 class_id 口径对齐;避免出现 box vs carton 两套词)
string target_class
# target_count:目标数量(建议 >=1;若允许 0/负数有特殊含义必须在接口卡写清)
int32 target_count
---
# Response
# ok:是否成功分配任务
bool ok
# task_id:最终确认的任务 ID(无论 Request 是否传入,都以此为准)
string task_id
# error_code:错误码(成功为 0;失败时定位原因:资源不足/参数错误/系统繁忙等)
int32 error_code
# message:对分配结果的解释(面向日志/UI)
string message

srv/PlanPath.srv

# 请求路径规划(起点/终点位姿 + 超时)
# Response 返回 path_id 与估计耗时,失败时给出 error_code/message
# request_id:一次规划请求的追溯 ID
string request_id
# task_id:关联任务 ID(用于把规划结果归档到任务)
string task_id
# start_pose:起点位姿(header.frame_id 必填;header.stamp 可填 0 表示“当前”或按项目约定)
geometry_msgs/PoseStamped start_pose
# goal_pose:终点位姿(header.frame_id 必填;必须与 start_pose 在同一坐标系或明确转换策略)
geometry_msgs/PoseStamped goal_pose
# timeout_sec:规划超时(秒);超时后应返回 ok=false + error_code=TIMEOUT 类错误
float32 timeout_sec
---
# ok:规划是否成功
bool ok
# path_id:规划结果标识(用于后续 ExecuteArmPath.goal.path_id 引用;或用于拉取具体路径点)
string path_id
# estimated_time_sec:预估执行耗时(秒),用于调度做超时/排程决策
float32 estimated_time_sec
# error_code:错误码(成功为 0;失败定位:不可达/碰撞/约束不满足/超时)
int32 error_code
# message:对规划结果的解释(面向日志/UI)
string message

srv/CancelTask.srv

# 统一取消入口(上层发起)
# 说明:
# - reason:取消原因(用于审计)
# - ok/error_code/message:取消结果与解释
# request_id:一次取消请求的追溯 ID
string request_id
# task_id:要取消的任务 ID(必须存在且可追溯)
string task_id
# reason:取消原因(用于审计与复盘,例如 user_request/safety_trigger/timeout)
string reason
---
# ok:是否取消成功(成功表示“系统已进入取消/终止流程并给出确定状态”,不等价于立即停止到位)
bool ok
# error_code:错误码(成功为 0;失败定位:任务不存在/不可取消/权限不足等)
int32 error_code
# message:对取消结果的解释(面向日志/UI)
string message

action/ExecuteArmPath.action

# 执行路径(长任务,可观测可取消)
# Goal:
# - task_id/path_id/speed/timeout_sec:任务目标与约束
# Result:
# - ok/error_code/message/final_state:最终结论与码表状态
# Feedback:
# - progress/current_step/safety_state:过程可观测 + 安全态提示
# task_id:关联任务 ID(用于把执行结果回写到任务)
string task_id
# path_id:要执行的路径标识(通常来自 PlanPath.srv 的 path_id)
string path_id
# speed:执行速度(建议用比例 0.0~1.0;若用 m/s 等单位必须在接口卡写清取值范围与含义)
float32 speed
# timeout_sec:执行超时(秒);到期后应停止/取消并给出明确 final_state
float32 timeout_sec
---
# ok:执行是否成功(成功=达到业务验收点)
bool ok
# error_code:错误码(成功为 0;失败定位:轨迹不可达/碰撞/安全阻止/超时等)
int32 error_code
# message:对最终结果的解释(面向日志/UI)
string message
# final_state:最终状态码(建议对齐 action_msgs/msg/GoalStatus 的 STATUS_*,或在项目内固定码表)
uint8 final_state
---
# progress:进度(建议 0.0~1.0);发布频率建议 2~10 Hz,避免过高导致链路拥塞
float32 progress
# current_step:当前阶段(建议从有限集合取值,便于上层状态机判断)
string current_step
# safety_state:安全态码(配码表;用于上层决定是否降速/暂停/取消)
uint8 safety_state
  1. 感知检测结果 → 调度分配(Topic)
  2. 调度请求路径规划(Service)
  3. 机械臂执行分拣路径(Action)
  1. 快问快答:msg/srv/action 的关键差异是什么?分别适合什么交互?

  2. 交互类型:是数据流(持续)还是一次问答(一次)还是长任务(可反馈/可取消)?

  3. 实时性:对延迟/抖动敏感吗?丢一两帧能否接受?

  4. 数据量:消息大小与频率是多少?是否需要压缩、分片、降低频率?

  5. 可观测/可控制:是否必须有过程反馈?是否必须允许取消/超时?

2. Topic / Service / Action 的“决策表”(可直接套用)

维度 Topic(msg) Service(srv) Action(action)
交互类型 持续数据流/状态广播 一次请求-一次响应 长任务:目标-反馈-结果(可取消)
实时性 可做低延迟(配 QoS);允许丢帧更合适 常用于短交互 任务级交互,重“过程可见+可控”
数据量 中小消息高频;大消息需限频/裁剪 一般较小 中等;feedback 需控频
可观测 可通过 echo/录包回放观测 只有一次响应 原生支持反馈与最终结果
可取消 无(需自建协议) 无(可加“取消服务”但语义弱) 支持 cancel(服务端需显式接受/实现取消回调,否则会拒绝)
典型例子 检测结果、机器人状态、安全状态 分配任务、请求规划、参数查询 执行路径、抓取任务、回零/复位

3. QoS 选择的最小原则(不展开细节也能用)

链路(Topic) 业务属性 reliability history depth durability 选型理由
/sorting/perception/detections 高频传感器流 best_effort keep_last 1~5 volatile 丢一两帧可接受;优先低延迟与“最新值”
/sorting/safety/state 安全状态广播 reliable keep_last 1 transient_local 晚启动也要拿到最后一次安全态;不追求历史长队列
/sorting/arm/task_status 执行状态/看板 reliable keep_last 10 transient_local(可选) UI/日志晚启动可见最近状态;队列适中便于观测
/sorting/dispatcher/task_status 汇总状态/看板 reliable keep_last 10 transient_local(可选) 多订阅者,强调一致性与可观测
# 查看 Topic 的发布端提供了哪些 QoS(offered QoS)
ros2 topic info -v /sorting/safety/state

# echo 时显式指定 QoS(避免“订阅端 QoS 不匹配”导致收不到)
ros2 topic echo /sorting/safety/state --qos-reliability reliable --qos-durability transient_local --qos-history keep_last --qos-depth 1 --once
ros2 topic echo /sorting/perception/detections --qos-reliability best_effort --qos-history keep_last --qos-depth 1 --once

项目对照(greensort_ros2_ws 实际通信拓扑与 QoS 口径)

  1. 感知 → 调度:检测结果发布
  1. 调度:任务分配(给出统一 task_id)
  1. 调度 → 规划:请求路径规划
  1. 调度/规划 → 机械臂:执行路径(长任务可观测、可取消)
  1. 机械臂 → 调度:执行侧任务状态上报(便于监控与审计)
  1. 安全 → 全系统:安全状态广播(影响是否允许继续执行)
  1. 调度:统一取消入口(上层“一键取消”)
  1. 调度 → 全系统:任务状态汇总广播(统一看板口径)
  1. 反例:Topic 用 std_msgs/String 传“逗号拼接字段”
  1. 反例:Service 只返回 bool success
  1. 反例:Action 没有有效 feedback 或取消语义不一致

任务:以小组自选题项目为对象,完成“选型表 + 方案草图 + 接口清单”。

| - | --------- | --------- | ------ | ------ | ------------------------ | ------- | ------------------------ | ----------------- |
| 1 |
|
|
|
|
|
|
|
|
| 2 |
|
|
|
|
|
|
|
|
| 3 |
|
|
|
|
|
|
|
|
| 4 |
|
|
|
|
|
|
|
|
| 5 |
|
|
|
|
|
|
|
|
| 6 |
|
|
|
|
|
|
|
|

  1. 工业接口标准化:用“统一命名、字段含义明确、码表一致、可追溯”对齐行业要求,减少集成成本,体现工程职业素养。

提示词模板(可直接复制):

你是 ROS2 Humble 的接口设计与通信机制选型专家。请基于以下业务给出可落地的方案。
业务:你们小组项目(无项目则用分拣系统:感知-调度-规划-机械臂执行-安全)
输出要求:
1)给出 msg/srv/action 的通用设计规范(命名、字段、类型、错误码、可验收)
2)给出通信机制选型决策表(按实时性、交互类型、数据量、可观测/可取消分类)
3)针对以下模块交互清单,逐条推荐 Topic/Service/Action,并给出接口草案(字段+理由+验收方式)
4)给出 3 个常见反例(坏接口)并说明如何改
约束:不要编造不存在的包;字段必须能解释业务含义;必须包含 task_id/状态/结果/错误码等可追溯信息;给出可用的 ros2 interface 验证命令。