16 实践课-ROS2 端设备控制工具完整开发与标注数据驱动指令识别优化

ROS2 端设备控制工具完整开发与标注数据驱动指令识别优化

关联:索引

课程定位与分工(避免与已讲内容重复)

本讲是《人工智能综合实践》的设备控制与智能体协同课,定位为:ROS2 端如何以“可校验协议 + 可回执闭环”的方式对接工具/智能体,并在联调中具备工程化容错能力。

课程/ 更偏哪一端 本讲与它的关系
《机器人与智能系统开发技术》19:传感器仿真配置与 ROS2 数据交互实现 ROS 端与仿真/硬件模型交互(Gazebo/桥接/控制器/QoS/频率/延迟来源) 本讲不重复“仿真/硬件如何配置与跑通”。本讲把“设备驱动调用”抽象为 Driver 接口:你可以把 19 的执行链路封装到 Driver 内,但本讲重点是“指令解析→驱动调用→状态回执→可追踪联调”。
《机器人与智能系统综合实践》22:Web 端指令下发控制 ROS2 Web 端业务与交互(按钮/权限/回传匹配) 本讲不重复 Web 端 publish/页面封装。本讲把 Web 端视为“工具端的一种”,强调端侧要能承接指令并给出可解释回执,供 Web/智能体统一复用。
《机器人与智能系统综合实践》23:跨端数据格式统一与联调测试 跨端统一协议与测试闭环(Envelope + Payload、trace、测试矩阵) 本讲复用 23 的“trace_id/msg_id/ts_ms/错误码口径与联调证据链”,并给出端侧对 Envelope 的兼容解析思路,但不重复完整协议文档与 TS/FastAPI 校验实现。
《人工智能综合实践》15:rosbridge 配置与 ROS2 通信基础工具开发 ROS2 控制面接入(rosbridge/Topic 命名/最小闭环) 本讲承接 15 的话题名与字段口径(/sorting_arm/cmd/sorting_arm/statuscmd_id/last_cmd_id),在其上补齐“端侧执行与工程化容错 + 标注数据驱动识别优化”。

术语小抄(初学者版)

作业:布置(见文末)


1)控制指令与状态回执(最小闭环字段)

说明:本讲默认沿用上一讲的“统一指令骨架”。这里不重复展开字段意义,只固定端侧必须支持的最小字段回执口径,保证工具端/端侧联调不会“各说各话”。

控制指令(工具端 → ROS2 端,承载在 std_msgs/msg/String 的 JSON 字符串中):

{
  "cmd_id": "C-20260415-0001",
  "scene": "sorting",
  "device_type": "arm",
  "device_id": "arm_01",
  "action": "pick_place",
  "params": { "from": "bin_in_3", "to": "bin_out_glass", "speed": 0.5 },
  "safety": { "require_enable": true, "require_guard_closed": true },
  "meta": { "user": "stu01", "role": "operator", "source": "tool" },
  "ts_ms": 1710000000000
}

状态回执(ROS2 端 → 工具端):

{
  "device_type": "arm",
  "device_id": "arm_01",
  "state": "idle",
  "last_cmd_id": "C-20260415-0001",
  "ok": true,
  "code": "OK",
  "message": "pick_place finished",
  "detail": { "progress": 1.0 },
  "ts_ms": 1710000001234
}

2)推荐话题名(避免同学间不一致导致无法复现)

说明:

3)与 23 的跨端 Envelope 协议衔接(兼容解析,不改变话题承载)

本讲的控制链路承载仍是 std_msgs/msg/String 的 JSON 字符串(便于 rosbridge/Web/脚本/智能体快速接入)。但在“智能体协同”与“跨端联调”场景中,建议复用 23 的 trace_id/msg_id/schema_version/ts_ms 概念,用于跨端日志串联与去重。

端侧对 Envelope 的最小兼容规则(建议):

本课程里,“解析结构化指令”至少包含两层,不建议混成一层讲:

人话/语音/按钮
  └─ 工具端(Web/脚本/智能体)做解析层 A:意图识别 + 槽位抽取 + 生成 ArmCommand/Envelope
        ↓ 发送(/sorting_arm/cmd,String JSON)
