23 实践课-协调节点开发

协调节点开发

关联:索引

接口可见性自检(你必须能给出证据):

source /opt/ros/humble/setup.bash
cd ~/ros2_ws
colcon build --symlink-install
source install/setup.bash

ros2 interface show ros2_comm_design_interfaces/msg/DetectedItemArray
ros2 interface show ros2_comm_design_interfaces/action/ExecuteArmPath
ros2 interface show std_srvs/srv/SetBool

逐行解释:


场景提问(全班统一口径):

  1. 如果检测节点暂时没识别到目标,你的系统是“等到天荒地老”,还是“超时后切到兜底策略”?
  2. 如果机械臂抓取动作卡住了(无反馈/无结果),你的系统怎么证明“卡住了”,又怎么恢复?

1)四要素与分拣语义对齐

2)一个分拣流程:完整业务版 vs 本讲最小实现版(正常路径)

完整业务版(用于你们画状态机方案,体现“多节点协同调度”):

IDLE
  ↓(启动/收到开始指令)
WAIT_DETECTION  --(detected_ok)-->  PLAN_AND_DISPATCH  --(task_ok)-->  EXECUTE_PICK
  ↑(timeout/no_item)                                     ↓(task_fail)
  └-------------------------------  RECOVERY  <-----------┘
EXECUTE_PICK  --(pick_ok)-->  EXECUTE_PLACE  --(place_ok)-->  DONE  --(next)--> WAIT_DETECTION
          └--(timeout/fail)--> RECOVERY
WAIT_DETECTION  --(detected_ok)-->  OPEN_GRIPPER  --(gripper_ok)-->  EXECUTE_PICK
  ↑(timeout/no_item)                                         ↓(arm_ok)
  └---------------------------  RECOVERY  <-------------------┘
EXECUTE_PICK  --(arm_ok)-->  CLOSE_GRIPPER  --(gripper_ok)-->  DONE  --(next)--> WAIT_DETECTION
          └--(timeout/fail)--> RECOVERY

你要能口头解释:

1)异常分类(建议按“可恢复性”分层)

3)把异常写成“事件”而不是“if 大杂烩”

建议事件命名统一口径(便于日志检索与复盘):

  1. 一张状态机图(手绘/工具均可),至少包含:正常路径 + “检测失败” + “抓取超时”两类异常路径。
  2. 一段文字说明(不超过 200 字):你们的异常兜底策略是什么?重试次数/超时阈值/降级动作分别是什么?

快问快答(两题):

  1. 为什么协调节点不应该在订阅回调里直接 sleep 等待动作结果?
  2. 为什么异常机制一定要有“超时 + 重试上限 + 降级策略”三件套?

2)事件驱动 + 单一状态入口(避免回调乱改状态)

推荐结构(本讲代码采用):

这样做的收益:

本工坊提供一套“全假节点”自测方案:即使没有真实相机/机械臂,也能跑通协调逻辑,并能注入异常。

1)创建功能包(Python)

source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_coordinator_fsm --build-type ament_python --dependencies rclpy std_msgs std_srvs action_msgs geometry_msgs builtin_interfaces ros2_comm_design_interfaces

逐行解释:

entry_points={
    "console_scripts": [
        "coordinator_node = sorting_coordinator_fsm.coordinator_node:main",
        "fake_perception_node = sorting_coordinator_fsm.fake_perception_node:main",
        "fake_gripper_server = sorting_coordinator_fsm.fake_gripper_server:main",
        "fake_arm_action_server = sorting_coordinator_fsm.fake_arm_action_server:main",
    ],
},

解释:

3)协调节点代码:状态机 + 异常处理骨架(可直接复制)

sorting_coordinator_fsm/coordinator_node.py

from __future__ import annotations

"""
协调节点(Coordinator)示例:用有限状态机(FSM)编排流程,并通过“超时 + 重试上限”实现异常兜底。

配合本讲 3 个假节点一起运行,用于在没有真实硬件/仿真时也能联调流程:
- fake_perception_node:发布检测 Topic
- fake_gripper_server:提供夹爪 Service
- fake_arm_action_server:提供机械臂 Action Server

对外通道(默认命名,与示例一致):
- Sub:  /sorting/perception/detections  (DetectedItemArray)
- Svc:  /sorting/gripper/set            (std_srvs/SetBool)
- Act:  /sorting/arm/execute_path       (ExecuteArmPath)
- Pub:  /sorting/coordinator/state      (std_msgs/String)  证据输出:状态可见、可截图、可复现

关键约束(保证“可复现”和“不会乱跳状态”):
- 回调只投递事件(event),不直接切换状态
- 状态切换只在 tick() 一个地方发生(单入口)
"""

