23 实践课-协调节点开发
协调节点开发
关联:索引
- 推荐系统:Ubuntu 22.04;ROS2 Humble。
- 检测 Topic:
/sorting/perception/detections,类型:ros2_comm_design_interfaces/msg/DetectedItemArray - 机械臂 Action:
/sorting/arm/execute_path,类型:ros2_comm_design_interfaces/action/ExecuteArmPath - 夹爪 Service(本讲用标准接口便于自测):
/sorting/gripper/set,类型:std_srvs/srv/SetBool - 若你们项目接口包/通道名不同:以
ros2 topic list -t/ros2 action list -t/ros2 service list -t的输出为准逐项替换,严禁“脑补字段”。
接口可见性自检(你必须能给出证据):
source /opt/ros/humble/setup.bash
cd ~/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 interface show ros2_comm_design_interfaces/msg/DetectedItemArray
ros2 interface show ros2_comm_design_interfaces/action/ExecuteArmPath
ros2 interface show std_srvs/srv/SetBool
逐行解释:
colcon build --symlink-install:构建并让 Python 节点软链接安装,便于边改边跑。source install/setup.bash:让当前终端“看见”你编译出来的接口与可执行入口。ros2 interface show ...:把接口字段当作“唯一契约”,后续写代码/写测试都以它为准。
场景提问(全班统一口径):
- 如果检测节点暂时没识别到目标,你的系统是“等到天荒地老”,还是“超时后切到兜底策略”?
- 如果机械臂抓取动作卡住了(无反馈/无结果),你的系统怎么证明“卡住了”,又怎么恢复?
1)四要素与分拣语义对齐
- 状态(State):系统当前处于哪个阶段(等待检测/分配任务/执行抓取/放置/恢复等)。
- 事件(Event):外部或内部信号(检测消息到达、动作完成、超时触发、失败码返回)。
- 转移(Transition):状态在事件触发下如何变化(带守卫条件)。
- 动作(Action):进入/退出状态时做什么(发送 action goal、调用 service、发布状态话题、记录证据日志)。
2)一个分拣流程:完整业务版 vs 本讲最小实现版(正常路径)
完整业务版(用于你们画状态机方案,体现“多节点协同调度”):
IDLE
↓(启动/收到开始指令)
WAIT_DETECTION --(detected_ok)--> PLAN_AND_DISPATCH --(task_ok)--> EXECUTE_PICK
↑(timeout/no_item) ↓(task_fail)
└------------------------------- RECOVERY <-----------┘
EXECUTE_PICK --(pick_ok)--> EXECUTE_PLACE --(place_ok)--> DONE --(next)--> WAIT_DETECTION
└--(timeout/fail)--> RECOVERY
WAIT_DETECTION --(detected_ok)--> OPEN_GRIPPER --(gripper_ok)--> EXECUTE_PICK
↑(timeout/no_item) ↓(arm_ok)
└--------------------------- RECOVERY <-------------------┘
EXECUTE_PICK --(arm_ok)--> CLOSE_GRIPPER --(gripper_ok)--> DONE --(next)--> WAIT_DETECTION
└--(timeout/fail)--> RECOVERY
你要能口头解释:
- 为什么 WAIT_DETECTION 必须有 timeout:避免“无检测=系统假死”。
- 为什么 RECOVERY 不能只写一句“失败了”:必须定义“重试次数/降级策略/是否急停”的边界。
1)异常分类(建议按“可恢复性”分层)
-
可恢复:短暂无检测、置信度不足、夹爪服务瞬时失败、动作失败但可重试。
-
半可恢复:连续多次无检测(可能相机遮挡/光照问题)、连续抓取失败(可能目标姿态不佳)。
-
不可恢复/安全相关:急停触发、动作服务器失联、关键节点退出、系统状态不一致(可能撞机风险)。
-
检测失败(未收到或无有效目标):
-
证据:协调节点日志出现
reason=detection_timeout(状态转移日志);命令ros2 topic echo /sorting/coordinator/state --once能看到状态变化。 -
抓取超时(动作执行超过
T_pick): -
证据:
ros2 action send_goal ... --feedback能看到 feedback 停止;协调节点日志包含cancel与timeout关键行。
3)把异常写成“事件”而不是“if 大杂烩”
建议事件命名统一口径(便于日志检索与复盘):
EV_DETECTION_OK/EV_DETECTION_TIMEOUTEV_TASK_ASSIGNED_OK/EV_TASK_ASSIGNED_FAILEV_ARM_RESULT_OK/EV_ARM_RESULT_FAIL/EV_ARM_TIMEOUTEV_GRIPPER_OK/EV_GRIPPER_FAIL
- 一张状态机图(手绘/工具均可),至少包含:正常路径 + “检测失败” + “抓取超时”两类异常路径。
- 一段文字说明(不超过 200 字):你们的异常兜底策略是什么?重试次数/超时阈值/降级动作分别是什么?
- 任意点名:请解释你们的一个转移条件(Guard)与一个动作(Action)分别是什么。
- 任意点名:请说明你们的“终止条件”(什么时候必须停机/报警)。
快问快答(两题):
- 为什么协调节点不应该在订阅回调里直接 sleep 等待动作结果?
- 为什么异常机制一定要有“超时 + 重试上限 + 降级策略”三件套?
- 订阅(输入):检测结果(事件来源)。
- 动作客户端(执行):机械臂执行(长时任务,用 Action)。
- 服务客户端(控制):夹爪开合(同步控制,用 Service)。
- 发布(输出):协调状态(便于联调与证据采集)。
2)事件驱动 + 单一状态入口(避免回调乱改状态)
推荐结构(本讲代码采用):
- 回调只做两件事:更新缓存 + 投递事件(例如
self._push_event("EV_DETECTION_OK"))。 - 状态切换只在一个地方发生:一个周期性定时器
tick()里处理事件并执行转移。
这样做的收益:
- 状态可追踪(每次转移都有日志)。
- 异常可复现(超时事件由定时器统一触发)。
- 不会出现“两个回调同时改状态”的竞态。
本工坊提供一套“全假节点”自测方案:即使没有真实相机/机械臂,也能跑通协调逻辑,并能注入异常。
1)创建功能包(Python)
source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_coordinator_fsm --build-type ament_python --dependencies rclpy std_msgs std_srvs action_msgs geometry_msgs builtin_interfaces ros2_comm_design_interfaces
逐行解释:
sorting_coordinator_fsm:本讲示例包名,你们可按项目规范命名。--dependencies ...:把运行期依赖写进package.xml,避免“能 import 但运行缺依赖”。ros2_comm_design_interfaces:用于导入检测消息与机械臂 Action(若你们接口包不同,替换此依赖与下方代码 import)。
2)入口点(setup.py)配置(示例)
entry_points={
"console_scripts": [
"coordinator_node = sorting_coordinator_fsm.coordinator_node:main",
"fake_perception_node = sorting_coordinator_fsm.fake_perception_node:main",
"fake_gripper_server = sorting_coordinator_fsm.fake_gripper_server:main",
"fake_arm_action_server = sorting_coordinator_fsm.fake_arm_action_server:main",
],
},
解释:
coordinator_node:你们要交付的协调节点。
3)协调节点代码:状态机 + 异常处理骨架(可直接复制)
sorting_coordinator_fsm/coordinator_node.py
from __future__ import annotations
"""
协调节点(Coordinator)示例:用有限状态机(FSM)编排流程,并通过“超时 + 重试上限”实现异常兜底。
配合本讲 3 个假节点一起运行,用于在没有真实硬件/仿真时也能联调流程:
- fake_perception_node:发布检测 Topic
- fake_gripper_server:提供夹爪 Service
- fake_arm_action_server:提供机械臂 Action Server
对外通道(默认命名,与示例一致):
- Sub: /sorting/perception/detections (DetectedItemArray)
- Svc: /sorting/gripper/set (std_srvs/SetBool)
- Act: /sorting/arm/execute_path (ExecuteArmPath)
- Pub: /sorting/coordinator/state (std_msgs/String) 证据输出:状态可见、可截图、可复现
关键约束(保证“可复现”和“不会乱跳状态”):
- 回调只投递事件(event),不直接切换状态
- 状态切换只在 tick() 一个地方发生(单入口)
"""
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Deque, Optional
import rclpy
from rclpy.action import ActionClient
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from std_msgs.msg import String
from std_srvs.srv import SetBool
from ros2_comm_design_interfaces.msg import DetectedItemArray
from ros2_comm_design_interfaces.action import ExecuteArmPath
class State(str, Enum):
IDLE = "IDLE"
WAIT_DETECTION = "WAIT_DETECTION"
OPEN_GRIPPER = "OPEN_GRIPPER"
EXECUTE_PICK = "EXECUTE_PICK"
CLOSE_GRIPPER = "CLOSE_GRIPPER"
DONE = "DONE"
RECOVERY = "RECOVERY"
SAFE_STOP = "SAFE_STOP"
@dataclass
class Context:
task_id: str = ""
detect_retry: int = 0
pick_retry: int = 0
class SortingCoordinator(Node):
def __init__(self) -> None:
super().__init__("sorting_coordinator")
# 允许 timer/subscription/service/action 的回调并发执行,减少“回调互相阻塞导致假超时”的概率
self._cb_group = ReentrantCallbackGroup() # 创建一个可重入回调组,用来管理ROS2节点的回调函数执行顺序
self._state: State = State.IDLE # 空闲状态
self._ctx = Context()
# 事件队列:回调只负责把“发生了什么”丢进队列;tick 再按状态机规则消费事件并切状态
self._events: Deque[str] = deque(maxlen=50)
# 超时与重试上限:做异常注入实验时建议只改参数,不改代码
self._detect_timeout_sec = float(self.declare_parameter("detect_timeout_sec", 2.0).value)
self._pick_timeout_sec = float(self.declare_parameter("pick_timeout_sec", 4.0).value)
self._max_detect_retry = int(self.declare_parameter("max_detect_retry", 2).value)
self._max_pick_retry = int(self.declare_parameter("max_pick_retry", 1).value)
# “截止时间(deadline)”用于把时间条件变成事件:超时 -> 投递 EV_DETECTION_TIMEOUT/EV_ARM_TIMEOUT
self._detection_deadline = self.get_clock().now()
self._arm_goal_deadline = self.get_clock().now()
self._last_detection: Optional[DetectedItemArray] = None
# 状态证据输出:用 ros2 topic echo --once 抓一条即可当作验收证据
self._state_pub = self.create_publisher(String, "/sorting/coordinator/state", 10)
self._det_sub = self.create_subscription(
DetectedItemArray,
"/sorting/perception/detections",
self._on_detection,
qos_profile_sensor_data,
callback_group=self._cb_group,
)
# 夹爪服务客户端:本讲用 std_srvs/SetBool,方便用 ros2 service call 自测
self._gripper_client = self.create_client(
SetBool,
"/sorting/gripper/set",
callback_group=self._cb_group,
)
# 机械臂动作客户端:长时任务用 Action,可以 feedback、可以 cancel、也更容易做超时兜底
self._arm_client = ActionClient(
self,
ExecuteArmPath,
"/sorting/arm/execute_path",
callback_group=self._cb_group,
)
self._active_goal_handle = None
# tick 定时器:状态机唯一“执行入口”,统一处理超时、消费事件、切状态
self._tick_timer = self.create_timer(0.05, self._tick, callback_group=self._cb_group)
self._set_state(State.WAIT_DETECTION, reason="startup")
def _push_event(self, event: str) -> None:
self._events.append(event)
def _pop_event(self) -> Optional[str]:
if not self._events:
return None
return self._events.popleft()
def _set_state(self, new_state: State, reason: str) -> None:
if self._state == new_state:
return
# 状态转移日志:reason 必须可解释,便于排障时按关键字定位
self.get_logger().info(f"state: {self._state.value} -> {new_state.value} | reason={reason} | task_id={self._ctx.task_id}")
self._state = new_state
self._state_pub.publish(String(data=f"{new_state.value}|reason={reason}|task_id={self._ctx.task_id}"))
now = self.get_clock().now()
if new_state == State.WAIT_DETECTION:
# 进入 WAIT_DETECTION 就重置检测超时,避免无限等待
self._detection_deadline = now + Duration(seconds=self._detect_timeout_sec)
if new_state == State.EXECUTE_PICK:
# 进入 EXECUTE_PICK 就重置动作超时,超时后会 cancel action 并进入 RECOVERY
self._arm_goal_deadline = now + Duration(seconds=self._pick_timeout_sec)
def _on_detection(self, msg: DetectedItemArray) -> None:
self._last_detection = msg
# 回调不切状态,只投递事件:避免多个回调同时修改 _state 的竞态
if self._state == State.WAIT_DETECTION and len(getattr(msg, "items", [])) > 0:
self._ctx.task_id = getattr(msg, "task_id", "") or self._ctx.task_id
self._push_event("EV_DETECTION_OK")
def _call_gripper(self, close: bool) -> None:
# 服务未就绪时不要阻塞太久:直接走异常路径(RECOVERY)
if not self._gripper_client.wait_for_service(timeout_sec=0.2):
self.get_logger().error("gripper service not available")
self._push_event("EV_GRIPPER_FAIL")
return
req = SetBool.Request()
req.data = close
future = self._gripper_client.call_async(req)
future.add_done_callback(self._on_gripper_done)
def _on_gripper_done(self, future) -> None:
try:
resp = future.result()
except Exception as exc:
self.get_logger().error(f"gripper call exception: {exc}")
self._push_event("EV_GRIPPER_FAIL")
return
if bool(resp.success):
self._push_event("EV_GRIPPER_OK")
else:
self.get_logger().warn(f"gripper returned failure: message={resp.message}")
self._push_event("EV_GRIPPER_FAIL")
def _send_arm_goal(self) -> None:
# 动作服务器未就绪:直接投递失败事件(不要让流程卡住)
if not self._arm_client.wait_for_server(timeout_sec=0.5):
self.get_logger().error("arm action server not available")
self._push_event("EV_ARM_RESULT_FAIL")
return
goal = ExecuteArmPath.Goal()
goal.task_id = self._ctx.task_id or "task_demo"
goal.path_id = "pick_path"
goal.speed = 0.6
goal.timeout_sec = float(self._pick_timeout_sec)
future = self._arm_client.send_goal_async(goal, feedback_callback=self._on_arm_feedback)
future.add_done_callback(self._on_arm_goal_response)
def _on_arm_feedback(self, feedback_msg) -> None:
feedback = feedback_msg.feedback
progress = getattr(feedback, "progress", 0.0)
current_step = getattr(feedback, "current_step", 0)
self.get_logger().info(f"arm feedback: progress={progress:.2f} step={current_step} task_id={self._ctx.task_id}")
def _on_arm_goal_response(self, future) -> None:
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warn("arm goal rejected")
self._push_event("EV_ARM_RESULT_FAIL")
return
self._active_goal_handle = goal_handle
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._on_arm_result)
def _on_arm_result(self, future) -> None:
try:
result = future.result().result
except Exception as exc:
self.get_logger().error(f"arm result exception: {exc}")
self._push_event("EV_ARM_RESULT_FAIL")
return
ok = bool(getattr(result, "ok", False))
if ok:
self._push_event("EV_ARM_RESULT_OK")
else:
error_code = getattr(result, "error_code", 1)
message = getattr(result, "message", "")
self.get_logger().warn(f"arm result failed: error_code={error_code} message={message}")
self._push_event("EV_ARM_RESULT_FAIL")
def _cancel_arm_goal(self) -> None:
if self._active_goal_handle is None:
return
cancel_future = self._active_goal_handle.cancel_goal_async()
cancel_future.add_done_callback(lambda _: self.get_logger().warn("arm goal cancel requested"))
def _tick(self) -> None:
now = self.get_clock().now()
# 把“时间条件”转成“事件”,统一走状态机:方便验收与复盘
if self._state == State.WAIT_DETECTION and now > self._detection_deadline:
self._push_event("EV_DETECTION_TIMEOUT")
if self._state == State.EXECUTE_PICK and now > self._arm_goal_deadline:
self._push_event("EV_ARM_TIMEOUT")
# 每个 tick 消费 1 个事件:保证状态转移过程可追踪、可复现
event = self._pop_event()
if event is None:
return
if self._state == State.WAIT_DETECTION:
if event == "EV_DETECTION_OK":
self._set_state(State.OPEN_GRIPPER, reason="detected_ok")
self._call_gripper(close=False)
elif event == "EV_DETECTION_TIMEOUT":
self._set_state(State.RECOVERY, reason="detection_timeout")
elif self._state == State.OPEN_GRIPPER:
if event == "EV_GRIPPER_OK":
self._set_state(State.EXECUTE_PICK, reason="gripper_ready")
self._send_arm_goal()
elif event == "EV_GRIPPER_FAIL":
self._set_state(State.RECOVERY, reason="gripper_fail")
elif self._state == State.EXECUTE_PICK:
if event == "EV_ARM_RESULT_OK":
self._set_state(State.CLOSE_GRIPPER, reason="pick_ok")
self._call_gripper(close=True)
elif event == "EV_ARM_TIMEOUT":
self.get_logger().warn("arm timeout -> cancel")
self._cancel_arm_goal()
self._set_state(State.RECOVERY, reason="arm_timeout")
elif event == "EV_ARM_RESULT_FAIL":
self._set_state(State.RECOVERY, reason="arm_fail")
elif self._state == State.CLOSE_GRIPPER:
if event == "EV_GRIPPER_OK":
self._set_state(State.DONE, reason="picked_and_gripped")
elif event == "EV_GRIPPER_FAIL":
self._set_state(State.RECOVERY, reason="gripper_fail_after_pick")
elif self._state == State.DONE:
self._ctx = Context()
self._set_state(State.WAIT_DETECTION, reason="next_cycle")
elif self._state == State.RECOVERY:
if self._ctx.detect_retry < self._max_detect_retry:
self._ctx.detect_retry += 1
self.get_logger().warn(f"recovery: retry detection ({self._ctx.detect_retry}/{self._max_detect_retry})")
self._set_state(State.WAIT_DETECTION, reason="retry_detection")
return
if self._ctx.pick_retry < self._max_pick_retry:
self._ctx.pick_retry += 1
self.get_logger().warn(f"recovery: retry pick ({self._ctx.pick_retry}/{self._max_pick_retry})")
self._set_state(State.EXECUTE_PICK, reason="retry_pick")
self._send_arm_goal()
return
self.get_logger().error("recovery exhausted -> safe stop")
self._set_state(State.SAFE_STOP, reason="recovery_exhausted")
elif self._state == State.SAFE_STOP:
self.get_logger().error(f"SAFE_STOP: manual intervention required | task_id={self._ctx.task_id}")
def main() -> None:
rclpy.init()
node = SortingCoordinator()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
你必须能解释的关键点(按块自检):
ReentrantCallbackGroup:允许 action 回调、service 回调、timer 回调并发执行;如果你们用默认回调组,有些场景会因为回调阻塞导致“看似超时”。- “回调只投递事件,tick 统一切状态”:避免多个回调同时修改
_state造成竞态。 *_deadline:把超时变成“可触发事件”,而不是散落的 if/sleep。RECOVERY的上限:重试必须有限,否则系统会进入“永远重试”的假稳定。
4)假节点 1:发布检测(可注入“无检测”异常)
sorting_coordinator_fsm/fake_perception_node.py
"""
假检测节点:周期性发布 DetectedItemArray,用于触发协调节点从 WAIT_DETECTION 进入下一状态。
参数(ros-args -p ...):
- publish: 是否发布检测(false 可注入“无检测/检测失败”异常)
- period_sec: 发布周期
- score: 置信度(示例字段)
"""
import math
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data
from builtin_interfaces.msg import Time
from geometry_msgs.msg import Pose
from ros2_comm_design_interfaces.msg import DetectedItem
from ros2_comm_design_interfaces.msg import DetectedItemArray
class FakePerception(Node):
def __init__(self) -> None:
super().__init__("fake_perception_node")
self._pub = self.create_publisher(DetectedItemArray, "/sorting/perception/detections", qos_profile_sensor_data)
self._publish = bool(self.declare_parameter("publish", True).value)
self._period_sec = float(self.declare_parameter("period_sec", 0.2).value)
self._score = float(self.declare_parameter("score", 0.9).value)
self._t = 0.0
self.create_timer(self._period_sec, self._tick)
def _tick(self) -> None:
if not self._publish:
return
msg = DetectedItemArray()
msg.task_id = "task_demo"
msg.frame_id = "camera_frame"
msg.stamp = Time(sec=int(self.get_clock().now().nanoseconds // 1_000_000_000), nanosec=int(self.get_clock().now().nanoseconds % 1_000_000_000))
# 最小示例:只放 1 个目标即可(items 长度 > 0 会触发协调节点投递 EV_DETECTION_OK)
item = DetectedItem()
item.task_id = msg.task_id
item.class_id = 1
item.score = self._score
pose = Pose()
pose.position.x = 0.3 + 0.02 * math.sin(self._t)
pose.position.y = 0.1
pose.position.z = 0.0
pose.orientation.w = 1.0
item.pose = pose
msg.items.append(item)
self._pub.publish(msg)
self._t += 0.2
def main() -> None:
rclpy.init()
node = FakePerception()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
解释与注意:
publish:=false可以制造“检测失败/无检测消息”异常。DetectedItemArray/DetectedItem的字段以你们的接口为准;如果编译提示字段不存在,立刻回到ros2 interface show对齐字段。
5)假节点 2:夹爪服务(可注入“服务返回失败”异常)
sorting_coordinator_fsm/fake_gripper_server.py
"""
假夹爪服务端:提供 std_srvs/SetBool,便于验证协调节点的 Service 调用与异常兜底。
服务名:/sorting/gripper/set
参数:
- always_fail: true 时始终返回失败,用于注入“夹爪失败”异常
"""
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
class FakeGripper(Node):
def __init__(self) -> None:
super().__init__("fake_gripper_server")
self._always_fail = bool(self.declare_parameter("always_fail", False).value)
self.create_service(SetBool, "/sorting/gripper/set", self._handle)
def _handle(self, request: SetBool.Request, response: SetBool.Response) -> SetBool.Response:
if self._always_fail:
response.success = False
response.message = "injected failure"
return response
response.success = True
# 约定:request.data=True 表示 close,False 表示 open(仅用于示例/自测)
response.message = "closed" if request.data else "opened"
return response
def main() -> None:
rclpy.init()
node = FakeGripper()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
解释:
- 本讲用
std_srvs/SetBool的目的是:测试时可以用ros2 service call直接触发。 always_fail:=true用来验证协调节点的 RECOVERY 路径是否会被触发。
6)假节点 3:机械臂 Action Server(可注入“超时/失败”异常)
sorting_coordinator_fsm/fake_arm_action_server.py
"""
假机械臂 Action Server:提供 ExecuteArmPath,用于注入“成功/失败/超时/取消”等场景。
参数:
- execute_time_sec: 模拟动作执行耗时;大于协调节点 pick_timeout_sec 会触发超时与 cancel
- always_fail: true 时动作结束返回失败(abort)
"""
import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from ros2_comm_design_interfaces.action import ExecuteArmPath
class FakeArmActionServer(Node):
def __init__(self) -> None:
super().__init__("fake_arm_action_server")
self._execute_time_sec = float(self.declare_parameter("execute_time_sec", 1.0).value)
self._always_fail = bool(self.declare_parameter("always_fail", False).value)
self._server = ActionServer(self, ExecuteArmPath, "/sorting/arm/execute_path", self._execute)
def _execute(self, goal_handle):
goal = goal_handle.request
self.get_logger().info(f"goal received: task_id={goal.task_id} timeout_sec={goal.timeout_sec}")
start = time.time()
step = 0
while time.time() - start < self._execute_time_sec:
# 如果协调节点因为超时触发 cancel,这里会收到取消请求并返回 canceled 结果
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = ExecuteArmPath.Result()
result.ok = False
result.error_code = 2
result.message = "canceled"
result.final_state = 2
return result
# feedback 用于“进度可见”:终端里用 --feedback 可观察是否持续反馈
feedback = ExecuteArmPath.Feedback()
feedback.progress = min(0.99, (time.time() - start) / max(self._execute_time_sec, 1e-3))
feedback.current_step = step
feedback.safety_state = "OK"
goal_handle.publish_feedback(feedback)
step += 1
time.sleep(0.1)
result = ExecuteArmPath.Result()
if self._always_fail:
goal_handle.abort()
result.ok = False
result.error_code = 1
result.message = "injected failure"
result.final_state = 4
return result
goal_handle.succeed()
result.ok = True
result.error_code = 0
result.message = "ok"
result.final_state = 3
return result
def main() -> None:
rclpy.init()
node = FakeArmActionServer()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
解释与注意:
execute_time_sec控制“动作耗时”。当它大于协调节点参数pick_timeout_sec时,协调节点应触发EV_ARM_TIMEOUT并请求 cancel。always_fail:=true模拟动作失败,观察协调节点是否进入 RECOVERY/SAFE_STOP。final_state的数值语义应与接口定义对齐;这里给的是演示值,实际以你们接口字段约束为准。
7)编译与运行(四终端联调)
cd ~/ros2_ws
colcon build --symlink-install --packages-select sorting_coordinator_fsm
source install/setup.bash
逐行解释:
--packages-select ...:只编译本讲包,减少等待时间与联调噪音。source install/setup.bash:每个终端都要 source(这是最常见“找不到可执行入口”的根因)。
终端 A(夹爪服务):
ros2 run sorting_coordinator_fsm fake_gripper_server
终端 B(机械臂 Action Server):
ros2 run sorting_coordinator_fsm fake_arm_action_server --ros-args -p execute_time_sec:=1.0
终端 C(检测发布):
ros2 run sorting_coordinator_fsm fake_perception_node --ros-args -p publish:=true -p period_sec:=0.2
终端 D(协调节点):
ros2 run sorting_coordinator_fsm coordinator_node --ros-args -p detect_timeout_sec:=2.0 -p pick_timeout_sec:=4.0
ros2 topic echo /sorting/coordinator/state --once
解释:
- 能看到状态字符串说明:协调节点在发布状态;也意味着你后续做异常注入时可以用它作为“状态切换证据”。
做法:把假机械臂执行时间调大,让它超过 pick_timeout_sec。
ros2 run sorting_coordinator_fsm fake_arm_action_server --ros-args -p execute_time_sec:=10.0
你应观察到的证据(至少二选一):
- 协调节点日志出现
arm timeout -> cancel,并进入RECOVERY或SAFE_STOP。 ros2 topic echo /sorting/coordinator/state --once显示RECOVERY|reason=arm_timeout(或你们实现的等价语义)。
做法:停止终端 C 或设置 publish:=false。
ros2 run sorting_coordinator_fsm fake_perception_node --ros-args -p publish:=false
你应观察到的证据:
detect_timeout_sec秒后,协调节点进入RECOVERY,并按max_detect_retry重试,最终进入SAFE_STOP或回到 WAIT_DETECTION(取决于你们策略)。
1)让 AI 先出“状态机方案”(含流程图与说明)
可直接复制给 AI 的指令模板(教师发放):
你是 ROS2 Humble 工程与工业流程控制专家。请为“分拣流程协调节点”设计有限状态机(FSM)方案。
背景:分拣系统包含 perception(发布检测 Topic)、arm(提供执行 Action)、gripper(提供 SetBool Service)。
目标:协调节点编排一件物品的流程:等待检测→抓取→放置→下一件。
异常:1)检测失败(超时/无目标/置信度低);2)抓取超时(Action 无结果/无反馈/执行时间超限)。
要求:
1)输出状态机:列出所有状态、事件、转移(含守卫条件)、进入/退出动作。
2)输出 Mermaid 流程图(stateDiagram-v2 或 flowchart 均可)。
3)给出异常处理策略:重试次数上限、超时阈值、降级策略、进入 SAFE_STOP 的条件。
4)给出验收证据清单:每条异常至少 2 条可复现证据(CLI/日志/状态话题)。
校验点(人工审计,不通过就退回重写):
- AI 是否把“异常”写成了具体事件与可复现证据,而不是一句“失败则重试”。
- 是否给出了重试上限与终止条件(没有上限=不可交付)。
2)让 AI 生成“协调节点基础代码”时的约束(避免生成垃圾代码)
请生成 ROS2 Python 协调节点代码,必须满足:
1)状态切换只能在一个函数里发生(例如 tick),回调只投递事件;
2)必须实现 detection_timeout 与 arm_timeout 两种异常;
3)必须提供命令级验收方法(ros2 topic/service/action),并说明期望输出;
4)必须包含人工审计清单:至少列出 8 条(线程/回调、超时、重试上限、日志证据、接口字段对齐等)。
案例引入(教师讲解要点):
-
场景:某地合作社引入“果蔬分级与装箱”智能装备(相机检测 + 机械臂抓取 + 输送线分拣),核心难点往往不是单个算法,而是“整条流程的稳定运行”。
-
状态机的价值:把“看到了→抓起来→放到对应箱→记录产量→异常报警”固化为可复现流程;当光照变化导致检测不稳定、当夹爪磨损导致抓取失败时,系统仍能通过超时/重试/降级保证连续生产。
-
使命感落点:技术不仅是“跑通 demo”,更是“让产业设备稳定可靠地跑在田间地头与加工车间”,用工程化流程管控降低损耗、提升效率、支撑乡村产业升级。
-
学习状态机核心原理与分拣流程编排逻辑,并完成本组状态机设计图。
-
开发分拣流程协调节点,实现多节点协同调度(订阅检测、调用夹爪、调用机械臂动作)。
-
设计并实现检测失败、抓取超时等异常场景处理机制(含超时、重试上限、降级/停机策略)。
-
使用 AI 生成协调节点基础代码、辅助设计状态机方案,并对生成内容进行审计与优化。
-
记录开发过程中的问题与解决思路,梳理协调节点对系统稳定性的作用(证据链)。
-
生成分拣流程状态机设计方案(含流程图与说明)。
-
生成协调节点基础代码(带状态切换、异常处理框架)。
-
辅助分析异常场景处理逻辑,优化异常机制设计(重试/降级/停机边界)。
-
结合农业智能装备案例,讲解协调节点的产业应用要点(流程管控价值与可靠性指标)。
作业(布置)
1)提交分拣流程状态机设计图(手绘 / 工具生成均可),附核心状态与切换逻辑说明。
2)提交协调节点完整代码(含 AI 生成代码审计优化痕迹),重点标注异常处理逻辑。
3)提交协调节点运行测试截图(含正常调度、异常触发场景),撰写 200 字左右分析,说明异常处理机制的设计思路与效果。
4)提交 AI 交互记录(至少 1 次,用于设计状态机 / 生成协调节点代码),阐述对 AI 生成内容的验证与优化过程。
Markdown 与代码自检清单(提交前必过)
- 标题层级连续(不跳级);代码块均闭合且带语言标注。
- 所有命令可复制执行;每段命令都写了“为什么要这么做”的解释。
- 代码不依赖“未声明的接口字段”;一旦报字段错误,先用
ros2 interface show对齐再改代码。 - 异常机制具备:超时阈值、重试上限、终止条件、证据输出(日志/状态话题/CLI)。