ROS2 端(设备控制节点)做解析层 B:字段校验 + 安全门禁 + 去重幂等 + 驱动调用 + 回执
        ↑ 回传(/sorting_arm/status,String JSON)

常见反例(用来讲“为什么两边都要做”):


AI 工具使用:端侧代码框架 / 标注数据解析 / 联调排障 / 容错生成(学生可直接复制)

使用方法:把你的环境信息、代码片段、日志与需求粘贴到 {你的内容}。要求 AI 输出“步骤清单 + 可复制代码 + 校验点”。你必须做复验并记录证据。

模板目录:

模板 1:生成 ROS2 端设备控制节点完整框架

你是 ROS2 Python(rclpy)实践课助教。请给我一个“可运行”的设备控制节点示例,要求:
1)订阅 /sorting_arm/cmd(std_msgs/msg/String),消息内容是 JSON 字符串;
2)发布 /sorting_arm/status(std_msgs/msg/String),回执 JSON 字符串,必须带 last_cmd_id/ok/code/message/ts_ms;
3)实现指令校验:cmd_id/device_type/device_id/action/params/ts_ms 缺失要返回可解释错误;
4)实现去重:同一 cmd_id 重复到达不重复执行;
5)实现超时与状态机:idle/running/error/estop;
6)驱动层用一个 MockDriver 类模拟 home/pick_place/stop/e_stop;
7)不要阻塞 ROS2 回调:用队列+后台 worker 线程执行驱动调用;
8)给出 3 条命令行自测(ros2 topic pub/echo),每条写清预期现象。

我的环境与约束:{你的内容}

模板 2:生成标注指令数据解析脚本(规则/统计/评估)

你是数据标注与指令识别实践课助教。请给我一个 Python 脚本,用于解析“语音/文本指令标注数据”,并生成指令识别规则与评估报告,要求:
1)输入支持 JSONL 或 CSV,两种格式都给出示例;
2)每条样本至少包含 raw_text、canonical_action(例如 arm.home、arm.pick_place)、slots(可为空);
3)输出:①动作分布统计;②同义词建议表;③基于规则的识别器(exact/regex/fuzzy 三段式),并给出 top-1 准确率;
4)识别器要有容错:空格/标点/大小写/常见口头语(比如“帮我”“麻烦”);
5)不要依赖第三方库(只用 Python 标准库)。

我的数据样例与需求:{你的内容}

模板 3:联调延迟/丢指令排查(分层定位)

下面是我工具端与 ROS2 端联调的现象:{你的现象}
我能提供的证据:rosbridge 启动日志、ros2 topic info/echo/hz 输出、工具端发送/接收日志、网络信息(IP/端口)。

请你按“分层定位”输出最多 10 步排查:
1)网络层(端口/防火墙/连通性);
2)rosbridge 层(订阅/发布是否成功、topic 名/消息类型、队列拥塞);
3)ROS2 层(节点是否启动、topic 是否存在、QoS 是否匹配、回调是否阻塞);
每一步都要写清:我该执行什么命令/看什么日志、预期结果是什么、下一步怎么走。
最后给 3 条“预防清单”(比如超时、去重、日志字段、节流)。

模板 4:容错处理代码生成(安全优先)

我有一段指令识别/解析代码:{你的代码}
请你帮我做“安全优先”的容错增强,要求:
1)未知指令宁可拒绝也不要猜错(给出可解释错误与澄清建议);
2)对紧急停止类关键词(急停/停止/停下/stop/e-stop)优先级最高;
3)对常见错字/缺参(比如“回零”写成“回灵”、缺少 to/from)给出修复策略;
4)加入单元自测用例(至少 8 条输入 → 预期 action/拒绝原因),不要依赖第三方库。
输出:改造后的代码 + 自测用例 + 验证要点。

  1. 场景提问:为什么“工具端发出一条 pick_place 指令”并不代表“设备一定执行了”?你至少要拿到哪些证据才能确认闭环成立?
工具端(脚本/Web/智能体)
  └─ 生成结构化指令(含 cmd_id)并下发
        ↓(Topic: /sorting_arm/cmd,String JSON)
ROS2 端设备控制节点(本课时实现)
  ├─ 解析与校验(缺字段/错字段立即回执错误)
  ├─ 去重(cmd_id 重复只回执不重复执行)
  ├─ 调用驱动(MockDriver/真实驱动)
  └─ 发布状态回执(Topic: /sorting_arm/status,String JSON)
        ↑(工具端订阅状态,形成闭环)