from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Deque, Optional

import rclpy
from rclpy.action import ActionClient
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.duration import Duration
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from std_msgs.msg import String
from std_srvs.srv import SetBool

from ros2_comm_design_interfaces.msg import DetectedItemArray
from ros2_comm_design_interfaces.action import ExecuteArmPath

class State(str, Enum):
    IDLE = "IDLE"
    WAIT_DETECTION = "WAIT_DETECTION"
    OPEN_GRIPPER = "OPEN_GRIPPER"
    EXECUTE_PICK = "EXECUTE_PICK"
    CLOSE_GRIPPER = "CLOSE_GRIPPER"
    DONE = "DONE"
    RECOVERY = "RECOVERY"
    SAFE_STOP = "SAFE_STOP"

@dataclass
class Context:
    task_id: str = ""
    detect_retry: int = 0
    pick_retry: int = 0

class SortingCoordinator(Node):
    def __init__(self) -> None:
        super().__init__("sorting_coordinator")

        # 允许 timer/subscription/service/action 的回调并发执行,减少“回调互相阻塞导致假超时”的概率
        self._cb_group = ReentrantCallbackGroup() # 创建一个可重入回调组,用来管理ROS2节点的回调函数执行顺序

        self._state: State = State.IDLE # 空闲状态
        self._ctx = Context()
        # 事件队列:回调只负责把“发生了什么”丢进队列;tick 再按状态机规则消费事件并切状态
        self._events: Deque[str] = deque(maxlen=50)

        # 超时与重试上限:做异常注入实验时建议只改参数,不改代码
        self._detect_timeout_sec = float(self.declare_parameter("detect_timeout_sec", 2.0).value)
        self._pick_timeout_sec = float(self.declare_parameter("pick_timeout_sec", 4.0).value)
        self._max_detect_retry = int(self.declare_parameter("max_detect_retry", 2).value)
        self._max_pick_retry = int(self.declare_parameter("max_pick_retry", 1).value)

        # “截止时间(deadline)”用于把时间条件变成事件:超时 -> 投递 EV_DETECTION_TIMEOUT/EV_ARM_TIMEOUT
        self._detection_deadline = self.get_clock().now()
        self._arm_goal_deadline = self.get_clock().now()

        self._last_detection: Optional[DetectedItemArray] = None

        # 状态证据输出:用 ros2 topic echo --once 抓一条即可当作验收证据
        self._state_pub = self.create_publisher(String, "/sorting/coordinator/state", 10)
        self._det_sub = self.create_subscription(
            DetectedItemArray,
            "/sorting/perception/detections",
            self._on_detection,
            qos_profile_sensor_data,
            callback_group=self._cb_group,
        )

        # 夹爪服务客户端:本讲用 std_srvs/SetBool,方便用 ros2 service call 自测
        self._gripper_client = self.create_client(
            SetBool,
            "/sorting/gripper/set",
            callback_group=self._cb_group,
        )
        # 机械臂动作客户端:长时任务用 Action,可以 feedback、可以 cancel、也更容易做超时兜底
        self._arm_client = ActionClient(
            self,
            ExecuteArmPath,
            "/sorting/arm/execute_path",
            callback_group=self._cb_group,
        )

        self._active_goal_handle = None

        # tick 定时器:状态机唯一“执行入口”,统一处理超时、消费事件、切状态
        self._tick_timer = self.create_timer(0.05, self._tick, callback_group=self._cb_group)

        self._set_state(State.WAIT_DETECTION, reason="startup")

    def _push_event(self, event: str) -> None:
        self._events.append(event)

    def _pop_event(self) -> Optional[str]:
        if not self._events:
            return None
        return self._events.popleft()

    def _set_state(self, new_state: State, reason: str) -> None:
        if self._state == new_state:
            return
        # 状态转移日志:reason 必须可解释,便于排障时按关键字定位
        self.get_logger().info(f"state: {self._state.value} -> {new_state.value} | reason={reason} | task_id={self._ctx.task_id}")
        self._state = new_state
        self._state_pub.publish(String(data=f"{new_state.value}|reason={reason}|task_id={self._ctx.task_id}"))

        now = self.get_clock().now()
        if new_state == State.WAIT_DETECTION:
            # 进入 WAIT_DETECTION 就重置检测超时,避免无限等待
            self._detection_deadline = now + Duration(seconds=self._detect_timeout_sec)
        if new_state == State.EXECUTE_PICK:
            # 进入 EXECUTE_PICK 就重置动作超时,超时后会 cancel action 并进入 RECOVERY
            self._arm_goal_deadline = now + Duration(seconds=self._pick_timeout_sec)

    def _on_detection(self, msg: DetectedItemArray) -> None:
        self._last_detection = msg
        # 回调不切状态,只投递事件:避免多个回调同时修改 _state 的竞态
        if self._state == State.WAIT_DETECTION and len(getattr(msg, "items", [])) > 0:
            self._ctx.task_id = getattr(msg, "task_id", "") or self._ctx.task_id
            self._push_event("EV_DETECTION_OK")

    def _call_gripper(self, close: bool) -> None:
        # 服务未就绪时不要阻塞太久:直接走异常路径(RECOVERY)
        if not self._gripper_client.wait_for_service(timeout_sec=0.2):
            self.get_logger().error("gripper service not available")
            self._push_event("EV_GRIPPER_FAIL")
            return

        req = SetBool.Request()
        req.data = close
        future = self._gripper_client.call_async(req)
        future.add_done_callback(self._on_gripper_done)

    def _on_gripper_done(self, future) -> None:
        try:
            resp = future.result()
        except Exception as exc:
            self.get_logger().error(f"gripper call exception: {exc}")
            self._push_event("EV_GRIPPER_FAIL")
            return

        if bool(resp.success):
            self._push_event("EV_GRIPPER_OK")
        else:
            self.get_logger().warn(f"gripper returned failure: message={resp.message}")
            self._push_event("EV_GRIPPER_FAIL")

    def _send_arm_goal(self) -> None:
        # 动作服务器未就绪:直接投递失败事件(不要让流程卡住)
        if not self._arm_client.wait_for_server(timeout_sec=0.5):
            self.get_logger().error("arm action server not available")
            self._push_event("EV_ARM_RESULT_FAIL")
            return

        goal = ExecuteArmPath.Goal()
        goal.task_id = self._ctx.task_id or "task_demo"
        goal.path_id = "pick_path"
        goal.speed = 0.6
        goal.timeout_sec = float(self._pick_timeout_sec)

        future = self._arm_client.send_goal_async(goal, feedback_callback=self._on_arm_feedback)
        future.add_done_callback(self._on_arm_goal_response)

    def _on_arm_feedback(self, feedback_msg) -> None:
        feedback = feedback_msg.feedback
        progress = getattr(feedback, "progress", 0.0)
        current_step = getattr(feedback, "current_step", 0)
        self.get_logger().info(f"arm feedback: progress={progress:.2f} step={current_step} task_id={self._ctx.task_id}")

    def _on_arm_goal_response(self, future) -> None:
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warn("arm goal rejected")
            self._push_event("EV_ARM_RESULT_FAIL")
            return

        self._active_goal_handle = goal_handle
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self._on_arm_result)

    def _on_arm_result(self, future) -> None:
        try:
            result = future.result().result
        except Exception as exc:
            self.get_logger().error(f"arm result exception: {exc}")
            self._push_event("EV_ARM_RESULT_FAIL")
            return

        ok = bool(getattr(result, "ok", False))
        if ok:
            self._push_event("EV_ARM_RESULT_OK")
        else:
            error_code = getattr(result, "error_code", 1)
            message = getattr(result, "message", "")
            self.get_logger().warn(f"arm result failed: error_code={error_code} message={message}")
            self._push_event("EV_ARM_RESULT_FAIL")

    def _cancel_arm_goal(self) -> None:
        if self._active_goal_handle is None:
            return
        cancel_future = self._active_goal_handle.cancel_goal_async()
        cancel_future.add_done_callback(lambda _: self.get_logger().warn("arm goal cancel requested"))

    def _tick(self) -> None:
        now = self.get_clock().now()

        # 把“时间条件”转成“事件”,统一走状态机:方便验收与复盘
        if self._state == State.WAIT_DETECTION and now > self._detection_deadline:
            self._push_event("EV_DETECTION_TIMEOUT")

        if self._state == State.EXECUTE_PICK and now > self._arm_goal_deadline:
            self._push_event("EV_ARM_TIMEOUT")

        # 每个 tick 消费 1 个事件:保证状态转移过程可追踪、可复现
        event = self._pop_event()
        if event is None:
            return

        if self._state == State.WAIT_DETECTION:
            if event == "EV_DETECTION_OK":
                self._set_state(State.OPEN_GRIPPER, reason="detected_ok")
                self._call_gripper(close=False)
            elif event == "EV_DETECTION_TIMEOUT":
                self._set_state(State.RECOVERY, reason="detection_timeout")

        elif self._state == State.OPEN_GRIPPER:
            if event == "EV_GRIPPER_OK":
                self._set_state(State.EXECUTE_PICK, reason="gripper_ready")
                self._send_arm_goal()
            elif event == "EV_GRIPPER_FAIL":
                self._set_state(State.RECOVERY, reason="gripper_fail")

        elif self._state == State.EXECUTE_PICK:
            if event == "EV_ARM_RESULT_OK":
                self._set_state(State.CLOSE_GRIPPER, reason="pick_ok")
                self._call_gripper(close=True)
            elif event == "EV_ARM_TIMEOUT":
                self.get_logger().warn("arm timeout -> cancel")
                self._cancel_arm_goal()
                self._set_state(State.RECOVERY, reason="arm_timeout")
            elif event == "EV_ARM_RESULT_FAIL":
                self._set_state(State.RECOVERY, reason="arm_fail")

        elif self._state == State.CLOSE_GRIPPER:
            if event == "EV_GRIPPER_OK":
                self._set_state(State.DONE, reason="picked_and_gripped")
            elif event == "EV_GRIPPER_FAIL":
                self._set_state(State.RECOVERY, reason="gripper_fail_after_pick")

        elif self._state == State.DONE:
            self._ctx = Context()
            self._set_state(State.WAIT_DETECTION, reason="next_cycle")

        elif self._state == State.RECOVERY:
            if self._ctx.detect_retry < self._max_detect_retry:
                self._ctx.detect_retry += 1
                self.get_logger().warn(f"recovery: retry detection ({self._ctx.detect_retry}/{self._max_detect_retry})")
                self._set_state(State.WAIT_DETECTION, reason="retry_detection")
                return

            if self._ctx.pick_retry < self._max_pick_retry:
                self._ctx.pick_retry += 1
                self.get_logger().warn(f"recovery: retry pick ({self._ctx.pick_retry}/{self._max_pick_retry})")
                self._set_state(State.EXECUTE_PICK, reason="retry_pick")
                self._send_arm_goal()
                return

            self.get_logger().error("recovery exhausted -> safe stop")
            self._set_state(State.SAFE_STOP, reason="recovery_exhausted")

        elif self._state == State.SAFE_STOP:
            self.get_logger().error(f"SAFE_STOP: manual intervention required | task_id={self._ctx.task_id}")

