15 实践课-rosbridge 配置与 ROS2 通信基础工具开发

rosbridge 配置与 ROS2 通信基础工具开发

关联:索引

  1. 场景提问:同样是“能连上 9090”,为什么有的配置只能用于演示,有的配置可以用于设备控制联调?

如果你尚未完成 rosbridge 部署,请先参考《机器人与智能系统综合实践》课程的 19 完成“安装 → 启动 → 端口监听 → 协议最小验证”。本讲默认该步骤已完成。

本讲只保留与设备控制安全直接相关的配置要点与自检命令清单。

最小可用(默认端口与地址由 launch 决定):

ros2 launch rosbridge_server rosbridge_websocket_launch.xml

设备控制联调推荐(显式指定端口 + 虚拟机环境对外监听,配合端口映射):

ros2 launch rosbridge_server rosbridge_websocket_launch.xml port:=9090 address:=0.0.0.0

逐行解释(只保留与本讲相关的关键点):

2. 参数自检:避免“以为改了但其实没生效”

ros2 launch rosbridge_server rosbridge_websocket_launch.xml --show-args

解释:

3. 端口占用检查(设备控制联调必做)

ss -lntp | grep 9090 || true

解释:

课程思政融入点(口径统一):

本节课只保留“联调必用”的命令速查,不展开原理解释。

1. Topic 调试速查

ros2 topic list
ros2 topic info /sorting_arm/cmd
ros2 topic echo /sorting_arm/status

解释:

2. Service 调试速查(如你的设备控制走 Service)

ros2 service list
ros2 service type /sorting_arm/home
ros2 service call /sorting_arm/home std_srvs/srv/Trigger "{}"

解释:

1. 用 Python 验证 WebSocket 能连上

先安装依赖(在你的 Python 环境中执行;建议在本讲统一项目目录内安装):

cd 09_rosbridge_control_tool
python -m pip install -r requirements.txt

解释:

最小连通性脚本(项目内文件):

文件:09_rosbridge_control_tool/scripts/ws_connect_check.py

from websocket import create_connection

WS_URL = "ws://localhost:19090"

ws = create_connection(WS_URL, timeout=5)
print("connected:", WS_URL)
ws.close()

逐段解释:


  1. 共建一套“分拣产线多设备控制指令格式”(统一骨架;机械臂为必做示例,可扩展到 AGV/传送带/工业相机)。
  2. 用 rosbridge 完成两件事:发控制指令(publish/call_service)与收状态(subscribe)。
  3. 用 AI 协同完成:生成配置脚本、定位 1 个通信问题、完成复验与记录。

0. 统一控制指令骨架(适用于机械臂/AGV/传送带/工业相机)

本的核心不是“某个设备的所有指令枚举”,而是统一一套控制面闭环:指令可解析、可校验、可追踪、可回执、可审计。为避免每加一类设备就重写一套协议,建议统一使用如下骨架:

{
  "cmd_id": "C-20260412-0001",
  "scene": "sorting",
  "device_type": "arm",
  "device_id": "arm_01",
  "action": "home",
  "params": {},
  "safety": {
    "require_enable": true,
    "require_guard_closed": true
  },
  "meta": {
    "user": "stu01",
    "role": "operator"
  },
  "ts_ms": 1710000000000
}

字段解释(控制面闭环的“最小必备”):

建议每类设备先固化 3–5 个“最常用动作”,其余动作留到项目扩展或设备厂商文档中。

设备 device_type 最小动作(示例) params 典型字段(示例) 建议通信方式(原则)
机械臂 arm home / move_to / pick_place / stop / e_stop posefrom/to/speed 控制指令 Topic + 状态 Topic;关键动作可补 Service 确认
传送带 conveyor start / stop / set_speed / set_direction speeddirection 简单控制可用 Topic;需要“必须成功”的可用 Service
工业相机 camera trigger_once / start_stream / stop_stream / set_exposure exposure_msgainmode 控制走 Service/Topic;图像数据不建议走 String+JSON

说明(防止与 20/21 混淆):

1. 代表示例:机械臂控制指令(arm)