为什么必须拆成“解析/驱动/回执”三层:

说明:

1)端侧代码(rclpy,订阅指令/发布回执/去重/状态机)

import json
import queue
import threading
import time
from dataclasses import dataclass
from typing import Any

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

def now_ms() -> int:
    return int(time.time() * 1000)

@dataclass(frozen=True)
class DeviceCommand:
    cmd_id: str
    scene: str
    device_type: str
    device_id: str
    action: str
    params: dict[str, Any]
    safety: dict[str, Any]
    meta: dict[str, Any]
    ts_ms: int
    trace_id: str | None
    msg_id: str | None
    event: str | None
    topic: str | None

class MockSortingArmDriver:
    def __init__(self) -> None:
        self._estop = False

    def e_stop(self) -> None:
        self._estop = True

    def reset_estop(self) -> None:
        self._estop = False

    def home(self) -> None:
        self._ensure_not_estop()
        time.sleep(0.3)

    def pick_place(self, from_bin: str, to_bin: str, speed: float) -> None:
        self._ensure_not_estop()
        if speed <= 0 or speed > 1.0:
            raise ValueError("speed must be in (0, 1.0]")
        time.sleep(0.6)

    def stop(self) -> None:
        time.sleep(0.05)

    def _ensure_not_estop(self) -> None:
        if self._estop:
            raise RuntimeError("device is in estop")

