22 Web 端指令下发控制 ROS2
Web 端指令下发控制 ROS2
关联:索引
要解决的问题
- Web 端已经能“订阅 ROS2 话题”了,如何反过来“发布指令”到 ROS2,并保证格式可控、可追踪、可回传结果?
- rosbridge 的
publishJSON 应该长什么样?msg里放什么?什么时候用std_msgs/msg/String携带 JSON? - 指令下发怎么做“权限控制”:按钮是否可点、指令是否允许、是否需要二次校验?
- Web 端发出一条指令后,如何判断“已送达 / 已执行 / 执行失败”?结果回传如何与请求做关联?
章节内容(本讲核心):
- Web 端发布指令到 ROS2 话题(rosbridge JSON:publish)
- 统一指令消息格式封装(ArmCommand/ArmStatus:cmd_id/last_cmd_id、安全前置条件与追踪字段)
- 指令下发与权限控制(前端 UI 权限 + ROS2 侧二次校验的最小模型)
- 指令响应与结果回传(status 话题、cmd_id 关联、超时与状态机)
与前置知识衔接(避免重复):
- 已学:WebSocket 生命周期事件、统一 JSON、解析容错(见 WebSocket 系列)
- 已学:rosbridge 原理与环境部署与连通性验证(见 rosbridge 部署;虚拟机端口映射场景见《人工智能综合实践》15 :宿主机示例
ws://localhost:19090) - 已学:Web 端连接 rosbridge 与话题订阅、RosbridgeClient 封装思路(见“Web 端连接 ROS2 与话题订阅”)
- 本讲不重复:subscribe/unsubscribe 字段解释、基础连接代码从零搭建
- 本讲定位:在已能连接与订阅的基础上,补齐“发布指令 + 权限 + 回传”闭环,完成 Web ↔ ROS2 双向通信(订阅 + 发布)
项目工坊:实现网页按钮下发控制指令到 ROS2
- 工坊产物:一个“控制面板”组件(按钮 + 状态区),点击按钮即可发布指令,并在页面显示 status 回执
学生任务:
- 完成按钮与交互状态(可点击、禁用、loading、结果提示)
- 完成指令发送(publish)与 ROS2 接收验证(ros2 topic echo 或自写节点打印)
- 完成结果回传订阅与关联(收到 status 后能按
cmd_id/last_cmd_id匹配到对应请求)
大模型任务:
- AI 生成指令下发代码与统一格式(ArmCommand/ArmStatus + TS 封装)
作业(布置):
1)实现 Web 与 ROS2 双向通信(订阅 + 发布)
2)截图数据接收与指令下发效果
3)提供关键代码说明
本讲配套前端项目:09_rosbridge_web_control_panel/(Vue3 + Vite + TypeScript)。
推荐项目结构(与本讲代码一一对应):
09_rosbridge_web_control_panel/
src/
App.vue
components/
RosbridgeCommandMinimal.vue
SortingArmControlPanel.vue
utils/
sorting-arm-protocol.ts
sorting-arm-web-control-client.ts
创建与运行(在你的工作目录执行):
npm.cmd create vite@latest 09_rosbridge_web_control_panel -- --template vue-ts --no-interactive
cd 09_rosbridge_web_control_panel
npm.cmd install
npm.cmd run dev
- 如果你在 PowerShell 中遇到 “禁止运行脚本 npm.ps1”,本讲统一使用
npm.cmd执行命令。 - 页面启动后,默认 WebSocket URL 预置为
ws://localhost:19090(对应《人工智能综合实践》15 的虚拟机端口映射场景);直连可改为ws://localhost:9090。
本讲统一用 std_msgs/msg/String 携带 JSON 字符串,原因:
- Web 侧发布最简单:
msg: { data: "<JSON字符串>" } - ROS2 侧解析最简单:只需
json.loads(msg.data) - 可扩展:未来换成自定义 msg(如
my_msgs/msg/ArmCommand/my_msgs/msg/ArmStatus)时,ArmCommand/ArmStatus 的字段仍然可复用
话题约定
- 指令下发话题:
/sorting_arm/cmd(Web → ROS2) - 指令回传话题:
/sorting_arm/status(ROS2 → Web)
说明:
- 本讲与《人工智能综合实践》15 保持一致:Web 端不重新发明一套话题名与字段,而是直接复用
/sorting_arm/cmd、/sorting_arm/status与cmd_id/last_cmd_id。 - 本讲新增的内容在于:把这套控制协议“接入到 Vue 页面”,完成按钮下发、权限控制与回传匹配(请求-回执闭环)。
指令消息(String 内部 JSON)推荐结构:ArmCommand(与《人工智能综合实践》15 一致)
{
"cmd_id": "C-20260412-0001",
"scene": "sorting",
"device_type": "arm",
"device_id": "arm_01",
"action": "stop",
"params": { "reason": "user_click" },
"safety": { "require_enable": true, "require_guard_closed": true },
"meta": { "user": "stu01", "role": "operator" },
"ts_ms": 1710000000000
}
cmd_id:指令关联 id(本讲核心),用于回传匹配与审计追踪device_type/device_id:明确控制对象(本讲示例为机械臂)action/params:动作与参数(示例:stop+ reason)meta:用户与角色(用于权限与审计;设备侧可先忽略,后续逐步纳入校验)ts_ms:毫秒时间戳(用于排查延迟/重放/乱序)
回传消息(String 内部 JSON)推荐结构:ArmStatus(与《人工智能综合实践》15 一致)
{
"device_type": "arm",
"device_id": "arm_01",
"state": "idle",
"last_cmd_id": "C-20260412-0001",
"ok": true,
"code": "OK",
"message": "stop executed",
"detail": { "progress": 1.0 },
"ts_ms": 1710000001234
}
last_cmd_id:必须回传且与请求cmd_id一致(本讲核心)ok/code/message:结果三件套(用于 UI 提示与问题定位)state/detail:设备状态与扩展信息(用于面板展示或后续联动)
- WebSocket 已连接 rosbridge(状态为 OPEN)
- 点击按钮后,Web 端发出
op=publish的 JSON - ROS2 侧能看到
/sorting_arm/cmd收到的std_msgs/msg/String - ROS2 侧发布
/sorting_arm/status,Web 端能收到并在页面显示结果
1) rosbridge publish 最小结构(协议层)
{
"op": "publish",
"topic": "/sorting_arm/cmd",
"msg": {
"data": "{\"cmd_id\":\"C-20260412-0001\",\"scene\":\"sorting\",\"device_type\":\"arm\",\"device_id\":\"arm_01\",\"action\":\"stop\",\"params\":{\"reason\":\"user_click\"},\"safety\":{\"require_enable\":true,\"require_guard_closed\":true},\"meta\":{\"user\":\"stu01\",\"role\":\"operator\"},\"ts_ms\":1710000000000}"
}
}
op:操作类型,发布必须是publishtopic:要发布到的 ROS2 话题msg:ROS2 消息体对象;当类型是std_msgs/msg/String时,核心字段是datadata:这里放的是“业务层 JSON 字符串”(注意:它本身是字符串,所以里面的引号会被转义)
2) 为什么建议“String 携带 JSON”?
- Web 侧与 ROS2 侧都能统一用 JSON 表达业务字段(命令、参数、权限、追踪)
- 未来要升级:只要保持 ArmCommand/ArmStatus 的业务字段不变,底层承载可以从 String 升级为自定义 msg
说明:
- 你可以沿用上节课项目(如
07_rosbridge_topic_subscriber)的连接与订阅封装 - 本讲推荐直接使用配套项目
09_rosbridge_web_control_panel/(与《人工智能综合实践》15 接口对齐)
1) 控制面板组件:按钮下发 ArmAction,并监听 status(项目版)
本讲项目中:
- 页面入口:
src/App.vue渲染src/components/RosbridgeCommandMinimal.vue RosbridgeCommandMinimal.vue是“同名落位”,内部直接渲染SortingArmControlPanel.vueSortingArmControlPanel.vue才是本讲核心实现(连接、权限、下发、回传匹配)
src/App.vue(最小接线):
<template>
<RosbridgeCommandMinimal />
</template>
<script setup lang="ts">
import RosbridgeCommandMinimal from './components/RosbridgeCommandMinimal.vue'
</script>
src/components/RosbridgeCommandMinimal.vue(同名落位组件):
<template>
<SortingArmControlPanel />
</template>
<script setup lang="ts">
import SortingArmControlPanel from './SortingArmControlPanel.vue'
</script>
src/components/SortingArmControlPanel.vue(项目核心实现:连接 + 权限 + 下发 + 回传匹配):
<template>
<div class="wrap">
<h2>Web → ROS2 指令下发(/sorting_arm/cmd)</h2>
<div class="row">
<label class="label" for="url">WebSocket URL</label>
<input id="url" v-model="url" class="input" type="text" />
</div>
<div class="row">
<label class="label" for="deviceId">device_id</label>
<input id="deviceId" v-model="deviceId" class="input" type="text" />
</div>
<div class="row">
<div class="status">连接状态:{{ connStatus }}</div>
<button class="btn" type="button" :disabled="connStatus === 'OPEN' || connStatus === 'CONNECTING'" @click="connect">
连接
</button>
<button class="btn" type="button" :disabled="connStatus !== 'OPEN'" @click="disconnect">
断开
</button>
</div>
<div class="row">
<div class="status">当前角色:{{ role }}</div>
<select v-model="role" class="select">
<option value="viewer">viewer(只读)</option>
<option value="operator">operator(可控)</option>
<option value="admin">admin(高权限)</option>
</select>
</div>
<div class="row">
<button class="btn primary" type="button" :disabled="!canSend('home') || sending" @click="send('home')">
{{ sending && currentAction === 'home' ? '下发中...' : '回零(home)' }}
</button>
<button class="btn primary" type="button" :disabled="!canSend('stop') || sending" @click="send('stop')">
{{ sending && currentAction === 'stop' ? '下发中...' : '停止(stop)' }}
</button>
<button class="btn danger" type="button" :disabled="!canSend('e_stop') || sending" @click="send('e_stop')">
{{ sending && currentAction === 'e_stop' ? '下发中...' : '急停(e_stop)' }}
</button>
</div>
<div class="row">
<div class="status">最近一次 cmd_id:{{ lastCmdId || '(暂无)' }}</div>
</div>
<div class="row">
<div class="status">最近一次结果:{{ lastResultText || '(暂无)' }}</div>
</div>
<div v-if="lastRawStatus" class="row">
<details>
<summary>最近一条 /sorting_arm/status 原始 JSON</summary>
<pre class="pre">{{ lastRawStatus }}</pre>
</details>
</div>
<div v-if="lastError" class="row error">错误:{{ lastError }}</div>
</div>
</template>
<script setup lang="ts">
import { onUnmounted, ref } from 'vue'
import type { ArmAction, ArmStatus, Role } from '../utils/sorting-arm-protocol'
import { SortingArmWebControlClient } from '../utils/sorting-arm-web-control-client'
type ConnStatus = 'IDLE' | 'CONNECTING' | 'OPEN' | 'CLOSING' | 'CLOSED'
const url = ref('ws://localhost:19090')
const deviceId = ref('arm_01')
const connStatus = ref<ConnStatus>('IDLE')
const role = ref<Role>('operator')
const lastCmdId = ref('')
const lastResultText = ref('')
const lastRawStatus = ref('')
const lastError = ref('')
const sending = ref(false)
const currentAction = ref<ArmAction | null>(null)
const client = new SortingArmWebControlClient()
const actionPermission: Record<Role, ArmAction[]> = {
viewer: [],
operator: ['home', 'stop'],
admin: ['home', 'stop', 'e_stop', 'pick_place']
}
function canSend(action: ArmAction): boolean {
return connStatus.value === 'OPEN' && actionPermission[role.value].includes(action)
}
function setConnStatusFromReadyState(sock: WebSocket): void {
connStatus.value =
sock.readyState === WebSocket.CONNECTING
? 'CONNECTING'
: sock.readyState === WebSocket.OPEN
? 'OPEN'
: sock.readyState === WebSocket.CLOSING
? 'CLOSING'
: 'CLOSED'
}
function connect(): void {
lastError.value = ''
lastResultText.value = ''
lastRawStatus.value = ''
client.connect(url.value)
const ws = client.getSocket()
if (!ws) return
setConnStatusFromReadyState(ws)
ws.addEventListener('open', () => setConnStatusFromReadyState(ws))
ws.addEventListener('close', () => setConnStatusFromReadyState(ws))
ws.addEventListener('error', () => {
lastError.value = 'WebSocket error'
})
}
function disconnect(): void {
try {
connStatus.value = 'CLOSING'
client.disconnect()
} finally {
connStatus.value = 'CLOSED'
}
}
function formatStatusText(status: ArmStatus): string {
const okText = status.ok ? '成功' : '失败'
const codeText = status.code ? `(${status.code})` : ''
const msgText = status.message || ''
const stateText = status.state ? ` state=${status.state}` : ''
return `${okText}${codeText} ${msgText}${stateText}`.trim()
}
async function send(action: ArmAction): Promise<void> {
if (!canSend(action)) return
lastError.value = ''
lastResultText.value = ''
sending.value = true
currentAction.value = action
const params =
action === 'home'
? {}
: action === 'stop'
? { reason: 'user_click' }
: action === 'e_stop'
? { reason: 'user_click' }
: {}
try {
const { cmdId, status } = await client.sendCommand({
user: 'stu01',
role: role.value,
deviceId: deviceId.value,
action,
params,
timeoutMs: 3000
})
lastCmdId.value = cmdId
lastResultText.value = formatStatusText(status)
lastRawStatus.value = JSON.stringify(status, null, 2)
} catch (e) {
lastError.value = e instanceof Error ? e.message : 'send failed'
} finally {
sending.value = false
currentAction.value = null
}
}
onUnmounted(() => {
disconnect()
})
</script>
代码自检要点(对应要求:代码/命令必须解释):
SortingArmWebControlClient.connect():建立连接,并在open后订阅/sorting_arm/status(避免错过快速回传)sendCommand():内部生成cmd_id并 publish 到/sorting_arm/cmd,再用last_cmd_id匹配回传(含超时 reject)actionPermission:前端权限控制(viewer 禁止下发控制类动作;operator/admin 分级)
2) ROS2 侧自测:如何验证 Web 端确实发出了指令?
在 ROS2 侧终端执行:
ros2 topic echo /sorting_arm/cmd
- 如果能看到
data: '{"cmd_id":"...","device_type":"arm","action":"stop",...}',说明 Web → ROS2 发布已成功
优先推荐:直接复用《人工智能综合实践》15 提供的 ROS2 Python 功能包 sorting_arm_mock(可直接 ros2 run)。如果你不方便构建 ROS2 包,再用下面的“单文件 demo”临时跑通回传。
按《人工智能综合实践》15 的统一口径,在 ROS2 环境终端执行:
cd 09_rosbridge_control_tool/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_mock arm_mock
- 作用:启动模拟设备节点,收到
/sorting_arm/cmd后会发布/sorting_arm/status,并带last_cmd_id便于 Web 端匹配
import json
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SortingArmStatusDemo(Node):
def __init__(self) -> None:
super().__init__("sorting_arm_status_demo")
self.sub = self.create_subscription(String, "/sorting_arm/cmd", self.on_cmd, 10)
self.pub = self.create_publisher(String, "/sorting_arm/status", 10)
def on_cmd(self, msg: String) -> None:
try:
cmd = json.loads(msg.data)
except Exception:
return
cmd_id = cmd.get("cmd_id")
if not isinstance(cmd_id, str):
return
status = {
"device_type": "arm",
"device_id": cmd.get("device_id", "arm_01"),
"state": "idle",
"last_cmd_id": cmd_id,
"ok": True,
"code": "OK",
"message": "mock status",
"detail": {"received_action": cmd.get("action")},
"ts_ms": int(time.time() * 1000),
}
out = String()
out.data = json.dumps(status, ensure_ascii=False)
self.pub.publish(out)
def main() -> None:
rclpy.init()
node = SortingArmStatusDemo()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
create_subscription(String, "/sorting_arm/cmd", ...):订阅 Web 下发的指令话题status["last_cmd_id"] = cmd_id:回传时必须带上请求的cmd_id,Web 才能匹配
运行方式(示例):
python3 sorting_arm_status_demo.py
- 如果你的 ROS2 Python 环境要求用
ros2 run或工作空间运行,按你的课程环境调整启动方式
一、工程化目标:把“指令发送”从组件里抽出去
目标(组件只做 UI):
SortingArmWebControlClient负责:连接复用、publish、订阅 status、超时与 cmd_id 匹配- 组件负责:按钮、输入、状态展示、调用
sendCommand()并展示 Promise 结果
二、统一类型:TypeScript 定义(可复制粘贴)
建议放到 src/utils/sorting-arm-protocol.ts(相对路径)。
export type Role = 'viewer' | 'operator' | 'admin'
export type ArmAction = 'home' | 'stop' | 'e_stop' | 'pick_place'
export type ArmCommand<TParams = Record<string, unknown>> = {
cmd_id: string
scene: 'sorting'
device_type: 'arm'
device_id: string
action: ArmAction
params: TParams
safety: { require_enable: boolean; require_guard_closed: boolean }
meta?: { user: string; role: Role }
ts_ms: number
}
export type ArmStatus = {
device_type: 'arm'
device_id: string
state: 'idle' | 'running' | 'error' | 'estop'
last_cmd_id: string
ok: boolean
code: 'OK' | 'DENY' | 'BAD_REQUEST' | 'EXEC_ERROR'
message: string
detail?: Record<string, unknown>
ts_ms: number
}
ArmAction:用联合类型把“允许的动作”固定下来,减少拼写错误ArmCommand/ArmStatus:字段与《人工智能综合实践》15 对齐,后续做多设备扩展时可把arm抽象成device_type联合类型
三、权限控制:前端 UI 限制 + ROS2 侧二次校验(最小模型)
1) 前端权限(按钮是否可点)
import type { ArmAction, Role } from './sorting-arm-protocol'
const actionPermission: Record<Role, ArmAction[]> = {
viewer: [],
operator: ['home', 'stop'],
admin: ['home', 'stop', 'e_stop', 'pick_place']
}
export function canSendAction(role: Role, action: ArmAction): boolean {
return actionPermission[role].includes(action)
}
- 这段代码只解决“UI 层防误操作”(让不该点的人点不了)
- 注意:前端权限不能当作安全边界;真正安全边界必须在 ROS2 执行侧(或网关/服务端)再次校验
2) ROS2 执行侧权限(必须二次校验)
- 只允许动作白名单:
{'home','stop','e_stop','pick_place'} - 按 role 允许子集:viewer 禁止所有控制类动作;operator 允许
home/stop;admin 允许全部 - 校验失败:发布 status 回执
ok=false、code='DENY'并写明message,同时保证last_cmd_id回传用于 Web 匹配
四、请求-回传封装:sendCommand(核心)
下面给出一个“只依赖浏览器 WebSocket”的封装示例(与配套项目 09_rosbridge_web_control_panel/src/utils/sorting-arm-web-control-client.ts 对齐)。
import type { ArmCommand, ArmStatus, Role } from './sorting-arm-protocol'
type Pending = {
resolve: (status: ArmStatus) => void
reject: (err: Error) => void
timer: number
}
type RosbridgePublish = { op: 'publish'; topic: string; msg: { data: string } }
type RosbridgeSubscribe = { op: 'subscribe'; topic: string; type: 'std_msgs/msg/String'; id: string }
type RosbridgeIncoming = { op: 'publish'; topic: string; msg: { data?: unknown } }
export class SortingArmWebControlClient {
private ws: WebSocket | null = null
private readonly cmdTopic: string
private readonly statusTopic: string
private readonly pending: Map<string, Pending> = new Map()
constructor(options: { cmdTopic?: string; statusTopic?: string } = {}) {
this.cmdTopic = options.cmdTopic ?? '/sorting_arm/cmd'
this.statusTopic = options.statusTopic ?? '/sorting_arm/status'
}
connect(url: string): void {
if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) return
this.ws = new WebSocket(url)
this.ws.addEventListener('open', () => {
if (!this.ws) return
const sub: RosbridgeSubscribe = { op: 'subscribe', topic: this.statusTopic, type: 'std_msgs/msg/String', id: 'sub-sorting-arm-status' }
this.ws.send(JSON.stringify(sub))
})
this.ws.addEventListener('message', (ev) => {
const raw = typeof ev.data === 'string' ? ev.data : ''
if (!raw) return
let data: unknown
try {
data = JSON.parse(raw)
} catch {
return
}
if (!data || typeof data !== 'object') return
if (!('op' in data) || !('topic' in data) || !('msg' in data)) return
const d = data as RosbridgeIncoming
if (d.op !== 'publish' || d.topic !== this.statusTopic) return
const msgData = d.msg?.data
if (typeof msgData !== 'string') return
let status: unknown
try {
status = JSON.parse(msgData)
} catch {
return
}
if (!status || typeof status !== 'object') return
if (!('last_cmd_id' in status) || !('ok' in status) || !('code' in status) || !('message' in status)) return
const s = status as ArmStatus
const p = this.pending.get(s.last_cmd_id)
if (!p) return
window.clearTimeout(p.timer)
this.pending.delete(s.last_cmd_id)
p.resolve(s)
})
this.ws.addEventListener('close', () => {
for (const [cmdId, p] of this.pending.entries()) {
window.clearTimeout(p.timer)
p.reject(new Error(`connection closed, cmd_id=${cmdId}`))
}
this.pending.clear()
})
}
getSocket(): WebSocket | null {
return this.ws
}
disconnect(): void {
if (!this.ws) return
this.ws.close()
this.ws = null
}
publishCommand(cmd: ArmCommand): void {
const ws = this.ws
if (!ws || ws.readyState !== WebSocket.OPEN) throw new Error('WebSocket not open')
const payload = JSON.stringify(cmd)
const msg: RosbridgePublish = { op: 'publish', topic: this.cmdTopic, msg: { data: payload } }
ws.send(JSON.stringify(msg))
}
async sendCommand<TParams extends Record<string, unknown>>(input: {
user: string
role: Role
deviceId: string
action: ArmCommand['action']
params: TParams
safety?: ArmCommand['safety']
timeoutMs?: number
}): Promise<{ cmdId: string; status: ArmStatus }> {
const cmdId = makeCmdId()
const cmd: ArmCommand<TParams> = {
cmd_id: cmdId,
scene: 'sorting',
device_type: 'arm',
device_id: input.deviceId,
action: input.action,
params: input.params,
safety: input.safety ?? { require_enable: true, require_guard_closed: true },
meta: { user: input.user, role: input.role },
ts_ms: Date.now()
}
this.publishCommand(cmd)
const timeoutMs = input.timeoutMs ?? 3000
const status = await new Promise<ArmStatus>((resolve, reject) => {
const timer = window.setTimeout(() => {
this.pending.delete(cmdId)
reject(new Error(`status timeout, cmd_id=${cmdId}`))
}, timeoutMs)
this.pending.set(cmdId, { resolve, reject, timer })
})
return { cmdId, status }
}
}
export function makeCmdId(): string {
const ts = Date.now()
const rand = Math.random().toString(16).slice(2, 8)
const day = new Date(ts)
const y = String(day.getFullYear())
const m = String(day.getMonth() + 1).padStart(2, '0')
const d = String(day.getDate()).padStart(2, '0')
return `C-${y}${m}${d}-${rand}`
}
关键点解释:
-
pending: Map<cmd_id, Pending>:用 cmd_id 做 key,把“未来会回来的状态回执”暂存起来 -
sendCommand():先发 publish,再把 Promise 挂到 pending,等 status 到来时 resolve;超时则 reject -
message处理:只吃/sorting_arm/status,并且只处理msg.data是字符串且能解析成 status 的情况 -
close处理:连接断开时,把所有 pending 都 reject,避免页面一直 loading -
至少 3 个按钮:
回零、停止、急停(动作名可自定,但要写入ArmAction联合类型) -
viewer 角色看得到按钮但点不了,operator 能点控制类按钮
-
每次点击能看到:cmd_id、发送中状态、最终 status(成功/失败/超时)
你可以把下面模板直接发给 AI,让它输出“可粘贴进项目”的代码。
模板 1:生成 ArmCommand/ArmStatus 格式与类型
你是前端工程师。请为 Web → ROS2 的指令下发设计一个 JSON 消息格式(ArmCommand)和回传格式(ArmStatus)。
要求:复用“机械臂控制”骨架:cmd_id/scene/device_type/device_id/action/params/safety/ts_ms,回传包含 last_cmd_id/state/ok/code/message/detail/ts_ms,并给出 TypeScript 类型定义与 3 条示例消息。
约束:消息承载使用 std_msgs/msg/String 的 data 字段携带 JSON 字符串;action 使用字符串白名单联合类型;保证字段命名一致并能用于 cmd_id/last_cmd_id 匹配。
模板 2:生成 sendCommand 封装(带超时与回传匹配)
你是 TypeScript 工程师。请实现一个 SortingArmWebControlClient:
- connect(url):建立 WebSocket,并在 open 后 subscribe /sorting_arm/status
- sendCommand({user,role,deviceId,action,params,timeoutMs}):publish 到 /sorting_arm/cmd,并等待 status(通过 cmd_id/last_cmd_id 匹配),超时 reject
- disconnect():断开连接;close 时 reject 所有 pending
输出:完整 ts 代码(不依赖第三方库),并说明关键设计点。
模板 3:生成 ROS2 Python 执行节点(含权限校验)
你是 ROS2 Python 工程师。请编写 rclpy 节点:订阅 /sorting_arm/cmd (std_msgs/msg/String),解析 JSON 指令(含 cmd_id/action/device_id/meta.role),按 role 做动作白名单权限校验;执行通过则回传 /sorting_arm/status (std_msgs/msg/String);status 必须复用 last_cmd_id=cmd_id。
输出:可运行的 Python 文件,包含错误处理(JSON 解析失败/字段缺失/权限拒绝),并给出 ros2 topic echo 的验证步骤。
课后作业(提交要求)
1)实现 Web 与 ROS2 双向通信(订阅 + 发布)
2)截图数据接收与指令下发效果(至少两张:收到数据、下发成功或失败提示)
3)提供关键代码说明(至少包含:消息格式、sendCommand/订阅 status 的核心逻辑、权限控制点)
参考与延伸
- rosbridge_suite(协议与实现,通用参考):https://github.com/RobotWebTools/rosbridge_suite
- ROS2 话题基础(topic/echo/pub,通用参考):https://docs.ros.org/