def main() -> None:
    rclpy.init()
    node = SortingCoordinator()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

你必须能解释的关键点(按块自检):

4)假节点 1:发布检测(可注入“无检测”异常)

sorting_coordinator_fsm/fake_perception_node.py

"""
假检测节点:周期性发布 DetectedItemArray,用于触发协调节点从 WAIT_DETECTION 进入下一状态。

参数(ros-args -p ...):
- publish: 是否发布检测(false 可注入“无检测/检测失败”异常)
- period_sec: 发布周期
- score: 置信度(示例字段)
"""

import math
import rclpy
from rclpy.node import Node
from rclpy.qos import qos_profile_sensor_data

from builtin_interfaces.msg import Time
from geometry_msgs.msg import Pose

from ros2_comm_design_interfaces.msg import DetectedItem
from ros2_comm_design_interfaces.msg import DetectedItemArray

class FakePerception(Node):
    def __init__(self) -> None:
        super().__init__("fake_perception_node")
        self._pub = self.create_publisher(DetectedItemArray, "/sorting/perception/detections", qos_profile_sensor_data)

        self._publish = bool(self.declare_parameter("publish", True).value)
        self._period_sec = float(self.declare_parameter("period_sec", 0.2).value)
        self._score = float(self.declare_parameter("score", 0.9).value)

        self._t = 0.0
        self.create_timer(self._period_sec, self._tick)

    def _tick(self) -> None:
        if not self._publish:
            return

        msg = DetectedItemArray()
        msg.task_id = "task_demo"
        msg.frame_id = "camera_frame"
        msg.stamp = Time(sec=int(self.get_clock().now().nanoseconds // 1_000_000_000), nanosec=int(self.get_clock().now().nanoseconds % 1_000_000_000))

        # 最小示例:只放 1 个目标即可(items 长度 > 0 会触发协调节点投递 EV_DETECTION_OK)
        item = DetectedItem()
        item.task_id = msg.task_id
        item.class_id = 1
        item.score = self._score
        pose = Pose()
        pose.position.x = 0.3 + 0.02 * math.sin(self._t)
        pose.position.y = 0.1
        pose.position.z = 0.0
        pose.orientation.w = 1.0
        item.pose = pose

        msg.items.append(item)
        self._pub.publish(msg)
        self._t += 0.2

def main() -> None:
    rclpy.init()
    node = FakePerception()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

解释与注意:

5)假节点 2:夹爪服务(可注入“服务返回失败”异常)

sorting_coordinator_fsm/fake_gripper_server.py

"""
假夹爪服务端:提供 std_srvs/SetBool,便于验证协调节点的 Service 调用与异常兜底。

服务名:/sorting/gripper/set
参数:
- always_fail: true 时始终返回失败,用于注入“夹爪失败”异常
"""

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool

class FakeGripper(Node):
    def __init__(self) -> None:
        super().__init__("fake_gripper_server")
        self._always_fail = bool(self.declare_parameter("always_fail", False).value)
        self.create_service(SetBool, "/sorting/gripper/set", self._handle)

    def _handle(self, request: SetBool.Request, response: SetBool.Response) -> SetBool.Response:
        if self._always_fail:
            response.success = False
            response.message = "injected failure"
            return response

        response.success = True
        # 约定:request.data=True 表示 close,False 表示 open(仅用于示例/自测)
        response.message = "closed" if request.data else "opened"
        return response

def main() -> None:
    rclpy.init()
    node = FakeGripper()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

解释:

6)假节点 3:机械臂 Action Server(可注入“超时/失败”异常)