class SortingArmControlNode(Node):
    def __init__(self) -> None:
        super().__init__("sorting_arm_control_node")

        self._driver = MockSortingArmDriver()
        self._enabled = True
        self._guard_closed = True

        self._state = "idle"
        self._handled_cmd_ids: set[str] = set()
        self._cmd_queue: queue.Queue[DeviceCommand] = queue.Queue(maxsize=50)

        self._cmd_sub = self.create_subscription(String, "/sorting_arm/cmd", self._on_cmd, 10)
        self._status_pub = self.create_publisher(String, "/sorting_arm/status", 10)

        self._worker = threading.Thread(target=self._worker_loop, daemon=True)
        self._worker.start()

        self.get_logger().info("sorting_arm_control_node started")

    def _publish_status(
        self,
        last_cmd_id: str,
        ok: bool,
        code: str,
        message: str,
        state: str | None = None,
        detail: dict[str, Any] | None = None,
        trace_id: str | None = None,
        msg_id: str | None = None,
        event: str | None = None,
    ) -> None:
        payload = {
            "device_type": "arm",
            "device_id": "arm_01",
            "state": state or self._state,
            "last_cmd_id": last_cmd_id,
            "ok": ok,
            "code": code,
            "message": message,
            "detail": detail or {},
            "ts_ms": now_ms(),
        }
        if trace_id is not None:
            payload["trace_id"] = trace_id
        if msg_id is not None:
            payload["msg_id"] = msg_id
        if event is not None:
            payload["event"] = event
        msg = String()
        msg.data = json.dumps(payload, ensure_ascii=False)
        self._status_pub.publish(msg)

    def _on_cmd(self, msg: String) -> None:
        parse_ok, result = self._parse_and_validate(msg.data)
        if not parse_ok:
            last_cmd_id = result.get("cmd_id") or "UNKNOWN"
            self._publish_status(
                last_cmd_id,
                False,
                result["code"],
                result["message"],
                state="error",
                trace_id=result.get("trace_id"),
                msg_id=result.get("msg_id"),
                event=result.get("event"),
            )
            return

        cmd: DeviceCommand = result["cmd"]
        if cmd.cmd_id in self._handled_cmd_ids:
            self._publish_status(
                cmd.cmd_id,
                True,
                "DUPLICATE",
                "cmd_id already handled; ignored",
                trace_id=cmd.trace_id,
                msg_id=cmd.msg_id,
                event=cmd.event,
            )
            return

        if cmd.action in {"e_stop", "estop"}:
            self._handled_cmd_ids.add(cmd.cmd_id)
            self._state = "estop"
            self._driver.e_stop()
            self._drain_queue()
            self._publish_status(
                cmd.cmd_id,
                True,
                "ESTOP",
                "emergency stop triggered",
                state="estop",
                trace_id=cmd.trace_id,
                msg_id=cmd.msg_id,
                event=cmd.event,
            )
            return

        if not self._check_safety(cmd):
            self._handled_cmd_ids.add(cmd.cmd_id)
            self._state = "error"
            self._publish_status(
                cmd.cmd_id,
                False,
                "SAFETY_BLOCK",
                "safety precondition not satisfied",
                state="error",
                trace_id=cmd.trace_id,
                msg_id=cmd.msg_id,
                event=cmd.event,
            )
            return

        try:
            self._cmd_queue.put_nowait(cmd)
        except queue.Full:
            self._publish_status(
                cmd.cmd_id,
                False,
                "QUEUE_FULL",
                "device busy; command queue is full",
                state="error",
                trace_id=cmd.trace_id,
                msg_id=cmd.msg_id,
                event=cmd.event,
            )
            return

        self._handled_cmd_ids.add(cmd.cmd_id)
        self._state = "running"
        self._publish_status(
            cmd.cmd_id,
            True,
            "ACCEPTED",
            "command accepted",
            state="running",
            trace_id=cmd.trace_id,
            msg_id=cmd.msg_id,
            event=cmd.event,
        )

    def _drain_queue(self) -> None:
        while True:
            try:
                _ = self._cmd_queue.get_nowait()
            except queue.Empty:
                break

    def _check_safety(self, cmd: DeviceCommand) -> bool:
        safety = cmd.safety or {}
        require_enable = bool(safety.get("require_enable", False))
        require_guard_closed = bool(safety.get("require_guard_closed", False))
        if require_enable and not self._enabled:
            return False
        if require_guard_closed and not self._guard_closed:
            return False
        return True

    def _worker_loop(self) -> None:
        while rclpy.ok():
            try:
                cmd = self._cmd_queue.get(timeout=0.2)
            except queue.Empty:
                if self._state == "running":
                    self._state = "idle"
                continue

            try:
                self._execute(cmd)
            except Exception as e:
                self._state = "error"
                self._publish_status(
                    cmd.cmd_id,
                    False,
                    "EXEC_ERROR",
                    str(e),
                    state="error",
                    trace_id=cmd.trace_id,
                    msg_id=cmd.msg_id,
                    event=cmd.event,
                )
            else:
                self._state = "idle"
                self._publish_status(
                    cmd.cmd_id,
                    True,
                    "OK",
                    f"{cmd.action} finished",
                    state="idle",
                    trace_id=cmd.trace_id,
                    msg_id=cmd.msg_id,
                    event=cmd.event,
                )

    def _execute(self, cmd: DeviceCommand) -> None:
        if cmd.action == "home":
            self._driver.home()
            return

        if cmd.action == "pick_place":
            from_bin = str(cmd.params.get("from", ""))
            to_bin = str(cmd.params.get("to", ""))
            speed = float(cmd.params.get("speed", 0.5))
            if not from_bin or not to_bin:
                raise ValueError("pick_place requires params.from and params.to")
            self._driver.pick_place(from_bin, to_bin, speed)
            return

        if cmd.action == "stop":
            self._driver.stop()
            return

        raise ValueError(f"unknown action: {cmd.action}")

    def _parse_and_validate(self, raw: str) -> tuple[bool, dict[str, Any]]:
        try:
            obj = json.loads(raw)
        except Exception:
            return False, {"cmd_id": "UNKNOWN", "code": "BAD_JSON", "message": "cmd must be a JSON string"}

        trace_id = None
        msg_id = None
        event = None
        topic = None

        data: dict[str, Any] | None
        if isinstance(obj, dict) and "schema_version" in obj and "payload" in obj:
            trace_id = str(obj.get("trace_id")) if obj.get("trace_id") is not None else None
            msg_id = str(obj.get("msg_id")) if obj.get("msg_id") is not None else None
            event = str(obj.get("event")) if obj.get("event") is not None else None
            topic = str(obj.get("topic")) if obj.get("topic") is not None else None

            payload = obj.get("payload")
            if not isinstance(payload, dict):
                return False, {
                    "cmd_id": msg_id or "UNKNOWN",
                    "trace_id": trace_id,
                    "msg_id": msg_id,
                    "event": event,
                    "code": "BAD_ENVELOPE",
                    "message": "envelope.payload must be an object",
                }
            data = payload
        elif isinstance(obj, dict):
            data = obj
        else:
            return False, {"cmd_id": "UNKNOWN", "code": "BAD_BODY", "message": "cmd must be a JSON object or Envelope"}

        def infer_device_type(in_topic: str | None, in_event: str | None) -> str | None:
            if in_event and in_event.startswith("arm."):
                return "arm"
            if in_topic and "/sorting_arm/" in in_topic:
                return "arm"
            return None

        cmd_id = data.get("cmd_id") or msg_id
        cmd_id_str = str(cmd_id).strip() if cmd_id is not None else ""
        if not cmd_id_str:
            return False, {
                "cmd_id": "UNKNOWN",
                "trace_id": trace_id,
                "msg_id": msg_id,
                "event": event,
                "code": "MISSING_FIELD",
                "message": "missing field: cmd_id (or envelope.msg_id)",
            }

        scene = data.get("scene") or "sorting"
        device_type = data.get("device_type") or infer_device_type(topic, event)
        device_type_str = str(device_type).strip() if device_type is not None else ""
        device_id = data.get("device_id")
        device_id_str = str(device_id).strip() if device_id is not None else ""
        action = data.get("action")
        action_str = str(action).strip() if action is not None else ""
        params = data.get("params")
        ts_ms = data.get("ts_ms") if data.get("ts_ms") is not None else obj.get("ts_ms") if isinstance(obj, dict) else None

        missing: list[str] = []
        if not device_id_str:
            missing.append("device_id")
        if not action_str:
            missing.append("action")
        if params is None:
            missing.append("params")
        if ts_ms is None:
            missing.append("ts_ms")
        if missing:
            return False, {
                "cmd_id": cmd_id_str,
                "trace_id": trace_id,
                "msg_id": msg_id,
                "event": event,
                "code": "MISSING_FIELD",
                "message": f"missing fields: {missing}",
            }

        if not device_type_str:
            return False, {
                "cmd_id": cmd_id_str,
                "trace_id": trace_id,
                "msg_id": msg_id,
                "event": event,
                "code": "MISSING_FIELD",
                "message": "missing field: device_type (or cannot infer from envelope.event/topic)",
            }

        if not isinstance(params, dict):
            return False, {
                "cmd_id": cmd_id_str,
                "trace_id": trace_id,
                "msg_id": msg_id,
                "event": event,
                "code": "BAD_PARAMS",
                "message": "params must be an object",
            }

        try:
            ts_ms_int = int(ts_ms)
        except Exception:
            return False, {
                "cmd_id": cmd_id_str,
                "trace_id": trace_id,
                "msg_id": msg_id,
                "event": event,
                "code": "BAD_FIELD_TYPE",
                "message": "ts_ms must be integer milliseconds",
            }

        cmd = DeviceCommand(
            cmd_id=cmd_id_str,
            scene=str(scene),
            device_type=device_type_str,
            device_id=device_id_str,
            action=action_str,
            params=dict(params),
            safety=dict(data.get("safety") or {}),
            meta=dict(data.get("meta") or {}),
            ts_ms=ts_ms_int,
            trace_id=trace_id,
            msg_id=msg_id,
            event=event,
            topic=topic,
        )
        return True, {"cmd": cmd}