{
  "cmd_id": "C-20260412-0001",
  "scene": "sorting",
  "device_type": "arm",
  "device_id": "arm_01",
  "action": "pick_place",
  "params": {
    "from": "bin_in_3",
    "to": "bin_out_glass",
    "speed": 0.5
  },
  "safety": {
    "require_enable": true,
    "require_guard_closed": true
  },
  "ts_ms": 1710000000000
}

2. 机械臂状态回执(arm)

{
  "device_type": "arm",
  "device_id": "arm_01",
  "state": "idle",
  "last_cmd_id": "C-20260412-0001",
  "ok": true,
  "code": "OK",
  "message": "pick_place finished",
  "detail": {
    "progress": 1.0
  },
  "ts_ms": 1710000001234
}

字段解释:

3. 扩展示例(选做):AGV/传送带/工业相机的最小控制指令

AGV:导航到目标点(agv.navigate_to):

{
  "cmd_id": "C-20260412-0101",
  "scene": "sorting",
  "device_type": "agv",
  "device_id": "agv_01",
  "action": "navigate_to",
  "params": { "goal": { "x": 1.2, "y": -0.8, "yaw": 1.57 }, "speed": 0.6 },
  "safety": { "require_enable": true, "require_guard_closed": true },
  "ts_ms": 1710000000000
}

传送带:设置速度并启动(conveyor.set_speed + conveyor.start):

{
  "cmd_id": "C-20260412-0201",
  "scene": "sorting",
  "device_type": "conveyor",
  "device_id": "conv_01",
  "action": "set_speed",
  "params": { "speed": 0.4, "direction": "forward" },
  "safety": { "require_enable": true, "require_guard_closed": true },
  "ts_ms": 1710000000000
}

工业相机:单次触发(camera.trigger_once):

{
  "cmd_id": "C-20260412-0301",
  "scene": "sorting",
  "device_type": "camera",
  "device_id": "cam_01",
  "action": "trigger_once",
  "params": { "exposure_ms": 8.0, "gain": 1.2 },
  "safety": { "require_enable": true, "require_guard_closed": true },
  "ts_ms": 1710000000000
}

rosbridge JSON 协议字段解释(op/topic/type/msg/id 等)已在 20 固化,本讲只保留“本次设备控制联调要对齐的三件事”:

  1. 控制指令 Topic:/sorting_arm/cmd
  2. 状态回执 Topic:/sorting_arm/status

解释:

本节课交付一个“rosbridge 客户端小工具”,具备两项能力:

1. 工具依赖与运行方式

cd 09_rosbridge_control_tool
python -m pip install -r requirements.txt

解释:

2. 参考实现:一体化 demo(可直接运行)

文件:09_rosbridge_control_tool/rosbridge_client.py

import json
import time
from typing import Any, Dict

from websocket import WebSocketApp

WS_URL = "ws://localhost:19090"
TOPIC_CMD = "/sorting_arm/cmd"
TOPIC_STATUS = "/sorting_arm/status"

def build_arm_command(cmd_id: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "cmd_id": cmd_id,
        "scene": "sorting",
        "device_type": "arm",
        "device_id": "arm_01",
        "action": action,
        "params": params,
        "safety": {"require_enable": True, "require_guard_closed": True},
        "ts_ms": int(time.time() * 1000),
    }

def on_open(ws: WebSocketApp) -> None:
    subscribe_msg = {"op": "subscribe", "topic": TOPIC_STATUS}
    ws.send(json.dumps(subscribe_msg, ensure_ascii=False))

    cmd_id = f"C-{time.strftime('%Y%m%d')}-0001"
    arm_cmd = build_arm_command(cmd_id=cmd_id, action="home", params={})
    publish_msg = {
        "op": "publish",
        "topic": TOPIC_CMD,
        "msg": {"data": json.dumps(arm_cmd, ensure_ascii=False)},
    }
    ws.send(json.dumps(publish_msg, ensure_ascii=False))
    print("sent cmd:", arm_cmd)

def on_message(ws: WebSocketApp, message: str) -> None:
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        print("recv non-json:", message)
        return

    if payload.get("topic") != TOPIC_STATUS:
        return

    msg = payload.get("msg") or {}
    data = msg.get("data", "")
    try:
        status = json.loads(data) if isinstance(data, str) else data
    except json.JSONDecodeError:
        print("status data not json:", data)
        return
    print("recv status:", status)