sorting_coordinator_fsm/fake_arm_action_server.py

"""
假机械臂 Action Server:提供 ExecuteArmPath,用于注入“成功/失败/超时/取消”等场景。

参数:
- execute_time_sec: 模拟动作执行耗时;大于协调节点 pick_timeout_sec 会触发超时与 cancel
- always_fail: true 时动作结束返回失败(abort)
"""

import time
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node

from ros2_comm_design_interfaces.action import ExecuteArmPath

class FakeArmActionServer(Node):
    def __init__(self) -> None:
        super().__init__("fake_arm_action_server")
        self._execute_time_sec = float(self.declare_parameter("execute_time_sec", 1.0).value)
        self._always_fail = bool(self.declare_parameter("always_fail", False).value)
        self._server = ActionServer(self, ExecuteArmPath, "/sorting/arm/execute_path", self._execute)

    def _execute(self, goal_handle):
        goal = goal_handle.request
        self.get_logger().info(f"goal received: task_id={goal.task_id} timeout_sec={goal.timeout_sec}")

        start = time.time()
        step = 0
        while time.time() - start < self._execute_time_sec:
            # 如果协调节点因为超时触发 cancel,这里会收到取消请求并返回 canceled 结果
            if goal_handle.is_cancel_requested:
                goal_handle.canceled()
                result = ExecuteArmPath.Result()
                result.ok = False
                result.error_code = 2
                result.message = "canceled"
                result.final_state = 2
                return result

            # feedback 用于“进度可见”:终端里用 --feedback 可观察是否持续反馈
            feedback = ExecuteArmPath.Feedback()
            feedback.progress = min(0.99, (time.time() - start) / max(self._execute_time_sec, 1e-3))
            feedback.current_step = step
            feedback.safety_state = "OK"
            goal_handle.publish_feedback(feedback)
            step += 1
            time.sleep(0.1)

        result = ExecuteArmPath.Result()
        if self._always_fail:
            goal_handle.abort()
            result.ok = False
            result.error_code = 1
            result.message = "injected failure"
            result.final_state = 4
            return result

        goal_handle.succeed()
        result.ok = True
        result.error_code = 0
        result.message = "ok"
        result.final_state = 3
        return result

