18 工业数据实时推送业务开发

工业数据实时推送业务开发

关联:索引

要解决的问题

与上节课的衔接(非常重要)

本讲不另起一套协议,而是把上节课项目 05_ws_industrial_push/ 的“协议与页面套路”提炼出来,独立成一个本讲专用项目,便于与上节课区分:

章节内容(本讲核心)

项目工坊(本讲落地项目)

作业

一、统一消息协议(沿用上一讲协议,在本讲独立项目中落地)

本讲协议不再使用 client.subscribe,统一用“房间(room)订阅”:

1) Message 结构(前后端统一)

对应项目文件:06_ws_industrial_push/client/06_ws_industrial_push/src/utils/ws.ts

export type MsgType =
  | 'chat'
  | 'ping'
  | 'pong'
  | 'system'
  | 'error'
  | 'tick'
  | 'join'
  | 'leave'
  | 'device.status'
  | 'sorting.data'
  | 'stats.summary'

export type Message<T = unknown> = {
  type: MsgType
  id: string
  ts: number
  payload: T
  meta?: { from?: 'client' | 'server'; traceId?: string }
}

逐段说明:

2) join/leave 的 payload(订阅/退订)

export type JoinPayload = { room: 'device.status' | 'sorting.data' | 'stats.summary'; role?: 'web' | 'ros2' }
export type LeavePayload = { room: 'device.status' | 'sorting.data' | 'stats.summary' }

要点:

3) 三类业务 payload(本讲最小可用)

export type DeviceStatusPayload = {
  deviceId: string
  online: boolean
  state: 'IDLE' | 'RUNNING' | 'ALARM' | 'MAINTENANCE'
  updatedAt: number
}

export type SortingDataPayload = {
  lineId: string
  batchId: string
  okCount: number
  ngCount: number
  updatedAt: number
}

export type StatsSummaryPayload = {
  windowSec: number
  throughput: number
  passRate: number
  updatedAt: number
}

二、模块边界(沿用项目结构,不新增 realtime/)

对应项目结构(本讲独立项目前端):

src/
  utils/ws.ts                 # 协议与工具:MsgType/Message/buildMessage/mapReadyState
  composables/useWebSocket.ts # 连接封装:connect/disconnect/send + safeParse
  components/DeviceMonitor.vue# 业务页面:join + 分发 + 响应式更新

规则:

三、数据源替换:mock -> ROS2 的“最小迁移”

只要做到两点,页面就能无感切换数据源:

  1. 协议不变:仍然发送 type/id/ts/payload/meta
  2. topic 不变:仍然按 device.status / sorting.data / stats.summary 广播

迁移时的变化只有“谁发送业务消息”:

四、ROS2 链接 Web 的两种方式(工程选型,结合“ROS2 在虚拟机”的真实约束)

在工业现场更常见的是:ROS2 节点运行在工控机/虚拟机/内网环境里,而 Web 页面运行在开发机浏览器里。此时建议先做清楚“网络拓扑”,再选方案。

方式 A(推荐):ROS2 当 WebSocket 客户端,连到网关,由网关转发给浏览器

ROS2(虚拟机/内网) --出向连接 ws--> FastAPI 网关(可被浏览器访问) --广播--> Web(浏览器)

优势:

代价:

方式 B(不推荐为默认):ROS2 作为 WebSocket 服务端,浏览器直接连接 ROS2

Web(浏览器) --ws/wss--> ROS2(虚拟机里跑 WebSocket server)

风险点(在“ROS2 在虚拟机”时最常见):

五、本课小结与常见错误

一、页面交付(DeviceMonitor.vue)

对应项目文件:06_ws_industrial_push/client/06_ws_industrial_push/src/components/DeviceMonitor.vue

核心结构(示例与项目一致):

<script setup lang="ts">
import { computed, onMounted, ref, watch } from 'vue'
import { useWebSocket } from '../composables/useWebSocket'
import type { Message } from '../utils/ws'

type DeviceStatusPayload = {
  deviceId: string
  online: boolean
  state: 'IDLE' | 'RUNNING' | 'ALARM' | 'MAINTENANCE'
  updatedAt: number
}

type SortingDataPayload = {
  lineId: string
  batchId: string
  okCount: number
  ngCount: number
  updatedAt: number
}

type StatsSummaryPayload = {
  windowSec: number
  throughput: number
  passRate: number
  updatedAt: number
}

const { status, lastError, connect, send } = useWebSocket()

const devices = ref<Record<string, DeviceStatusPayload>>({})
const stats = ref<StatsSummaryPayload>({ windowSec: 60, throughput: 0, passRate: 0, updatedAt: 0 })
const sortingList = ref<SortingDataPayload[]>([])