def main() -> None:
    rclpy.init()
    node = SortingArmControlNode()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

2)命令行自测(最短闭环证据)

cd 10_ai_ros2_device_control_lab/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_control sorting_arm_control_node

解释:

终端 B:订阅状态回执:

ros2 topic echo /sorting_arm/status

解释:

终端 C:下发一条 home 指令(注意:这里发布的是 std_msgs/msg/String,其中 data 是 JSON 字符串):

ros2 topic pub --once /sorting_arm/cmd std_msgs/msg/String "{data: '{\"cmd_id\":\"C-20260415-0001\",\"scene\":\"sorting\",\"device_type\":\"arm\",\"device_id\":\"arm_01\",\"action\":\"home\",\"params\":{},\"safety\":{\"require_enable\":true,\"require_guard_closed\":true},\"meta\":{\"user\":\"stu01\",\"role\":\"operator\"},\"ts_ms\":1710000000000}'}"

解释:

常见错与修复:

练习 1(必做):制造一条“缺字段”的指令,让端侧回执 MISSING_FIELD,并能指出缺了哪个字段。

练习 2(选做):下发两次相同 cmd_idhome,验证端侧只执行一次并回执 DUPLICATE

目标:

  1. _execute 中把 pick_place 的参数校验写完整(缺参必须拒绝,给出可解释错误)。
  2. 在执行成功时,把 detail 加上关键字段(例如 {"from": "...", "to": "...", "speed": 0.5})。

  1. 给定一批“语音/文本指令标注数据”,写脚本解析并生成识别规则与评估报告。
  2. 在工具端加入“指令匹配 + 容错处理”:让识别率提升、拒绝更可解释、对急停更敏感。
  3. 与 ROS2 端联调:定位并解决至少 1 个问题(通信延迟/丢指令/重复执行),保留证据链。