def main() -> None:
    rclpy.init()
    node = FakeArmActionServer()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

解释与注意:

7)编译与运行(四终端联调)

cd ~/ros2_ws
colcon build --symlink-install --packages-select sorting_coordinator_fsm
source install/setup.bash

逐行解释:

终端 A(夹爪服务):

ros2 run sorting_coordinator_fsm fake_gripper_server

终端 B(机械臂 Action Server):

ros2 run sorting_coordinator_fsm fake_arm_action_server --ros-args -p execute_time_sec:=1.0

终端 C(检测发布):

ros2 run sorting_coordinator_fsm fake_perception_node --ros-args -p publish:=true -p period_sec:=0.2

终端 D(协调节点):

ros2 run sorting_coordinator_fsm coordinator_node --ros-args -p detect_timeout_sec:=2.0 -p pick_timeout_sec:=4.0
ros2 topic echo /sorting/coordinator/state --once

解释:

做法:把假机械臂执行时间调大,让它超过 pick_timeout_sec

ros2 run sorting_coordinator_fsm fake_arm_action_server --ros-args -p execute_time_sec:=10.0

你应观察到的证据(至少二选一):

做法:停止终端 C 或设置 publish:=false

ros2 run sorting_coordinator_fsm fake_perception_node --ros-args -p publish:=false