const deviceList = computed(() => Object.values(devices.value).sort((a, b) => a.deviceId.localeCompare(b.deviceId)))

function handleMessage(msg: Message) {
  if (msg.type === 'device.status') {
    const p = msg.payload as DeviceStatusPayload
    devices.value[p.deviceId] = p
    return
  }
  if (msg.type === 'stats.summary') {
    stats.value = msg.payload as StatsSummaryPayload
    return
  }
  if (msg.type === 'sorting.data') {
    const p = msg.payload as SortingDataPayload
    sortingList.value.unshift(p)
    sortingList.value = sortingList.value.slice(0, 20)
  }
}

const joined = ref(false)
watch(status, (s) => {
  if (s !== 'OPEN') return
  if (joined.value) return
  send('join', { room: 'device.status', role: 'web' })
  send('join', { room: 'stats.summary', role: 'web' })
  send('join', { room: 'sorting.data', role: 'web' })
  joined.value = true
})

onMounted(() => connect('ws://localhost:8000/ws', handleMessage))
</script>

二、网关升级:允许 ROS2 注入业务消息

对应项目文件:06_ws_industrial_push/server/app.py

约定:

最小行为(概念图):

ROS2(生产数据) --WebSocket--> 网关(/ws) --广播到 room--> Web(渲染页面)

三、数据源注入演示(可选:只验证“网关注入接口”,不讲 ROS2 订阅细节)

检查点:

测试点 操作 预期结果
Web 连接成功 启动服务端后打开页面 状态从 CONNECTING 变为 OPEN
join 生效 页面 OPEN 后发送 join 开始持续收到三类业务消息
设备增量更新 连续推送同一 deviceId 的 device.status 表格对应行更新,行数不变
分拣列表限长 连续推送 30 条 sorting.data 页面只保留最近 20 条
数据源注入(可选) 注入端发送业务消息 Web 页面更新(无需改前端)
异常提示 服务端停掉后刷新页面 状态为 CLOSED,错误提示可见

五、大模型任务(指令模板 + 校验点)

请基于本讲独立项目 06_ws_industrial_push 做升级,要求:
1) 协议沿用 Message(type/id/ts/payload/meta)与 join/leave 房间机制(room=业务 type)
2) 前端:DeviceMonitor.vue 能接收 device.status / sorting.data / stats.summary 并实时更新;设备按 deviceId 增量更新;分拣列表只保留最近 20 条;显示连接状态与错误
3) 服务端:允许 role=ros2 的连接发送上述三类业务消息,网关收到后广播到同名 room
4) 输出时注明需要修改的文件路径:06_ws_industrial_push/client/06_ws_industrial_push/src/utils/ws.ts、06_ws_industrial_push/client/06_ws_industrial_push/src/composables/useWebSocket.ts、06_ws_industrial_push/client/06_ws_industrial_push/src/components/DeviceMonitor.vue、06_ws_industrial_push/server/app.py
5) 额外输出一个 Python 脚本骨架:模拟 role=ros2 数据源 -> WebSocket 网关推送(不涉及 rosbridge)
校验点:type 字符串完全一致(device.status/sorting.data/stats.summary/join/leave),payload 字段一致,Web 页面无需改动即可切换数据源

参考与延伸

附录 A:快速注入脚本(模拟 role=ros2 数据源)

用途:验证网关“允许外部数据源注入并广播”的能力,先不引入 ROS2/rosbridge 环境复杂度。

依赖:

pip install websockets
import asyncio
import contextlib
import json
import random
import time

import websockets

def now_ms() -> int:
    return int(time.time() * 1000)

def build_message(msg_type: str, payload: dict) -> dict:
    return {
        "type": msg_type,
        "id": f"inject-{now_ms()}-{random.randint(1000, 9999)}",
        "ts": now_ms(),
        "payload": payload,
        "meta": {"from": "client", "traceId": "inject"},
    }

async def drain_messages(ws) -> None:
    async for _ in ws:
        pass