1)为什么要用标注数据?

2)推荐数据格式(JSONL 与 CSV 二选一)

JSONL(每行一个 JSON 对象,适合追加与版本管理):

{"raw_text":"把机械臂回零","canonical_action":"arm.home","slots":{}}
{"raw_text":"从 3 号进料箱抓取,放到玻璃出料箱","canonical_action":"arm.pick_place","slots":{"from":"bin_in_3","to":"bin_out_glass","speed":0.5}}
{"raw_text":"停下","canonical_action":"arm.stop","slots":{}}
{"raw_text":"急停!","canonical_action":"arm.e_stop","slots":{}}

CSV(适合用表格工具编辑,但要注意引号与逗号转义):

raw_text,canonical_action,slots_json
把机械臂回零,arm.home,"{}"
从 3 号进料箱抓取,放到玻璃出料箱,arm.pick_place,"{""from"":""bin_in_3"",""to"":""bin_out_glass"",""speed"":0.5}"
停下,arm.stop,"{}"
急停!,arm.e_stop,"{}"

最小可复现命令(输出动作分布/同义表达建议/评估报告):

cd 10_ai_ros2_device_control_lab
py scripts/parse_command_annotations.py --input data/command_annotations_example.jsonl --format jsonl

解释:

目标:

import csv
import difflib
import json
import re
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any

FILLER_WORDS = ["帮我", "麻烦", "请", "一下", "立刻", "马上", "给我"]

def normalize_text(text: str) -> str:
    t = text.strip().lower()
    for w in FILLER_WORDS:
        t = t.replace(w, "")
    t = re.sub(r"\s+", "", t)
    t = re.sub(r"[,。!?,.!?;;::]", "", t)
    return t

def load_jsonl(path: Path) -> list[dict[str, Any]]:
    items: list[dict[str, Any]] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        line = line.strip()
        if not line:
            continue
        items.append(json.loads(line))
    return items

def load_csv(path: Path) -> list[dict[str, Any]]:
    items: list[dict[str, Any]] = []
    with path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            slots = json.loads(row.get("slots_json") or "{}")
            items.append(
                {
                    "raw_text": row["raw_text"],
                    "canonical_action": row["canonical_action"],
                    "slots": slots,
                }
            )
    return items

@dataclass(frozen=True)
class Rule:
    canonical_action: str
    mode: str
    pattern: str

def build_synonym_suggestions(samples: list[dict[str, Any]], limit_per_action: int = 5) -> dict[str, list[str]]:
    action_phrases: dict[str, Counter[str]] = defaultdict(Counter)
    for s in samples:
        raw = str(s["raw_text"]).strip()
        canonical = str(s["canonical_action"])
        if raw:
            action_phrases[canonical][raw] += 1
    return {k: [p for p, _ in c.most_common(limit_per_action)] for k, c in action_phrases.items()}

def build_rules(samples: list[dict[str, Any]]) -> tuple[dict[str, str], list[Rule]]:
    exact_map: dict[str, str] = {}
    regex_rules: list[Rule] = []

    for s in samples:
        raw = str(s["raw_text"])
        canonical = str(s["canonical_action"])
        norm = normalize_text(raw)
        if norm and norm not in exact_map:
            exact_map[norm] = canonical

    regex_rules.extend(
        [
            Rule("arm.e_stop", "regex", r"(急停|紧急停止|estop|e-stop|stopnow)"),
            Rule("arm.stop", "regex", r"(停下|停止|别动|暂停)"),
            Rule("arm.home", "regex", r"(回零|回原点|归零|home)"),
        ]
    )
    return exact_map, regex_rules