你应观察到的证据:

1)让 AI 先出“状态机方案”(含流程图与说明)

可直接复制给 AI 的指令模板(教师发放):

你是 ROS2 Humble 工程与工业流程控制专家。请为“分拣流程协调节点”设计有限状态机(FSM)方案。

背景:分拣系统包含 perception(发布检测 Topic)、arm(提供执行 Action)、gripper(提供 SetBool Service)。
目标:协调节点编排一件物品的流程:等待检测→抓取→放置→下一件。
异常:1)检测失败(超时/无目标/置信度低);2)抓取超时(Action 无结果/无反馈/执行时间超限)。
要求:
1)输出状态机:列出所有状态、事件、转移(含守卫条件)、进入/退出动作。
2)输出 Mermaid 流程图(stateDiagram-v2 或 flowchart 均可)。
3)给出异常处理策略:重试次数上限、超时阈值、降级策略、进入 SAFE_STOP 的条件。
4)给出验收证据清单:每条异常至少 2 条可复现证据(CLI/日志/状态话题)。

校验点(人工审计,不通过就退回重写):

2)让 AI 生成“协调节点基础代码”时的约束(避免生成垃圾代码)

请生成 ROS2 Python 协调节点代码,必须满足:
1)状态切换只能在一个函数里发生(例如 tick),回调只投递事件;
2)必须实现 detection_timeout 与 arm_timeout 两种异常;
3)必须提供命令级验收方法(ros2 topic/service/action),并说明期望输出;
4)必须包含人工审计清单:至少列出 8 条(线程/回调、超时、重试上限、日志证据、接口字段对齐等)。

案例引入(教师讲解要点):

作业(布置)

1)提交分拣流程状态机设计图(手绘 / 工具生成均可),附核心状态与切换逻辑说明。
2)提交协调节点完整代码(含 AI 生成代码审计优化痕迹),重点标注异常处理逻辑。
3)提交协调节点运行测试截图(含正常调度、异常触发场景),撰写 200 字左右分析,说明异常处理机制的设计思路与效果。
4)提交 AI 交互记录(至少 1 次,用于设计状态机 / 生成协调节点代码),阐述对 AI 生成内容的验证与优化过程。

Markdown 与代码自检清单(提交前必过)