async def main():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        drain_task = asyncio.create_task(drain_messages(ws))
        try:
            await ws.send(json.dumps(build_message("join", {"room": "device.status", "role": "ros2"}), ensure_ascii=False))
            await ws.send(json.dumps(build_message("join", {"room": "stats.summary", "role": "ros2"}), ensure_ascii=False))
            await ws.send(json.dumps(build_message("join", {"room": "sorting.data", "role": "ros2"}), ensure_ascii=False))

            while True:
                await ws.send(
                    json.dumps(
                        build_message(
                            "device.status",
                            {
                                "deviceId": "D-01",
                                "online": True,
                                "state": random.choice(["IDLE", "RUNNING", "ALARM", "MAINTENANCE"]),
                                "updatedAt": now_ms(),
                            },
                        ),
                        ensure_ascii=False,
                    )
                )
                await ws.send(
                    json.dumps(
                        build_message(
                            "stats.summary",
                            {
                                "windowSec": 60,
                                "throughput": round(random.uniform(20, 60), 2),
                                "passRate": 0.98,
                                "updatedAt": now_ms(),
                            },
                        ),
                        ensure_ascii=False,
                    )
                )
                await ws.send(
                    json.dumps(
                        build_message(
                            "sorting.data",
                            {
                                "lineId": "L-02",
                                "batchId": f"B-{random.randint(100, 999)}",
                                "okCount": random.randint(0, 50),
                                "ngCount": random.randint(0, 5),
                                "updatedAt": now_ms(),
                            },
                        ),
                        ensure_ascii=False,
                    )
                )
                await asyncio.sleep(1)
        finally:
            drain_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await drain_task

asyncio.run(main())

附录 A2(可选):用 ROS2 功能包 + 节点模拟 role=ros2 数据源

依赖:

pip install websockets

创建包(ament_python):

cd <your_ros2_ws>/src
ros2 pkg create --build-type ament_python industrial_ws_injector --dependencies rclpy

节点代码(示例文件:industrial_ws_injector/industrial_ws_injector/ws_injector_node.py):

import asyncio
import json
import random
import threading

import rclpy
from rclpy.node import Node
import websockets

def now_ms(node: Node) -> int:
    return int(node.get_clock().now().nanoseconds / 1_000_000)

def build_message(node: Node, msg_type: str, payload: dict) -> dict:
    ts = now_ms(node)
    return {
        "type": msg_type,
        "id": f"inject-{ts}-{random.randint(1000, 9999)}",
        "ts": ts,
        "payload": payload,
        "meta": {"from": "client", "traceId": "inject"},
    }

class WsInjector(Node):
    def __init__(self) -> None:
        super().__init__("ws_injector")
        self.declare_parameter("url", "ws://localhost:8000/ws")
        self.declare_parameter("period_sec", 1.0)

        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()
        self._loop.call_soon_threadsafe(lambda: asyncio.create_task(self._run()))

    async def _run(self) -> None:
        url = self.get_parameter("url").get_parameter_value().string_value
        period = self.get_parameter("period_sec").get_parameter_value().double_value

        async with websockets.connect(url) as ws:
            await ws.send(json.dumps(build_message(self, "join", {"room": "device.status", "role": "ros2"}), ensure_ascii=False))
            await ws.send(json.dumps(build_message(self, "join", {"room": "stats.summary", "role": "ros2"}), ensure_ascii=False))
            await ws.send(json.dumps(build_message(self, "join", {"room": "sorting.data", "role": "ros2"}), ensure_ascii=False))

            while rclpy.ok():
                await ws.send(
                    json.dumps(
                        build_message(
                            self,
                            "device.status",
                            {
                                "deviceId": "D-01",
                                "online": True,
                                "state": random.choice(["IDLE", "RUNNING", "ALARM", "MAINTENANCE"]),
                                "updatedAt": now_ms(self),
                            },
                        ),
                        ensure_ascii=False,
                    )
                )
                await ws.send(
                    json.dumps(
                        build_message(
                            self,
                            "stats.summary",
                            {"windowSec": 60, "throughput": round(random.uniform(20, 60), 2), "passRate": 0.98, "updatedAt": now_ms(self)},
                        ),
                        ensure_ascii=False,
                    )
                )
                await ws.send(
                    json.dumps(
                        build_message(
                            self,
                            "sorting.data",
                            {
                                "lineId": "L-02",
                                "batchId": f"B-{random.randint(100, 999)}",
                                "okCount": random.randint(0, 50),
                                "ngCount": random.randint(0, 5),
                                "updatedAt": now_ms(self),
                            },
                        ),
                        ensure_ascii=False,
                    )
                )
                await asyncio.sleep(period)

    def destroy_node(self):
        if self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
        super().destroy_node()

def main():
    rclpy.init()
    node = WsInjector()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == "__main__":
    main()

运行方式(示例):

cd <your_ros2_ws>
colcon build
source install/setup.bash
ros2 run industrial_ws_injector ws_injector_node --ros-args -p url:=ws://localhost:8000/ws -p period_sec:=1.0

附录 B:ROS2/rosbridge 的正式路径(只给链路图与映射点)

Web(roslibjs) --WebSocket--> rosbridge_server --DDS--> ROS2 nodes

附录 C:什么时候才考虑“浏览器直连 ROS2 WebSocket 服务端”

只在满足以下条件时再考虑方式 B: