16 实践课-ROS2 端设备控制工具完整开发与标注数据驱动指令识别优化
ROS2 端设备控制工具完整开发与标注数据驱动指令识别优化
关联:索引
- 先修:已完成《人工智能综合实践》15 的 rosbridge 连通性与控制面最小闭环(能发
/sorting_arm/cmd,能收/sorting_arm/status)。 - 协议:了解《机器人与智能系统综合实践》23 的 Envelope/trace 思路即可;本讲采用“兼容解析”,不要求你先把全链路都改成 Envelope。
- 运行环境(建议):ROS2 Humble(或同级长期支持版本)+ Python 3.10+(本讲示例使用
str | None类型写法);若你使用更低版本 Python,请把类型写法改成Optional[str]。 - 工具端脚本:标注数据解析脚本仅依赖 Python 标准库,Windows/macOS/Linux 均可运行。
课程定位与分工(避免与已讲内容重复)
本讲是《人工智能综合实践》的设备控制与智能体协同课,定位为:ROS2 端如何以“可校验协议 + 可回执闭环”的方式对接工具/智能体,并在联调中具备工程化容错能力。
| 课程/ | 更偏哪一端 | 本讲与它的关系 |
|---|---|---|
| 《机器人与智能系统开发技术》19:传感器仿真配置与 ROS2 数据交互实现 | ROS 端与仿真/硬件模型交互(Gazebo/桥接/控制器/QoS/频率/延迟来源) | 本讲不重复“仿真/硬件如何配置与跑通”。本讲把“设备驱动调用”抽象为 Driver 接口:你可以把 19 的执行链路封装到 Driver 内,但本讲重点是“指令解析→驱动调用→状态回执→可追踪联调”。 |
| 《机器人与智能系统综合实践》22:Web 端指令下发控制 ROS2 | Web 端业务与交互(按钮/权限/回传匹配) | 本讲不重复 Web 端 publish/页面封装。本讲把 Web 端视为“工具端的一种”,强调端侧要能承接指令并给出可解释回执,供 Web/智能体统一复用。 |
| 《机器人与智能系统综合实践》23:跨端数据格式统一与联调测试 | 跨端统一协议与测试闭环(Envelope + Payload、trace、测试矩阵) | 本讲复用 23 的“trace_id/msg_id/ts_ms/错误码口径与联调证据链”,并给出端侧对 Envelope 的兼容解析思路,但不重复完整协议文档与 TS/FastAPI 校验实现。 |
| 《人工智能综合实践》15:rosbridge 配置与 ROS2 通信基础工具开发 | ROS2 控制面接入(rosbridge/Topic 命名/最小闭环) | 本讲承接 15 的话题名与字段口径(/sorting_arm/cmd、/sorting_arm/status、cmd_id/last_cmd_id),在其上补齐“端侧执行与工程化容错 + 标注数据驱动识别优化”。 |
术语小抄(初学者版):
-
端侧(ROS2 端):贴近设备与驱动的一侧,负责最终执行与安全。
-
工具端(Tool 端):Web/脚本/智能体一侧,负责把用户意图变成结构化指令并下发。
-
指令解析:把字符串/JSON/自然语言转成结构化字段,并校验合法性。
-
设备驱动调用:把结构化动作映射到具体驱动 API(真实设备或模拟设备)。
-
状态回执:设备端对执行结果的结构化反馈,必须能关联
cmd_id。 -
去重(Dedup):同一条指令重复到达时不应重复执行(联调常见坑)。
-
证据链:日志/截图/命令输出能证明“哪里通、哪里不通、修复后如何复验”。
-
15 的核心是“控制面最小闭环接入”:完成 rosbridge 基础通信工具(发指令/收回执)与统一指令骨架口径。
-
本讲新增与强化:把“ROS2 端设备执行”补齐为完整工具链:端侧实现指令解析与驱动调用;工具端用标注数据提升“指令识别”稳定性;联调阶段解决延迟/丢指令/重复执行问题。
-
本讲不重复:rosbridge 的安装部署细节、Web 前端订阅展示工程(这些在相关中已有)。
作业:布置(见文末)
1)控制指令与状态回执(最小闭环字段)
说明:本讲默认沿用上一讲的“统一指令骨架”。这里不重复展开字段意义,只固定端侧必须支持的最小字段与回执口径,保证工具端/端侧联调不会“各说各话”。
控制指令(工具端 → ROS2 端,承载在 std_msgs/msg/String 的 JSON 字符串中):
{
"cmd_id": "C-20260415-0001",
"scene": "sorting",
"device_type": "arm",
"device_id": "arm_01",
"action": "pick_place",
"params": { "from": "bin_in_3", "to": "bin_out_glass", "speed": 0.5 },
"safety": { "require_enable": true, "require_guard_closed": true },
"meta": { "user": "stu01", "role": "operator", "source": "tool" },
"ts_ms": 1710000000000
}
状态回执(ROS2 端 → 工具端):
{
"device_type": "arm",
"device_id": "arm_01",
"state": "idle",
"last_cmd_id": "C-20260415-0001",
"ok": true,
"code": "OK",
"message": "pick_place finished",
"detail": { "progress": 1.0 },
"ts_ms": 1710000001234
}
- 端侧收到指令后,任何情况都必须发布一次状态回执(成功/失败均可)。
- 回执必须携带
last_cmd_id,并与输入的cmd_id对得上。 - 同一
cmd_id重复到达时,端侧必须做到:要么拒绝并说明原因,要么只回执不重复执行。
2)推荐话题名(避免同学间不一致导致无法复现)
- 控制指令 Topic:
/sorting_arm/cmd - 状态回执 Topic:
/sorting_arm/status
说明:
- 话题名与字段名请统一复用,避免“同一条链路出现两套协议”。
3)与 23 的跨端 Envelope 协议衔接(兼容解析,不改变话题承载)
本讲的控制链路承载仍是 std_msgs/msg/String 的 JSON 字符串(便于 rosbridge/Web/脚本/智能体快速接入)。但在“智能体协同”与“跨端联调”场景中,建议复用 23 的 trace_id/msg_id/schema_version/ts_ms 概念,用于跨端日志串联与去重。
- 应用层控制协议仍是 ArmCommand/ArmStatus(含
cmd_id/last_cmd_id),用于描述“要设备做什么/做得如何”。 - 若上游采用 Envelope(23):端侧先做最小 Envelope 校验,再把
payload转换为 ArmCommand;同时把trace_id/msg_id/event/topic带进回执,便于全链路追踪。
端侧对 Envelope 的最小兼容规则(建议):
- 优先:
payload直接携带完整 ArmCommand(包含cmd_id/device_type/device_id/action/params/ts_ms),端侧不需要猜测字段。
本课程里,“解析结构化指令”至少包含两层,不建议混成一层讲:
- 解析层 A(意图解析):从人话/语音/按钮意图 → 标准动作与槽位(
canonical_action + slots) - 解析层 B(执行解析):从结构化 JSON → 可执行动作(字段校验/路由/驱动调用/回执)
人话/语音/按钮
└─ 工具端(Web/脚本/智能体)做解析层 A:意图识别 + 槽位抽取 + 生成 ArmCommand/Envelope
↓ 发送(/sorting_arm/cmd,String JSON)
ROS2 端(设备控制节点)做解析层 B:字段校验 + 安全门禁 + 去重幂等 + 驱动调用 + 回执
↑ 回传(/sorting_arm/status,String JSON)
- 工具端(Web/智能体侧)必须做到:
- 用标注数据把“常见说法 → 标准动作/参数”做稳定(减少端侧拒绝率)
- 不确定时宁可拒绝/澄清,不要把猜测结果直接下发设备
- 生成可追踪字段(至少
cmd_id;采用 Envelope 时带trace_id/msg_id) - ROS2 端(端侧)必须做到:
- 永远不信任上游:即使工具端已校验,端侧仍要二次校验(类型/值域/缺字段)
- 安全优先:安全前置条件不满足必须拒绝并回执原因
- 幂等与去重:重复
cmd_id不重复执行(允许只回执或明确拒绝) - 回执可解释:
ok/code/message/last_cmd_id必备;若上游有trace_id/msg_id需尽量回传
常见反例(用来讲“为什么两边都要做”):
- 只在工具端做解析、端侧不校验:工具端一旦误判(如“停下”被识别成 “回零”)就可能误控设备,且难追责。
- 只在端侧做解析、工具端不做意图识别:端侧要处理大量人话噪声与歧义,规则难维护,且无法利用标注数据快速迭代。
- 两边都做但口径不一致:工具端叫
success,端侧叫ok,字段漂移导致联调成本暴涨(23 要解决的就是这一类问题)。
AI 工具使用:端侧代码框架 / 标注数据解析 / 联调排障 / 容错生成(学生可直接复制)
使用方法:把你的环境信息、代码片段、日志与需求粘贴到
{你的内容}。要求 AI 输出“步骤清单 + 可复制代码 + 校验点”。你必须做复验并记录证据。
模板目录:
- 模板 1:生成 ROS2 端设备控制节点完整框架(解析/驱动/回执/去重/超时)
- 模板 2:生成标注指令数据解析脚本(JSONL/CSV → 规则/统计/评估)
- 模板 3:联调延迟/丢指令排查(网络/rosbridge/ROS2/QoS 分层)
- 模板 4:生成容错处理代码(匹配/阈值/澄清/安全优先)
模板 1:生成 ROS2 端设备控制节点完整框架
你是 ROS2 Python(rclpy)实践课助教。请给我一个“可运行”的设备控制节点示例,要求:
1)订阅 /sorting_arm/cmd(std_msgs/msg/String),消息内容是 JSON 字符串;
2)发布 /sorting_arm/status(std_msgs/msg/String),回执 JSON 字符串,必须带 last_cmd_id/ok/code/message/ts_ms;
3)实现指令校验:cmd_id/device_type/device_id/action/params/ts_ms 缺失要返回可解释错误;
4)实现去重:同一 cmd_id 重复到达不重复执行;
5)实现超时与状态机:idle/running/error/estop;
6)驱动层用一个 MockDriver 类模拟 home/pick_place/stop/e_stop;
7)不要阻塞 ROS2 回调:用队列+后台 worker 线程执行驱动调用;
8)给出 3 条命令行自测(ros2 topic pub/echo),每条写清预期现象。
我的环境与约束:{你的内容}
模板 2:生成标注指令数据解析脚本(规则/统计/评估)
你是数据标注与指令识别实践课助教。请给我一个 Python 脚本,用于解析“语音/文本指令标注数据”,并生成指令识别规则与评估报告,要求:
1)输入支持 JSONL 或 CSV,两种格式都给出示例;
2)每条样本至少包含 raw_text、canonical_action(例如 arm.home、arm.pick_place)、slots(可为空);
3)输出:①动作分布统计;②同义词建议表;③基于规则的识别器(exact/regex/fuzzy 三段式),并给出 top-1 准确率;
4)识别器要有容错:空格/标点/大小写/常见口头语(比如“帮我”“麻烦”);
5)不要依赖第三方库(只用 Python 标准库)。
我的数据样例与需求:{你的内容}
模板 3:联调延迟/丢指令排查(分层定位)
下面是我工具端与 ROS2 端联调的现象:{你的现象}
我能提供的证据:rosbridge 启动日志、ros2 topic info/echo/hz 输出、工具端发送/接收日志、网络信息(IP/端口)。
请你按“分层定位”输出最多 10 步排查:
1)网络层(端口/防火墙/连通性);
2)rosbridge 层(订阅/发布是否成功、topic 名/消息类型、队列拥塞);
3)ROS2 层(节点是否启动、topic 是否存在、QoS 是否匹配、回调是否阻塞);
每一步都要写清:我该执行什么命令/看什么日志、预期结果是什么、下一步怎么走。
最后给 3 条“预防清单”(比如超时、去重、日志字段、节流)。
模板 4:容错处理代码生成(安全优先)
我有一段指令识别/解析代码:{你的代码}
请你帮我做“安全优先”的容错增强,要求:
1)未知指令宁可拒绝也不要猜错(给出可解释错误与澄清建议);
2)对紧急停止类关键词(急停/停止/停下/stop/e-stop)优先级最高;
3)对常见错字/缺参(比如“回零”写成“回灵”、缺少 to/from)给出修复策略;
4)加入单元自测用例(至少 8 条输入 → 预期 action/拒绝原因),不要依赖第三方库。
输出:改造后的代码 + 自测用例 + 验证要点。
- 场景提问:为什么“工具端发出一条 pick_place 指令”并不代表“设备一定执行了”?你至少要拿到哪些证据才能确认闭环成立?
工具端(脚本/Web/智能体)
└─ 生成结构化指令(含 cmd_id)并下发
↓(Topic: /sorting_arm/cmd,String JSON)
ROS2 端设备控制节点(本课时实现)
├─ 解析与校验(缺字段/错字段立即回执错误)
├─ 去重(cmd_id 重复只回执不重复执行)
├─ 调用驱动(MockDriver/真实驱动)
└─ 发布状态回执(Topic: /sorting_arm/status,String JSON)
↑(工具端订阅状态,形成闭环)
为什么必须拆成“解析/驱动/回执”三层:
- 解析层:把输入变成“可被机器验证”的结构,负责拒绝非法指令(安全底线)。
- 驱动层:只关注“怎么让设备动”,不掺杂协议细节(利于替换真实设备)。
- 回执层:把执行结果结构化输出,利于工具端等待/重试/审计(工程闭环)。
说明:
- 本示例以“机械臂 mock 驱动”为例,真实设备接入时只需要替换
MockSortingArmDriver的实现与动作映射。 - 为避免回调阻塞,本示例用“队列 + 后台 worker 线程”执行驱动动作;ROS2 回调只负责入队与快速回执。
1)端侧代码(rclpy,订阅指令/发布回执/去重/状态机)
import json
import queue
import threading
import time
from dataclasses import dataclass
from typing import Any
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
def now_ms() -> int:
return int(time.time() * 1000)
@dataclass(frozen=True)
class DeviceCommand:
cmd_id: str
scene: str
device_type: str
device_id: str
action: str
params: dict[str, Any]
safety: dict[str, Any]
meta: dict[str, Any]
ts_ms: int
trace_id: str | None
msg_id: str | None
event: str | None
topic: str | None
class MockSortingArmDriver:
def __init__(self) -> None:
self._estop = False
def e_stop(self) -> None:
self._estop = True
def reset_estop(self) -> None:
self._estop = False
def home(self) -> None:
self._ensure_not_estop()
time.sleep(0.3)
def pick_place(self, from_bin: str, to_bin: str, speed: float) -> None:
self._ensure_not_estop()
if speed <= 0 or speed > 1.0:
raise ValueError("speed must be in (0, 1.0]")
time.sleep(0.6)
def stop(self) -> None:
time.sleep(0.05)
def _ensure_not_estop(self) -> None:
if self._estop:
raise RuntimeError("device is in estop")
class SortingArmControlNode(Node):
def __init__(self) -> None:
super().__init__("sorting_arm_control_node")
self._driver = MockSortingArmDriver()
self._enabled = True
self._guard_closed = True
self._state = "idle"
self._handled_cmd_ids: set[str] = set()
self._cmd_queue: queue.Queue[DeviceCommand] = queue.Queue(maxsize=50)
self._cmd_sub = self.create_subscription(String, "/sorting_arm/cmd", self._on_cmd, 10)
self._status_pub = self.create_publisher(String, "/sorting_arm/status", 10)
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
self.get_logger().info("sorting_arm_control_node started")
def _publish_status(
self,
last_cmd_id: str,
ok: bool,
code: str,
message: str,
state: str | None = None,
detail: dict[str, Any] | None = None,
trace_id: str | None = None,
msg_id: str | None = None,
event: str | None = None,
) -> None:
payload = {
"device_type": "arm",
"device_id": "arm_01",
"state": state or self._state,
"last_cmd_id": last_cmd_id,
"ok": ok,
"code": code,
"message": message,
"detail": detail or {},
"ts_ms": now_ms(),
}
if trace_id is not None:
payload["trace_id"] = trace_id
if msg_id is not None:
payload["msg_id"] = msg_id
if event is not None:
payload["event"] = event
msg = String()
msg.data = json.dumps(payload, ensure_ascii=False)
self._status_pub.publish(msg)
def _on_cmd(self, msg: String) -> None:
parse_ok, result = self._parse_and_validate(msg.data)
if not parse_ok:
last_cmd_id = result.get("cmd_id") or "UNKNOWN"
self._publish_status(
last_cmd_id,
False,
result["code"],
result["message"],
state="error",
trace_id=result.get("trace_id"),
msg_id=result.get("msg_id"),
event=result.get("event"),
)
return
cmd: DeviceCommand = result["cmd"]
if cmd.cmd_id in self._handled_cmd_ids:
self._publish_status(
cmd.cmd_id,
True,
"DUPLICATE",
"cmd_id already handled; ignored",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
return
if cmd.action in {"e_stop", "estop"}:
self._handled_cmd_ids.add(cmd.cmd_id)
self._state = "estop"
self._driver.e_stop()
self._drain_queue()
self._publish_status(
cmd.cmd_id,
True,
"ESTOP",
"emergency stop triggered",
state="estop",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
return
if not self._check_safety(cmd):
self._handled_cmd_ids.add(cmd.cmd_id)
self._state = "error"
self._publish_status(
cmd.cmd_id,
False,
"SAFETY_BLOCK",
"safety precondition not satisfied",
state="error",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
return
try:
self._cmd_queue.put_nowait(cmd)
except queue.Full:
self._publish_status(
cmd.cmd_id,
False,
"QUEUE_FULL",
"device busy; command queue is full",
state="error",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
return
self._handled_cmd_ids.add(cmd.cmd_id)
self._state = "running"
self._publish_status(
cmd.cmd_id,
True,
"ACCEPTED",
"command accepted",
state="running",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
def _drain_queue(self) -> None:
while True:
try:
_ = self._cmd_queue.get_nowait()
except queue.Empty:
break
def _check_safety(self, cmd: DeviceCommand) -> bool:
safety = cmd.safety or {}
require_enable = bool(safety.get("require_enable", False))
require_guard_closed = bool(safety.get("require_guard_closed", False))
if require_enable and not self._enabled:
return False
if require_guard_closed and not self._guard_closed:
return False
return True
def _worker_loop(self) -> None:
while rclpy.ok():
try:
cmd = self._cmd_queue.get(timeout=0.2)
except queue.Empty:
if self._state == "running":
self._state = "idle"
continue
try:
self._execute(cmd)
except Exception as e:
self._state = "error"
self._publish_status(
cmd.cmd_id,
False,
"EXEC_ERROR",
str(e),
state="error",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
else:
self._state = "idle"
self._publish_status(
cmd.cmd_id,
True,
"OK",
f"{cmd.action} finished",
state="idle",
trace_id=cmd.trace_id,
msg_id=cmd.msg_id,
event=cmd.event,
)
def _execute(self, cmd: DeviceCommand) -> None:
if cmd.action == "home":
self._driver.home()
return
if cmd.action == "pick_place":
from_bin = str(cmd.params.get("from", ""))
to_bin = str(cmd.params.get("to", ""))
speed = float(cmd.params.get("speed", 0.5))
if not from_bin or not to_bin:
raise ValueError("pick_place requires params.from and params.to")
self._driver.pick_place(from_bin, to_bin, speed)
return
if cmd.action == "stop":
self._driver.stop()
return
raise ValueError(f"unknown action: {cmd.action}")
def _parse_and_validate(self, raw: str) -> tuple[bool, dict[str, Any]]:
try:
obj = json.loads(raw)
except Exception:
return False, {"cmd_id": "UNKNOWN", "code": "BAD_JSON", "message": "cmd must be a JSON string"}
trace_id = None
msg_id = None
event = None
topic = None
data: dict[str, Any] | None
if isinstance(obj, dict) and "schema_version" in obj and "payload" in obj:
trace_id = str(obj.get("trace_id")) if obj.get("trace_id") is not None else None
msg_id = str(obj.get("msg_id")) if obj.get("msg_id") is not None else None
event = str(obj.get("event")) if obj.get("event") is not None else None
topic = str(obj.get("topic")) if obj.get("topic") is not None else None
payload = obj.get("payload")
if not isinstance(payload, dict):
return False, {
"cmd_id": msg_id or "UNKNOWN",
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "BAD_ENVELOPE",
"message": "envelope.payload must be an object",
}
data = payload
elif isinstance(obj, dict):
data = obj
else:
return False, {"cmd_id": "UNKNOWN", "code": "BAD_BODY", "message": "cmd must be a JSON object or Envelope"}
def infer_device_type(in_topic: str | None, in_event: str | None) -> str | None:
if in_event and in_event.startswith("arm."):
return "arm"
if in_topic and "/sorting_arm/" in in_topic:
return "arm"
return None
cmd_id = data.get("cmd_id") or msg_id
cmd_id_str = str(cmd_id).strip() if cmd_id is not None else ""
if not cmd_id_str:
return False, {
"cmd_id": "UNKNOWN",
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "MISSING_FIELD",
"message": "missing field: cmd_id (or envelope.msg_id)",
}
scene = data.get("scene") or "sorting"
device_type = data.get("device_type") or infer_device_type(topic, event)
device_type_str = str(device_type).strip() if device_type is not None else ""
device_id = data.get("device_id")
device_id_str = str(device_id).strip() if device_id is not None else ""
action = data.get("action")
action_str = str(action).strip() if action is not None else ""
params = data.get("params")
ts_ms = data.get("ts_ms") if data.get("ts_ms") is not None else obj.get("ts_ms") if isinstance(obj, dict) else None
missing: list[str] = []
if not device_id_str:
missing.append("device_id")
if not action_str:
missing.append("action")
if params is None:
missing.append("params")
if ts_ms is None:
missing.append("ts_ms")
if missing:
return False, {
"cmd_id": cmd_id_str,
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "MISSING_FIELD",
"message": f"missing fields: {missing}",
}
if not device_type_str:
return False, {
"cmd_id": cmd_id_str,
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "MISSING_FIELD",
"message": "missing field: device_type (or cannot infer from envelope.event/topic)",
}
if not isinstance(params, dict):
return False, {
"cmd_id": cmd_id_str,
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "BAD_PARAMS",
"message": "params must be an object",
}
try:
ts_ms_int = int(ts_ms)
except Exception:
return False, {
"cmd_id": cmd_id_str,
"trace_id": trace_id,
"msg_id": msg_id,
"event": event,
"code": "BAD_FIELD_TYPE",
"message": "ts_ms must be integer milliseconds",
}
cmd = DeviceCommand(
cmd_id=cmd_id_str,
scene=str(scene),
device_type=device_type_str,
device_id=device_id_str,
action=action_str,
params=dict(params),
safety=dict(data.get("safety") or {}),
meta=dict(data.get("meta") or {}),
ts_ms=ts_ms_int,
trace_id=trace_id,
msg_id=msg_id,
event=event,
topic=topic,
)
return True, {"cmd": cmd}
def main() -> None:
rclpy.init()
node = SortingArmControlNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
DeviceCommand:端侧内部的结构化指令对象,避免在执行阶段还在到处写data["xxx"]。MockSortingArmDriver:驱动层替身,用home/pick_place/stop/e_stop模拟真实设备接口;真实设备接入时替换它,不改解析与回执逻辑。_handled_cmd_ids:去重集合,保证同一cmd_id重复到达时不重复执行(联调常见:重试/断线重连导致重复下发)。_cmd_queue + _worker_loop:回调只入队并快速回执;后台线程串行执行驱动调用,避免 ROS2 回调被sleep/IO阻塞导致延迟堆积。_check_safety:把安全条件显式检查,不满足就拒绝;工业场景里“宁可不动,也不要乱动”。_publish_status:状态回执统一口径,任何成功/失败都用last_cmd_id/ok/code/message说明白;当上游携带trace_id/msg_id/event时,本讲会把追踪字段带回回执,便于跨端联调与定位。_parse_and_validate:兼容两种输入形态:①直接 ArmCommand;②Envelope(23 口径)里payload承载的 ArmCommand(或简化 payload)。端侧原则是“能兼容就兼容,但拒绝必须可解释”。
2)命令行自测(最短闭环证据)
cd 10_ai_ros2_device_control_lab/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_control sorting_arm_control_node
解释:
colcon build --symlink-install:构建 ROS2 工作空间;Python 文件改动可快速生效。source install/setup.bash:加载当前工作空间环境,让ros2 run能找到sorting_arm_control包。ros2 run sorting_arm_control sorting_arm_control_node:启动端侧设备控制节点(订阅/sorting_arm/cmd,发布/sorting_arm/status)。- 若你在 Windows 原生环境运行 ROS2(少见):把
source install/setup.bash换成call install\\setup.bat。
终端 B:订阅状态回执:
ros2 topic echo /sorting_arm/status
解释:
- 一旦你在终端 C 发指令,终端 B 应该能看到带
last_cmd_id的 JSON 回执。
终端 C:下发一条 home 指令(注意:这里发布的是 std_msgs/msg/String,其中 data 是 JSON 字符串):
ros2 topic pub --once /sorting_arm/cmd std_msgs/msg/String "{data: '{\"cmd_id\":\"C-20260415-0001\",\"scene\":\"sorting\",\"device_type\":\"arm\",\"device_id\":\"arm_01\",\"action\":\"home\",\"params\":{},\"safety\":{\"require_enable\":true,\"require_guard_closed\":true},\"meta\":{\"user\":\"stu01\",\"role\":\"operator\"},\"ts_ms\":1710000000000}'}"
解释:
--once:只发一次,便于对照cmd_id看回执。data: '...:外层是 ROS 消息字段,内层是 JSON 字符串;最容易出错的是引号与转义,建议照抄再改cmd_id/action。- 预期现象:终端 B 至少出现两条回执(
ACCEPTED→OK或直接OK,取决于你的状态发布节奏)。
常见错与修复:
- 现象:终端 B 没任何输出
修复:先执行ros2 topic list确认话题存在;再执行ros2 topic info /sorting_arm/status看是否有发布者;最后检查节点是否启动在同一 ROS_DOMAIN_ID 与同一网络环境。 - 现象:回执
BAD_JSON
修复:说明data里的 JSON 字符串没有正确转义或括号不配对;先把 JSON 单独放到在线/本地校验器里校验,再放回命令行。
练习 1(必做):制造一条“缺字段”的指令,让端侧回执 MISSING_FIELD,并能指出缺了哪个字段。
- 提示:删掉
ts_ms或把params改成字符串。
练习 2(选做):下发两次相同 cmd_id 的 home,验证端侧只执行一次并回执 DUPLICATE。
目标:
- 把
MockSortingArmDriver的pick_place扩展为“分拣场景动作”(例如支持from/to/speed、并在回执detail中携带progress)。
- 在
_execute中把pick_place的参数校验写完整(缺参必须拒绝,给出可解释错误)。 - 在执行成功时,把
detail加上关键字段(例如{"from": "...", "to": "...", "speed": 0.5})。
- 状态回执能看到
last_cmd_id与detail的关键字段。 - 缺参时回执
EXEC_ERROR(或你自定义错误码)且 message 解释清楚。
- 给定一批“语音/文本指令标注数据”,写脚本解析并生成识别规则与评估报告。
- 在工具端加入“指令匹配 + 容错处理”:让识别率提升、拒绝更可解释、对急停更敏感。
- 与 ROS2 端联调:定位并解决至少 1 个问题(通信延迟/丢指令/重复执行),保留证据链。
1)为什么要用标注数据?
- 标注数据让你从“凭感觉写规则”变成“用数据驱动改规则”,工程上更稳定。
2)推荐数据格式(JSONL 与 CSV 二选一)
JSONL(每行一个 JSON 对象,适合追加与版本管理):
{"raw_text":"把机械臂回零","canonical_action":"arm.home","slots":{}}
{"raw_text":"从 3 号进料箱抓取,放到玻璃出料箱","canonical_action":"arm.pick_place","slots":{"from":"bin_in_3","to":"bin_out_glass","speed":0.5}}
{"raw_text":"停下","canonical_action":"arm.stop","slots":{}}
{"raw_text":"急停!","canonical_action":"arm.e_stop","slots":{}}
CSV(适合用表格工具编辑,但要注意引号与逗号转义):
raw_text,canonical_action,slots_json
把机械臂回零,arm.home,"{}"
从 3 号进料箱抓取,放到玻璃出料箱,arm.pick_place,"{""from"":""bin_in_3"",""to"":""bin_out_glass"",""speed"":0.5}"
停下,arm.stop,"{}"
急停!,arm.e_stop,"{}"
-
raw_text:语音转写或文本输入的原句(允许有口头语/错字/标点)。 -
canonical_action:标准动作标签(建议用device.action,例如arm.home)。 -
slots:参数槽位(如from/to/speed),可以为空,但一旦有参数就必须可解析与可复验。 -
标注样例:
10_ai_ros2_device_control_lab/data/command_annotations_example.jsonl -
解析脚本:
10_ai_ros2_device_control_lab/scripts/parse_command_annotations.py
最小可复现命令(输出动作分布/同义表达建议/评估报告):
cd 10_ai_ros2_device_control_lab
py scripts/parse_command_annotations.py --input data/command_annotations_example.jsonl --format jsonl
解释:
--input ...jsonl:指定标注数据文件(JSONL/CSV 都支持)。- 输出里的
report.acc:top-1 准确率;用它做“优化前/优化后”对比。
目标:
- 读入标注数据,输出动作分布统计;
- 构建一个“只用标准库”的三段式识别器:exact → regex → fuzzy;
- 计算 top-1 准确率,作为优化前后对比依据。
import csv
import difflib
import json
import re
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
FILLER_WORDS = ["帮我", "麻烦", "请", "一下", "立刻", "马上", "给我"]
def normalize_text(text: str) -> str:
t = text.strip().lower()
for w in FILLER_WORDS:
t = t.replace(w, "")
t = re.sub(r"\s+", "", t)
t = re.sub(r"[,。!?,.!?;;::]", "", t)
return t
def load_jsonl(path: Path) -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
items.append(json.loads(line))
return items
def load_csv(path: Path) -> list[dict[str, Any]]:
items: list[dict[str, Any]] = []
with path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
slots = json.loads(row.get("slots_json") or "{}")
items.append(
{
"raw_text": row["raw_text"],
"canonical_action": row["canonical_action"],
"slots": slots,
}
)
return items
@dataclass(frozen=True)
class Rule:
canonical_action: str
mode: str
pattern: str
def build_synonym_suggestions(samples: list[dict[str, Any]], limit_per_action: int = 5) -> dict[str, list[str]]:
action_phrases: dict[str, Counter[str]] = defaultdict(Counter)
for s in samples:
raw = str(s["raw_text"]).strip()
canonical = str(s["canonical_action"])
if raw:
action_phrases[canonical][raw] += 1
return {k: [p for p, _ in c.most_common(limit_per_action)] for k, c in action_phrases.items()}
def build_rules(samples: list[dict[str, Any]]) -> tuple[dict[str, str], list[Rule]]:
exact_map: dict[str, str] = {}
regex_rules: list[Rule] = []
for s in samples:
raw = str(s["raw_text"])
canonical = str(s["canonical_action"])
norm = normalize_text(raw)
if norm and norm not in exact_map:
exact_map[norm] = canonical
regex_rules.extend(
[
Rule("arm.e_stop", "regex", r"(急停|紧急停止|estop|e-stop|stopnow)"),
Rule("arm.stop", "regex", r"(停下|停止|别动|暂停)"),
Rule("arm.home", "regex", r"(回零|回原点|归零|home)"),
]
)
return exact_map, regex_rules
def recognize_action(text: str, exact_map: dict[str, str], regex_rules: list[Rule]) -> tuple[str | None, str]:
norm = normalize_text(text)
if norm in exact_map:
return exact_map[norm], "exact"
for rule in regex_rules:
if re.search(rule.pattern, norm):
return rule.canonical_action, "regex"
best_action = None
best_score = 0.0
for k, action in exact_map.items():
score = difflib.SequenceMatcher(a=norm, b=k).ratio()
if score > best_score:
best_action = action
best_score = score
if best_action is not None and best_score >= 0.82:
return best_action, f"fuzzy:{best_score:.2f}"
return None, "reject"
def evaluate(samples: list[dict[str, Any]], exact_map: dict[str, str], regex_rules: list[Rule]) -> dict[str, Any]:
total = len(samples)
hit = 0
mode_counter: Counter[str] = Counter()
errors: list[dict[str, Any]] = []
for s in samples:
raw = str(s["raw_text"])
gt = str(s["canonical_action"])
pred, mode = recognize_action(raw, exact_map, regex_rules)
mode_counter[mode] += 1
if pred == gt:
hit += 1
else:
errors.append({"raw_text": raw, "gt": gt, "pred": pred, "mode": mode})
return {
"total": total,
"hit": hit,
"acc": (hit / total) if total else 0.0,
"mode_counter": dict(mode_counter),
"errors_top10": errors[:10],
}
def main() -> None:
data_path = Path("command_annotations.jsonl")
samples = load_jsonl(data_path) if data_path.suffix == ".jsonl" else load_csv(data_path)
action_dist = Counter(str(s["canonical_action"]) for s in samples)
print("action_dist:", dict(action_dist))
synonym_suggestions = build_synonym_suggestions(samples)
print("synonym_suggestions:", json.dumps(synonym_suggestions, ensure_ascii=False, indent=2))
exact_map, regex_rules = build_rules(samples)
report = evaluate(samples, exact_map, regex_rules)
print("report:", json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
逐段解释(你要能用它做“前后对比”):
normalize_text:把口头语、空格、标点等噪声先去掉;容错的第一步永远是“规范化”。build_rules:由标注数据生成exact_map(精确匹配表),再补充少量regex_rules(强规则:急停/停止/回零)。recognize_action:三段式识别:精确优先 → 正则优先(安全动作优先)→ 模糊匹配兜底;模糊阈值0.82需要用数据调参(不是拍脑袋)。evaluate:输出acc(top-1 准确率)与前 10 条错误样本;错误样本是你下一轮优化的起点。
注意(安全优先口径):
- 模糊匹配永远是“兜底”,阈值不宜过低;工业控制宁可拒绝也不要误判。
- 急停/停止类规则优先级最高;这类规则要“宁可多触发”,但要配合端侧去重与状态机,避免抖动。
1)联调最小闭环:只测“结构化指令 → 回执关联”
你要验证的不是“设备动作多复杂”,而是这四件事:
- 指令能到达(工具端发出后,ROS2 端有日志/回执)
- 指令能被解析(非法指令回执
BAD_JSON/MISSING_FIELD) - 回执能关联(回执
last_cmd_id与下发cmd_id一致) - 重复不会重复执行(同
cmd_id不会导致重复驱动调用)
2)通信延迟排查:从“证据链”而不是“感觉”出发
建议的分层定位顺序(先快后慢):
- 网络层:端口是否通、是否有防火墙拦截(先保证“连得上”)。
- rosbridge 层:订阅/发布是否成功,topic 名是否一致(再保证“桥是通的”)。
- ROS2 层:节点是否阻塞回调、QoS 是否匹配(最后看“端侧是否堵住了”)。
ROS2 侧速查命令(常用 3 条):
ros2 topic list
ros2 topic info /sorting_arm/cmd
ros2 topic hz /sorting_arm/status
解释:
topic list:先排除“话题名拼错/节点没启动”。topic info:确认消息类型、发布者/订阅者是否对得上。topic hz:回执是否有节奏、是否突然掉到 0(丢指令/回调阻塞常见征兆)。
3)指令丢失与重复执行:工程上更常见的“隐蔽坑”
- 原因:工具端超时后重发,但端侧没有去重
对策:端侧必须按cmd_id去重;工具端重试必须带同一cmd_id(不要每次重试都换 id)。 - 原因:端侧回调阻塞(把驱动调用写在回调里)导致队列堆积,看起来像丢包/延迟
对策:回调只做解析与入队;执行放到 worker 线程或 Action/服务端。 - 原因:QoS 不匹配(可靠性/历史深度)导致部分订阅者收不到
对策:先把链路跑通,再讨论 QoS;必要时用ros2 topic info -v对照 QoS。
目标:
- 用你的标注数据与识别器,生成结构化控制指令(含
cmd_id/action/params),并能被端侧节点执行并回执。
- 用识别器把
raw_text识别成canonical_action,并把slots合并到params。 - 生成
cmd_id(建议:日期 + 序号,或用 UUID,但要能追踪)。 - 发送到
/sorting_arm/cmd,等待/sorting_arm/status中last_cmd_id匹配。
- 识别器评估报告截图(
acc与错误样本)。 - 至少 3 条不同指令的“下发 → 回执匹配”截图(包含
cmd_id/last_cmd_id)。 - 至少 1 个联调问题的“定位→修复→复验”记录(哪怕是引号转义这种小问题也可以,但必须写清证据)。
配套项目的最小串联演示(10_ai_ros2_device_control_lab/ 口径):
cd 10_ai_ros2_device_control_lab
py -m pip install -r requirements.txt
py scripts/recognize_and_send.py --ws-url ws://localhost:9090 --text "把机械臂回零"
解释:
- 第一条命令安装工具端依赖(仅用于 rosbridge WebSocket 连接)。
- 第二条命令做三件事:从标注数据生成规则 → 识别文本 → 下发结构化 ArmCommand。
课后作业(布置)
作业 1(必做):
- 提交设备控制工具完整代码(含指令解析、驱动调用、状态反馈、去重/超时/容错中的至少 2 项),附开发说明(说明你做了什么、怎么验证的)。
作业 2(必做):
- 提交联调测试日志(含指令执行成功截图,至少 3 条指令),附问题排查过程(按“现象→证据→定位→修复→复验”写)。
作业 3(必做):
- 撰写 150 字左右分析:说明标注数据对指令识别能力的优化效果及应用心得(必须包含“你用什么指标衡量、提升了多少/遇到什么困难”)。
课程思政融入点(统一口径)
- AI 工具链与 ROS2 自动化控制技术正在进入农业与工业一线:农产品分拣、仓储搬运、质量检测、智能包装等环节的“提质增效”依赖稳定的设备控制闭环与可追踪的数据链路。
Markdown 与代码自检清单(提交前自己勾一遍)
- 标题层级连续(
##→###→####),无跳级。 - 代码块均成对闭合,语言标签正确(
python/bash/json/csv)。 - 所有指令 JSON 都能通过基本校验(括号/引号配对),字段名与本讲口径一致(
cmd_id/last_cmd_id等)。 - 端侧逻辑满足三条底线:非法指令必回执、重复指令不重复执行、急停优先级最高。
- 联调证据链完整:至少包含一次
ros2 topic info/echo/hz输出或截图,并能说明你如何验证修复有效。