def on_error(ws: WebSocketApp, error: Exception) -> None:
    print("ws error:", repr(error))

def on_close(ws: WebSocketApp, code: int, reason: str) -> None:
    print("ws closed:", code, reason)

if __name__ == "__main__":
    app = WebSocketApp(
        WS_URL,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever(ping_interval=20, ping_timeout=5)

当现场没有真实机械臂时,用这个 ROS2 节点模拟“收到指令 → 回一条状态”。

源码文件:09_rosbridge_control_tool/ros2_ws/src/sorting_arm_mock/sorting_arm_mock/arm_mock_node.py

构建与运行(在 ROS2 环境终端执行):

cd 09_rosbridge_control_tool/ros2_ws
colcon build --symlink-install
source install/setup.bash
ros2 run sorting_arm_mock arm_mock

解释:

import json
import time

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class ArmMock(Node):
    def __init__(self) -> None:
        super().__init__("arm_mock")
        self.sub = self.create_subscription(String, "/sorting_arm/cmd", self.on_cmd, 10)
        self.pub = self.create_publisher(String, "/sorting_arm/status", 10)

    def on_cmd(self, msg: String) -> None:
        try:
            cmd = json.loads(msg.data)
        except Exception:
            cmd = {"cmd_id": "UNKNOWN"}
        status = {
            "device_type": "arm",
            "device_id": "arm_01",
            "state": "idle",
            "last_cmd_id": cmd.get("cmd_id", "UNKNOWN"),
            "ok": True,
            "code": "OK",
            "message": "mock ack",
            "ts_ms": int(time.time() * 1000),
        }
        out = String()
        out.data = json.dumps(status, ensure_ascii=False)
        self.pub.publish(out)

def main() -> None:
    rclpy.init()
    node = ArmMock()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

逐段解释:

本节课要求“AI 协同,但必须可复验、可审计”。建议把以下模板复制到你的 AI 对话里使用。

1. 让 AI 生成 rosbridge 配置脚本(带注释)

可直接粘贴的指令模板:

你是 ROS2 工程助教。请为我生成一个可在 Ubuntu 上运行的 bash 脚本,用于安装并启动 rosbridge_server(WebSocket)。
要求:
1) 支持参数:ROS_DISTRO、PORT、ADDRESS(默认 ADDRESS=127.0.0.1)。
2) 启动前做自检:ros2 是否可用、包是否安装、端口是否被占用。
3) 输出清晰日志:每一步做什么、失败时怎么处理。
4) 生成后请给出验证步骤(如何用 Python websocket-client 连通)。
请把脚本完整输出,并解释关键行。

复验要点(你必须自己检查):

2. 让 AI 帮你定位“连不上/收不到/类型不匹配”

可直接粘贴的指令模板:

我在做 rosbridge↔ROS2 通信联调。现象如下:
1) rosbridge 启动命令与参数:
2) rosbridge 日志(粘贴 30 行):
3) 客户端连接方式(URL/代码片段):
4) ROS2 端话题/服务信息:
   - ros2 topic list 输出:
   - ros2 topic info /sorting_arm/cmd 输出:
   - ros2 topic echo /sorting_arm/status 的现象:
5) 期望行为:
请你按“三层定位法”给出排查步骤:网络层(端口/地址/防火墙)→ rosbridge 层(启动参数/日志/协议消息)→ ROS2 层(话题名/类型/QoS)。
每一步必须包含:要执行的命令/要看的输出/如何判定下一步。
  1. rosbridge 正确启动(截图含 port/address)。
  2. Python 工具成功连接并发送 1 条 home 指令(输出含 cmd_id)。
  3. ROS2 端(真实设备或 mock)发布状态回执,Python 工具能收到并打印(输出含 last_cmd_id 对应 cmd_id)。

常见错误(要求能解释):

七、回看资料(不重复,来自《机器人与智能系统综合实践》课程)


作业(布置)

  1. 提交 rosbridge 配置截图(含验证连通性的日志)及配置脚本。
  2. 提交 ROS2 通信工具代码及通信测试截图(指令发送 / 状态接收)。
  3. 提交 AI 调试通信问题的交互记录,附问题解决过程说明。