15 实践课-rosbridge 配置与 ROS2 通信基础工具开发
rosbridge 配置与 ROS2 通信基础工具开发
关联:索引
- 场景提问:同样是“能连上 9090”,为什么有的配置只能用于演示,有的配置可以用于设备控制联调?
如果你尚未完成 rosbridge 部署,请先参考《机器人与智能系统综合实践》课程的 19 完成“安装 → 启动 → 端口监听 → 协议最小验证”。本讲默认该步骤已完成。
本讲只保留与设备控制安全直接相关的配置要点与自检命令清单。
最小可用(默认端口与地址由 launch 决定):
ros2 launch rosbridge_server rosbridge_websocket_launch.xml
设备控制联调推荐(显式指定端口 + 虚拟机环境对外监听,配合端口映射):
ros2 launch rosbridge_server rosbridge_websocket_launch.xml port:=9090 address:=0.0.0.0
逐行解释(只保留与本讲相关的关键点):
port:=9090:明确端口,避免“同学 A 默认 9090、同学 B 默认 9091”导致互相无法复现实验。address:=0.0.0.0:让 rosbridge 在虚拟机内监听所有网卡,宿主机才能通过 NAT/端口映射访问到虚拟机内的 9090。
2. 参数自检:避免“以为改了但其实没生效”
ros2 launch rosbridge_server rosbridge_websocket_launch.xml --show-args
解释:
- 该命令会列出 launch 支持的参数名与默认值;用于确认参数确实叫
port/address(不同版本可能存在差异)。
3. 端口占用检查(设备控制联调必做)
ss -lntp | grep 9090 || true
解释:
-
发现 9090 已被占用时:优先换端口(例如 9091),并把“启动命令 + 客户端 URL”同步修改,保证证据一致。
-
不以 root 身份运行 rosbridge(原则:设备控制入口不应带系统最高权限)。
-
在虚拟机 + 端口映射场景下,通常需要
address=0.0.0.0才能被宿主机访问;同时必须明确访问范围与审计证据,避免误暴露。 -
控制面与状态面分 Topic(
/sorting_arm/cmd与/sorting_arm/status),并要求状态回执可追踪(必须能关联cmd_id)。
课程思政融入点(口径统一):
- 设备通信能力是“高风险能力”。权限配置与暴露面控制不是形式要求,而是对人员安全与设备资产负责的工程底线。
本节课只保留“联调必用”的命令速查,不展开原理解释。
1. Topic 调试速查
ros2 topic list
ros2 topic info /sorting_arm/cmd
ros2 topic echo /sorting_arm/status
解释:
topic list:确认话题名是否存在(很多“收不到”其实是话题名拼错或节点未启动)。topic info:确认消息类型、发布者/订阅者情况。topic echo:确认状态是否在发布(是联调最直观的证据)。
2. Service 调试速查(如你的设备控制走 Service)
ros2 service list
ros2 service type /sorting_arm/home
ros2 service call /sorting_arm/home std_srvs/srv/Trigger "{}"
解释:
- Service 适合“一次性事务”(回零/急停确认/一次性查询);命令行先调通再接 rosbridge,排错更快。
1. 用 Python 验证 WebSocket 能连上
先安装依赖(在你的 Python 环境中执行;建议在本讲统一项目目录内安装):
cd 09_rosbridge_control_tool
python -m pip install -r requirements.txt
解释:
requirements.txt里固定了websocket-client版本,减少“同学 A 能跑、同学 B 不能跑”的差异。
最小连通性脚本(项目内文件):
文件:09_rosbridge_control_tool/scripts/ws_connect_check.py
from websocket import create_connection
WS_URL = "ws://localhost:19090"
ws = create_connection(WS_URL, timeout=5)
print("connected:", WS_URL)
ws.close()
逐段解释:
-
WS_URL:在“虚拟机端口映射”场景下,客户端应连接宿主机映射端口(例如ws://localhost:19090),而不是虚拟机内的9090。 -
create_connection(..., timeout=5):避免“卡住不返回”,利于排障。 -
本讲只做“握手连通性”自检;协议层最小验证(advertise/publish/subscribe)已在 19/20 中固化,本讲不重复。
-
终端 1:rosbridge 启动日志(含 port/address)。
-
终端 2:Python 脚本输出
connected: ...。
- 共建一套“分拣产线多设备控制指令格式”(统一骨架;机械臂为必做示例,可扩展到 AGV/传送带/工业相机)。
- 用 rosbridge 完成两件事:发控制指令(publish/call_service)与收状态(subscribe)。
- 用 AI 协同完成:生成配置脚本、定位 1 个通信问题、完成复验与记录。
0. 统一控制指令骨架(适用于机械臂/AGV/传送带/工业相机)
本的核心不是“某个设备的所有指令枚举”,而是统一一套控制面闭环:指令可解析、可校验、可追踪、可回执、可审计。为避免每加一类设备就重写一套协议,建议统一使用如下骨架:
{
"cmd_id": "C-20260412-0001",
"scene": "sorting",
"device_type": "arm",
"device_id": "arm_01",
"action": "home",
"params": {},
"safety": {
"require_enable": true,
"require_guard_closed": true
},
"meta": {
"user": "stu01",
"role": "operator"
},
"ts_ms": 1710000000000
}
字段解释(控制面闭环的“最小必备”):
cmd_id:必须唯一;用于日志追踪与状态回执关联(没有它就无法审计与复验)。device_type/device_id:把“控制对象”显式化,避免“同一条指令发错设备/错实例”;也方便智能体做路由与权限控制。action/params:动作与参数必须匹配;参数缺失或多余都应返回明确错误(可解释、可回归)。safety:把安全前置条件显式化(联锁/使能/区域限制/速度上限等),避免只写在口头约定里。meta:可选字段,用于携带 Web/智能体侧的用户与角色信息(用于权限与审计);设备侧可先忽略,后续再逐步纳入校验。ts_ms:客户端时间戳;用于排查“延迟/重放/乱序”。
建议每类设备先固化 3–5 个“最常用动作”,其余动作留到项目扩展或设备厂商文档中。
| 设备 | device_type | 最小动作(示例) | params 典型字段(示例) | 建议通信方式(原则) |
|---|---|---|---|---|
| 机械臂 | arm |
home / move_to / pick_place / stop / e_stop |
pose 或 from/to/speed |
控制指令 Topic + 状态 Topic;关键动作可补 Service 确认 |
| 传送带 | conveyor |
start / stop / set_speed / set_direction |
speed、direction |
简单控制可用 Topic;需要“必须成功”的可用 Service |
| 工业相机 | camera |
trigger_once / start_stream / stop_stream / set_exposure |
exposure_ms、gain、mode |
控制走 Service/Topic;图像数据不建议走 String+JSON |
说明(防止与 20/21 混淆):
- 20/21 偏“数据面”:订阅与展示;本讲偏“控制面”:指令下发与回执闭环。
- 工业相机的“控制指令”可以走本讲统一骨架,但“图像数据流”通常应走原生消息(如
sensor_msgs/msg/Image)或更合适的传输方案,避免用 String 携带大 JSON/二进制。
1. 代表示例:机械臂控制指令(arm)
{
"cmd_id": "C-20260412-0001",
"scene": "sorting",
"device_type": "arm",
"device_id": "arm_01",
"action": "pick_place",
"params": {
"from": "bin_in_3",
"to": "bin_out_glass",
"speed": 0.5
},
"safety": {
"require_enable": true,
"require_guard_closed": true
},
"ts_ms": 1710000000000
}
cmd_id:必须唯一;用于日志追踪与状态回执关联(没有它就无法审计)。scene/device_type/device_id:用于多设备与多场景扩展;避免“同一条指令发错设备”。action:动作枚举(建议:home/move_to/pick/place/pick_place/stop/e_stop)。params:动作参数,必须与action对应;字段缺失要返回可解释错误。safety:安全前置条件(示例:电机使能、护栏关闭);用于把“安全要求”显式化,而不是藏在口头约定里。ts_ms:客户端生成时间戳;便于排查“延迟/重放/乱序”。
2. 机械臂状态回执(arm)
{
"device_type": "arm",
"device_id": "arm_01",
"state": "idle",
"last_cmd_id": "C-20260412-0001",
"ok": true,
"code": "OK",
"message": "pick_place finished",
"detail": {
"progress": 1.0
},
"ts_ms": 1710000001234
}
字段解释:
state:设备状态(建议:idle/running/error/estop)。ok/code/message:错误可解释三件套;code用于归类,message用于人读。last_cmd_id:必须回传;用于证明“这条状态是回应哪条指令”。detail:可选扩展字段;允许不同设备携带各自的关键状态(例如 AGV 的电量/位姿,传送带的转速/过载,相机的曝光/最近触发时间)。
3. 扩展示例(选做):AGV/传送带/工业相机的最小控制指令
AGV:导航到目标点(agv.navigate_to):
{
"cmd_id": "C-20260412-0101",
"scene": "sorting",
"device_type": "agv",
"device_id": "agv_01",
"action": "navigate_to",
"params": { "goal": { "x": 1.2, "y": -0.8, "yaw": 1.57 }, "speed": 0.6 },
"safety": { "require_enable": true, "require_guard_closed": true },
"ts_ms": 1710000000000
}
传送带:设置速度并启动(conveyor.set_speed + conveyor.start):
{
"cmd_id": "C-20260412-0201",
"scene": "sorting",
"device_type": "conveyor",
"device_id": "conv_01",
"action": "set_speed",
"params": { "speed": 0.4, "direction": "forward" },
"safety": { "require_enable": true, "require_guard_closed": true },
"ts_ms": 1710000000000
}
工业相机:单次触发(camera.trigger_once):
{
"cmd_id": "C-20260412-0301",
"scene": "sorting",
"device_type": "camera",
"device_id": "cam_01",
"action": "trigger_once",
"params": { "exposure_ms": 8.0, "gain": 1.2 },
"safety": { "require_enable": true, "require_guard_closed": true },
"ts_ms": 1710000000000
}
rosbridge JSON 协议字段解释(op/topic/type/msg/id 等)已在 20 固化,本讲只保留“本次设备控制联调要对齐的三件事”:
- 控制指令 Topic:
/sorting_arm/cmd - 状态回执 Topic:
/sorting_arm/status
解释:
- 只要把“Topic 名 + 消息类型 + JSON 字段”三件事对齐,本讲的通信工具代码即可复用。
- 当你扩展到 AGV/传送带/相机时,建议为每类设备分配各自的控制与状态 Topic(例如
/agv/cmd、/agv/status),消息类型仍可先用std_msgs/msg/String承载统一骨架,保证“设备越多,协议越统一”。
本节课交付一个“rosbridge 客户端小工具”,具备两项能力:
- 指令发送:向
/sorting_arm/cmd发布 String(内含控制指令 JSON)。 - 状态接收:订阅
/sorting_arm/status并打印回执(能关联cmd_id)。
1. 工具依赖与运行方式
cd 09_rosbridge_control_tool
python -m pip install -r requirements.txt
解释:
2. 参考实现:一体化 demo(可直接运行)
文件:09_rosbridge_control_tool/rosbridge_client.py
import json
import time
from typing import Any, Dict
from websocket import WebSocketApp
WS_URL = "ws://localhost:19090"
TOPIC_CMD = "/sorting_arm/cmd"
TOPIC_STATUS = "/sorting_arm/status"
def build_arm_command(cmd_id: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
return {
"cmd_id": cmd_id,
"scene": "sorting",
"device_type": "arm",
"device_id": "arm_01",
"action": action,
"params": params,
"safety": {"require_enable": True, "require_guard_closed": True},
"ts_ms": int(time.time() * 1000),
}
def on_open(ws: WebSocketApp) -> None:
subscribe_msg = {"op": "subscribe", "topic": TOPIC_STATUS}
ws.send(json.dumps(subscribe_msg, ensure_ascii=False))
cmd_id = f"C-{time.strftime('%Y%m%d')}-0001"
arm_cmd = build_arm_command(cmd_id=cmd_id, action="home", params={})
publish_msg = {
"op": "publish",
"topic": TOPIC_CMD,
"msg": {"data": json.dumps(arm_cmd, ensure_ascii=False)},
}
ws.send(json.dumps(publish_msg, ensure_ascii=False))
print("sent cmd:", arm_cmd)
def on_message(ws: WebSocketApp, message: str) -> None:
try:
payload = json.loads(message)
except json.JSONDecodeError:
print("recv non-json:", message)
return
if payload.get("topic") != TOPIC_STATUS:
return
msg = payload.get("msg") or {}
data = msg.get("data", "")
try:
status = json.loads(data) if isinstance(data, str) else data
except json.JSONDecodeError:
print("status data not json:", data)
return
print("recv status:", status)
def on_error(ws: WebSocketApp, error: Exception) -> None:
print("ws error:", repr(error))
def on_close(ws: WebSocketApp, code: int, reason: str) -> None:
print("ws closed:", code, reason)
if __name__ == "__main__":
app = WebSocketApp(
WS_URL,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
app.run_forever(ping_interval=20, ping_timeout=5)
on_open:连接成功后先subscribe再publish,避免“状态回得很快但你还没订阅到”。publish_msg.msg.data:把应用层指令 JSON 作为字符串放入 String 消息字段data(最小可用方案)。on_message:只处理topic == /sorting_arm/status的消息;并把data解析回 JSON,便于打印与后续断言。run_forever(ping_interval=20):维持连接心跳,减少“中途断线不自知”。
当现场没有真实机械臂时,用这个 ROS2 节点模拟“收到指令 → 回一条状态”。
源码文件:09_rosbridge_control_tool/ros2_ws/src/sorting_arm_mock/sorting_arm_mock/arm_mock_node.py
构建与运行(在 ROS2 环境终端执行):
cd 09_rosbridge_control_tool/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_mock arm_mock
解释:
colcon build --symlink-install:构建 ROS2 工作空间,便于修改源码后快速生效。source install/setup.bash:让当前终端识别新安装的包与可执行入口。ros2 run sorting_arm_mock arm_mock:运行模拟节点(由setup.py的console_scripts注册)。
import json
import time
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class ArmMock(Node):
def __init__(self) -> None:
super().__init__("arm_mock")
self.sub = self.create_subscription(String, "/sorting_arm/cmd", self.on_cmd, 10)
self.pub = self.create_publisher(String, "/sorting_arm/status", 10)
def on_cmd(self, msg: String) -> None:
try:
cmd = json.loads(msg.data)
except Exception:
cmd = {"cmd_id": "UNKNOWN"}
status = {
"device_type": "arm",
"device_id": "arm_01",
"state": "idle",
"last_cmd_id": cmd.get("cmd_id", "UNKNOWN"),
"ok": True,
"code": "OK",
"message": "mock ack",
"ts_ms": int(time.time() * 1000),
}
out = String()
out.data = json.dumps(status, ensure_ascii=False)
self.pub.publish(out)
def main() -> None:
rclpy.init()
node = ArmMock()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
逐段解释:
/sorting_arm/cmd:订阅控制指令;msg.data里是 JSON 字符串。/sorting_arm/status:发布状态回执;把last_cmd_id回传用于关联。
本节课要求“AI 协同,但必须可复验、可审计”。建议把以下模板复制到你的 AI 对话里使用。
1. 让 AI 生成 rosbridge 配置脚本(带注释)
可直接粘贴的指令模板:
你是 ROS2 工程助教。请为我生成一个可在 Ubuntu 上运行的 bash 脚本,用于安装并启动 rosbridge_server(WebSocket)。
要求:
1) 支持参数:ROS_DISTRO、PORT、ADDRESS(默认 ADDRESS=127.0.0.1)。
2) 启动前做自检:ros2 是否可用、包是否安装、端口是否被占用。
3) 输出清晰日志:每一步做什么、失败时怎么处理。
4) 生成后请给出验证步骤(如何用 Python websocket-client 连通)。
请把脚本完整输出,并解释关键行。
复验要点(你必须自己检查):
- 脚本是否真的使用了你设置的
PORT/ADDRESS。 - 自检是否覆盖:ROS2 环境、包安装、端口占用。
- 验证步骤是否与脚本配置一致(同一个 port/address)。
2. 让 AI 帮你定位“连不上/收不到/类型不匹配”
可直接粘贴的指令模板:
我在做 rosbridge↔ROS2 通信联调。现象如下:
1) rosbridge 启动命令与参数:
2) rosbridge 日志(粘贴 30 行):
3) 客户端连接方式(URL/代码片段):
4) ROS2 端话题/服务信息:
- ros2 topic list 输出:
- ros2 topic info /sorting_arm/cmd 输出:
- ros2 topic echo /sorting_arm/status 的现象:
5) 期望行为:
请你按“三层定位法”给出排查步骤:网络层(端口/地址/防火墙)→ rosbridge 层(启动参数/日志/协议消息)→ ROS2 层(话题名/类型/QoS)。
每一步必须包含:要执行的命令/要看的输出/如何判定下一步。
- 你提交的 AI 调试记录必须包含:原始现象、AI 给的步骤、你执行后的关键输出、最后的修复点与复验截图。
- rosbridge 正确启动(截图含 port/address)。
- Python 工具成功连接并发送 1 条
home指令(输出含 cmd_id)。 - ROS2 端(真实设备或 mock)发布状态回执,Python 工具能收到并打印(输出含 last_cmd_id 对应 cmd_id)。
常见错误(要求能解释):
- URL/端口不一致:客户端连的是 9090,但 rosbridge 实际起在 9091。
- 绑定地址错误:rosbridge 只绑定 127.0.0.1,但你从另一台机器访问。
- 话题名不一致:
/sorting_arm/status与/sorting_arm/state混用导致订阅不到。 - 消息字段不匹配:String 必须用
data字段;字段写错会解析失败。
七、回看资料(不重复,来自《机器人与智能系统综合实践》课程)
- 说明:以下 19/20/21 为《机器人与智能系统综合实践》课程,与本讲《人工智能综合实践》不是同一门课;本讲仅引用其“通用 rosbridge 基础与 Web 端工程”部分作为回看资料。
- 19 rosbridge 原理与环境部署
- 20 Web 端连接 ROS2 与话题订阅
- 21 ROS2 数据 Web 端实时展示
作业(布置)
- 提交 rosbridge 配置截图(含验证连通性的日志)及配置脚本。
- 提交 ROS2 通信工具代码及通信测试截图(指令发送 / 状态接收)。
- 提交 AI 调试通信问题的交互记录,附问题解决过程说明。
- 标题层级连续(# → ## → ###),无跳级;代码块均已闭合且语言标签正确。
- 所有命令均有“逐行解释”,且命令中的端口/address 与示例脚本一致。
- Python 示例无语法错误:
json.dumps/loads配对正确;WebSocketApp回调签名正确;ROS2 mock 节点能rclpy.spin。 - 指令协议字段可落地:
cmd_id/last_cmd_id可关联;safety字段不为空;状态回执含ok/code/message。