def recognize_action(text: str, exact_map: dict[str, str], regex_rules: list[Rule]) -> tuple[str | None, str]:
    norm = normalize_text(text)
    if norm in exact_map:
        return exact_map[norm], "exact"

    for rule in regex_rules:
        if re.search(rule.pattern, norm):
            return rule.canonical_action, "regex"

    best_action = None
    best_score = 0.0
    for k, action in exact_map.items():
        score = difflib.SequenceMatcher(a=norm, b=k).ratio()
        if score > best_score:
            best_action = action
            best_score = score

    if best_action is not None and best_score >= 0.82:
        return best_action, f"fuzzy:{best_score:.2f}"

    return None, "reject"

def evaluate(samples: list[dict[str, Any]], exact_map: dict[str, str], regex_rules: list[Rule]) -> dict[str, Any]:
    total = len(samples)
    hit = 0
    mode_counter: Counter[str] = Counter()
    errors: list[dict[str, Any]] = []

    for s in samples:
        raw = str(s["raw_text"])
        gt = str(s["canonical_action"])
        pred, mode = recognize_action(raw, exact_map, regex_rules)
        mode_counter[mode] += 1
        if pred == gt:
            hit += 1
        else:
            errors.append({"raw_text": raw, "gt": gt, "pred": pred, "mode": mode})

    return {
        "total": total,
        "hit": hit,
        "acc": (hit / total) if total else 0.0,
        "mode_counter": dict(mode_counter),
        "errors_top10": errors[:10],
    }

def main() -> None:
    data_path = Path("command_annotations.jsonl")
    samples = load_jsonl(data_path) if data_path.suffix == ".jsonl" else load_csv(data_path)

    action_dist = Counter(str(s["canonical_action"]) for s in samples)
    print("action_dist:", dict(action_dist))

    synonym_suggestions = build_synonym_suggestions(samples)
    print("synonym_suggestions:", json.dumps(synonym_suggestions, ensure_ascii=False, indent=2))

    exact_map, regex_rules = build_rules(samples)
    report = evaluate(samples, exact_map, regex_rules)
    print("report:", json.dumps(report, ensure_ascii=False, indent=2))

if __name__ == "__main__":
    main()

逐段解释(你要能用它做“前后对比”):

注意(安全优先口径):

1)联调最小闭环:只测“结构化指令 → 回执关联”

你要验证的不是“设备动作多复杂”,而是这四件事:

2)通信延迟排查:从“证据链”而不是“感觉”出发

建议的分层定位顺序(先快后慢):

  1. 网络层:端口是否通、是否有防火墙拦截(先保证“连得上”)。
  2. rosbridge 层:订阅/发布是否成功,topic 名是否一致(再保证“桥是通的”)。
  3. ROS2 层:节点是否阻塞回调、QoS 是否匹配(最后看“端侧是否堵住了”)。

ROS2 侧速查命令(常用 3 条):

ros2 topic list
ros2 topic info /sorting_arm/cmd
ros2 topic hz /sorting_arm/status

解释:

3)指令丢失与重复执行:工程上更常见的“隐蔽坑”

目标:

  1. 用识别器把 raw_text 识别成 canonical_action,并把 slots 合并到 params
  2. 生成 cmd_id(建议:日期 + 序号,或用 UUID,但要能追踪)。
  3. 发送到 /sorting_arm/cmd,等待 /sorting_arm/statuslast_cmd_id 匹配。

配套项目的最小串联演示(10_ai_ros2_device_control_lab/ 口径):

cd 10_ai_ros2_device_control_lab
py -m pip install -r requirements.txt

py scripts/recognize_and_send.py --ws-url ws://localhost:9090 --text "把机械臂回零"

解释:


课后作业(布置)

作业 1(必做):

作业 2(必做):

作业 3(必做):


课程思政融入点(统一口径)


Markdown 与代码自检清单(提交前自己勾一遍)