18 工业数据实时推送业务开发
工业数据实时推送业务开发
关联:索引
要解决的问题
- WebSocket 已经“会连会收发”了,为什么一进入工业实时业务(设备状态/分拣数据/统计数据)就容易变得混乱、难维护
- 同一条连接如何承载多种业务数据:如何用
type + payload做清晰扩展,而不是堆字符串 - 业务推送如何做到“可替换数据源”:先用 mock 跑通页面,再把数据源切到 ROS2,而不是推倒重来
- 前端列表/卡片为什么会“越更越卡”:实时渲染的最小性能策略是什么(按 key 更新、列表限长)
- 如何把“能跑 Demo”提升到“能交付页面”:状态展示、错误提示、数据更新时间、最小测试用例
与上节课的衔接(非常重要)
本讲不另起一套协议,而是把上节课项目 05_ws_industrial_push/ 的“协议与页面套路”提炼出来,独立成一个本讲专用项目,便于与上节课区分:
- 上节课:
17 WebSocket 网关 + 工业实时推送(路线A融合)... - 上节课代码参考:
05_ws_industrial_push/ - 本讲独立项目:
06_ws_industrial_push/ - 本讲要做的升级:让 ROS2 端也能连到网关,把 ROS2 数据推到 Web 页面(数据源从 mock -> ros2)
章节内容(本讲核心)
- 工业实时推送业务拆解:设备状态 / 分拣数据 / 统计数据三类消息边界
- 统一协议(沿用上节课 Message):
type/id/ts/payload/meta+join/leave房间订阅 - 模块边界(沿用项目结构):
utils/ws.ts(协议)/useWebSocket.ts(连接)/DeviceMonitor.vue(业务渲染) - ROS2 接入策略:优先采用“ROS2 -> 网关(FastAPI WebSocket)-> 浏览器”的转发模式(尤其当 ROS2 在虚拟机/隔离网络中)
- 页面实时更新最小策略:设备按
deviceId增量更新;分拣列表限长;状态与错误可视化
项目工坊(本讲落地项目)
-
交付“设备状态实时监控”页面:统计卡片 + 设备状态表格 + 分拣数据列表(最近 20 条)
-
完成 ROS2 端接入:ROS2 -> WebSocket 网关 -> Web 页面实时更新
-
任务 1:页面能实时更新三块数据,并展示连接状态与错误
-
任务 2:服务端网关支持 ROS2 数据注入(ROS2 连接后可推送三类业务消息)
-
任务 3:写出协议校验策略:至少 2 条规则(例如:必须有
ts;device.status必须有deviceId) -
生成:业务 payload 类型、消息分发骨架、ROS2 -> 网关桥接脚手架(Python)
-
人工校验:type 字符串一致、payload 字段一致、订阅/推送链路可跑通
作业
一、统一消息协议(沿用上一讲协议,在本讲独立项目中落地)
本讲协议不再使用 client.subscribe,统一用“房间(room)订阅”:
- Web 端:连接后发送
join加入房间(房间名就是业务type) - ROS2 端:作为数据源,也连接网关并
join,然后发送三类业务消息 - 网关:按房间广播,Web 页面只需要订阅并渲染
1) Message 结构(前后端统一)
对应项目文件:06_ws_industrial_push/client/06_ws_industrial_push/src/utils/ws.ts
export type MsgType =
| 'chat'
| 'ping'
| 'pong'
| 'system'
| 'error'
| 'tick'
| 'join'
| 'leave'
| 'device.status'
| 'sorting.data'
| 'stats.summary'
export type Message<T = unknown> = {
type: MsgType
id: string
ts: number
payload: T
meta?: { from?: 'client' | 'server'; traceId?: string }
}
逐段说明:
type:唯一分发入口;服务端分支、前端分发都只看它id/ts:追踪、排错、观察延迟与更新时间payload:业务体;同一个type的payload必须稳定meta:可选;本讲仅用from/traceId做最小追踪
2) join/leave 的 payload(订阅/退订)
export type JoinPayload = { room: 'device.status' | 'sorting.data' | 'stats.summary'; role?: 'web' | 'ros2' }
export type LeavePayload = { room: 'device.status' | 'sorting.data' | 'stats.summary' }
要点:
room:房间名就是业务 topic(本讲直接用三类业务type作为房间名)role:用于区分“谁是数据源”(ros2)与“谁是页面订阅者”(web)
3) 三类业务 payload(本讲最小可用)
export type DeviceStatusPayload = {
deviceId: string
online: boolean
state: 'IDLE' | 'RUNNING' | 'ALARM' | 'MAINTENANCE'
updatedAt: number
}
export type SortingDataPayload = {
lineId: string
batchId: string
okCount: number
ngCount: number
updatedAt: number
}
export type StatsSummaryPayload = {
windowSec: number
throughput: number
passRate: number
updatedAt: number
}
- 所有消息必须有
type/ts/payload device.status必须有payload.deviceId;页面按它增量更新sorting.data必须有updatedAt;用于列表 key 与“最后更新时间”
二、模块边界(沿用项目结构,不新增 realtime/)
对应项目结构(本讲独立项目前端):
src/
utils/ws.ts # 协议与工具:MsgType/Message/buildMessage/mapReadyState
composables/useWebSocket.ts # 连接封装:connect/disconnect/send + safeParse
components/DeviceMonitor.vue# 业务页面:join + 分发 + 响应式更新
规则:
ws.ts:只放协议与工具函数,不写业务渲染useWebSocket.ts:只管连接与解析,不写业务分发DeviceMonitor.vue:只做订阅/分发/更新响应式状态,不直接拼 WebSocket 协议细节
三、数据源替换:mock -> ROS2 的“最小迁移”
只要做到两点,页面就能无感切换数据源:
- 协议不变:仍然发送
type/id/ts/payload/meta - topic 不变:仍然按
device.status/sorting.data/stats.summary广播
迁移时的变化只有“谁发送业务消息”:
- mock:服务端后台任务生成 payload 并广播
- ROS2:ROS2 端连接网关,发送同样的业务消息,网关接收后广播给 Web
四、ROS2 链接 Web 的两种方式(工程选型,结合“ROS2 在虚拟机”的真实约束)
在工业现场更常见的是:ROS2 节点运行在工控机/虚拟机/内网环境里,而 Web 页面运行在开发机浏览器里。此时建议先做清楚“网络拓扑”,再选方案。
方式 A(推荐):ROS2 当 WebSocket 客户端,连到网关,由网关转发给浏览器
ROS2(虚拟机/内网) --出向连接 ws--> FastAPI 网关(可被浏览器访问) --广播--> Web(浏览器)
优势:
- 更适合虚拟机/NAT:只要虚拟机能“访问到网关”,就能建立出向连接,不需要暴露虚拟机端口给外网
- 统一鉴权与隔离:浏览器永远只连网关;ROS2 数据源是否允许注入由网关控制(role=ros2)
- 更易运维:后续接 rosbridge/多数据源时,网关仍是统一入口
代价:
- 多一跳转发(但通常在局域网内可接受)
方式 B(不推荐为默认):ROS2 作为 WebSocket 服务端,浏览器直接连接 ROS2
Web(浏览器) --ws/wss--> ROS2(虚拟机里跑 WebSocket server)
风险点(在“ROS2 在虚拟机”时最常见):
-
虚拟机网络模式:NAT 默认外部不可直连,需要端口转发;桥接才可能直接访问
-
端口暴露与安全:浏览器直连意味着你必须把 ROS2 端口暴露出来并处理鉴权/限流/跨域来源校验
-
HTTPS/WSS 约束:如果页面是 https,浏览器通常不允许连接明文 ws(混合内容),必须提供 wss(TLS 证书与部署复杂度上升)
-
网络可达性:浏览器所在主机是否能 ping/访问到目标 IP(桥接)或是否配置了端口转发(NAT)
-
端口策略:防火墙/安全组是否放行 WebSocket 端口
-
URL 选择:浏览器访问的是“网关地址”还是“虚拟机地址”,两者不要混淆
-
协议匹配:页面是 https 则优先 wss;页面是 http 才能直接 ws
五、本课小结与常见错误
- 常见错误 1:同一个业务用多套字段名(
device_id/deviceId混用)导致前端解析混乱 - 常见错误 2:把订阅做成随意字符串(subscribe/unsubscribe)导致协议碎片化
- 常见错误 3:设备列表每条消息都全量替换数组,实时运行后越来越卡
一、页面交付(DeviceMonitor.vue)
对应项目文件:06_ws_industrial_push/client/06_ws_industrial_push/src/components/DeviceMonitor.vue
核心结构(示例与项目一致):
<script setup lang="ts">
import { computed, onMounted, ref, watch } from 'vue'
import { useWebSocket } from '../composables/useWebSocket'
import type { Message } from '../utils/ws'
type DeviceStatusPayload = {
deviceId: string
online: boolean
state: 'IDLE' | 'RUNNING' | 'ALARM' | 'MAINTENANCE'
updatedAt: number
}
type SortingDataPayload = {
lineId: string
batchId: string
okCount: number
ngCount: number
updatedAt: number
}
type StatsSummaryPayload = {
windowSec: number
throughput: number
passRate: number
updatedAt: number
}
const { status, lastError, connect, send } = useWebSocket()
const devices = ref<Record<string, DeviceStatusPayload>>({})
const stats = ref<StatsSummaryPayload>({ windowSec: 60, throughput: 0, passRate: 0, updatedAt: 0 })
const sortingList = ref<SortingDataPayload[]>([])
const deviceList = computed(() => Object.values(devices.value).sort((a, b) => a.deviceId.localeCompare(b.deviceId)))
function handleMessage(msg: Message) {
if (msg.type === 'device.status') {
const p = msg.payload as DeviceStatusPayload
devices.value[p.deviceId] = p
return
}
if (msg.type === 'stats.summary') {
stats.value = msg.payload as StatsSummaryPayload
return
}
if (msg.type === 'sorting.data') {
const p = msg.payload as SortingDataPayload
sortingList.value.unshift(p)
sortingList.value = sortingList.value.slice(0, 20)
}
}
const joined = ref(false)
watch(status, (s) => {
if (s !== 'OPEN') return
if (joined.value) return
send('join', { room: 'device.status', role: 'web' })
send('join', { room: 'stats.summary', role: 'web' })
send('join', { room: 'sorting.data', role: 'web' })
joined.value = true
})
onMounted(() => connect('ws://localhost:8000/ws', handleMessage))
</script>
- 订阅只做一次:连接变为 OPEN 后
join,避免重复订阅 - 设备状态按
deviceId增量更新:行数稳定、内容变化 - 分拣列表限长:只保留最近 20 条,避免越跑越慢
- 状态与错误可视化:
status/lastError在页面上可见
二、网关升级:允许 ROS2 注入业务消息
对应项目文件:06_ws_industrial_push/server/app.py
约定:
- ROS2 端以
role=ros2加入房间 - ROS2 端可以发送三类业务消息(
device.status/sorting.data/stats.summary) - 网关收到后按同名房间广播给 Web
最小行为(概念图):
ROS2(生产数据) --WebSocket--> 网关(/ws) --广播到 room--> Web(渲染页面)
三、数据源注入演示(可选:只验证“网关注入接口”,不讲 ROS2 订阅细节)
- 推荐做法:用一个 Python 脚本模拟 “role=ros2 的数据源”,发送三类业务消息
- 如果你的 ROS2 运行在虚拟机中:更建议选“方式 A(ROS2 客户端连网关)”,因为它只依赖虚拟机的出向网络能力,避免浏览器直连虚拟机的端口与 TLS 复杂度
检查点:
- Web 页面三块区域持续更新(统计/设备/分拣)
- 停止注入脚本后,页面能回到 mock 数据或停止更新(取决于服务端是否开启 mock 推送),体现“数据源可替换”
| 测试点 | 操作 | 预期结果 |
|---|---|---|
| Web 连接成功 | 启动服务端后打开页面 | 状态从 CONNECTING 变为 OPEN |
| join 生效 | 页面 OPEN 后发送 join | 开始持续收到三类业务消息 |
| 设备增量更新 | 连续推送同一 deviceId 的 device.status | 表格对应行更新,行数不变 |
| 分拣列表限长 | 连续推送 30 条 sorting.data | 页面只保留最近 20 条 |
| 数据源注入(可选) | 注入端发送业务消息 | Web 页面更新(无需改前端) |
| 异常提示 | 服务端停掉后刷新页面 | 状态为 CLOSED,错误提示可见 |
五、大模型任务(指令模板 + 校验点)
请基于本讲独立项目 06_ws_industrial_push 做升级,要求:
1) 协议沿用 Message(type/id/ts/payload/meta)与 join/leave 房间机制(room=业务 type)
2) 前端:DeviceMonitor.vue 能接收 device.status / sorting.data / stats.summary 并实时更新;设备按 deviceId 增量更新;分拣列表只保留最近 20 条;显示连接状态与错误
3) 服务端:允许 role=ros2 的连接发送上述三类业务消息,网关收到后广播到同名 room
4) 输出时注明需要修改的文件路径:06_ws_industrial_push/client/06_ws_industrial_push/src/utils/ws.ts、06_ws_industrial_push/client/06_ws_industrial_push/src/composables/useWebSocket.ts、06_ws_industrial_push/client/06_ws_industrial_push/src/components/DeviceMonitor.vue、06_ws_industrial_push/server/app.py
5) 额外输出一个 Python 脚本骨架:模拟 role=ros2 数据源 -> WebSocket 网关推送(不涉及 rosbridge)
校验点:type 字符串完全一致(device.status/sorting.data/stats.summary/join/leave),payload 字段一致,Web 页面无需改动即可切换数据源
参考与延伸
- MDN WebSocket API:https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket
- Vue 3 组合式 API:https://cn.vuejs.org/guide/extras/composition-api-faq.html
附录 A:快速注入脚本(模拟 role=ros2 数据源)
用途:验证网关“允许外部数据源注入并广播”的能力,先不引入 ROS2/rosbridge 环境复杂度。
依赖:
pip install websockets
import asyncio
import contextlib
import json
import random
import time
import websockets
def now_ms() -> int:
return int(time.time() * 1000)
def build_message(msg_type: str, payload: dict) -> dict:
return {
"type": msg_type,
"id": f"inject-{now_ms()}-{random.randint(1000, 9999)}",
"ts": now_ms(),
"payload": payload,
"meta": {"from": "client", "traceId": "inject"},
}
async def drain_messages(ws) -> None:
async for _ in ws:
pass
async def main():
async with websockets.connect("ws://localhost:8000/ws") as ws:
drain_task = asyncio.create_task(drain_messages(ws))
try:
await ws.send(json.dumps(build_message("join", {"room": "device.status", "role": "ros2"}), ensure_ascii=False))
await ws.send(json.dumps(build_message("join", {"room": "stats.summary", "role": "ros2"}), ensure_ascii=False))
await ws.send(json.dumps(build_message("join", {"room": "sorting.data", "role": "ros2"}), ensure_ascii=False))
while True:
await ws.send(
json.dumps(
build_message(
"device.status",
{
"deviceId": "D-01",
"online": True,
"state": random.choice(["IDLE", "RUNNING", "ALARM", "MAINTENANCE"]),
"updatedAt": now_ms(),
},
),
ensure_ascii=False,
)
)
await ws.send(
json.dumps(
build_message(
"stats.summary",
{
"windowSec": 60,
"throughput": round(random.uniform(20, 60), 2),
"passRate": 0.98,
"updatedAt": now_ms(),
},
),
ensure_ascii=False,
)
)
await ws.send(
json.dumps(
build_message(
"sorting.data",
{
"lineId": "L-02",
"batchId": f"B-{random.randint(100, 999)}",
"okCount": random.randint(0, 50),
"ngCount": random.randint(0, 5),
"updatedAt": now_ms(),
},
),
ensure_ascii=False,
)
)
await asyncio.sleep(1)
finally:
drain_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await drain_task
asyncio.run(main())
附录 A2(可选):用 ROS2 功能包 + 节点模拟 role=ros2 数据源
依赖:
pip install websockets
创建包(ament_python):
cd <your_ros2_ws>/src
ros2 pkg create --build-type ament_python industrial_ws_injector --dependencies rclpy
节点代码(示例文件:industrial_ws_injector/industrial_ws_injector/ws_injector_node.py):
import asyncio
import json
import random
import threading
import rclpy
from rclpy.node import Node
import websockets
def now_ms(node: Node) -> int:
return int(node.get_clock().now().nanoseconds / 1_000_000)
def build_message(node: Node, msg_type: str, payload: dict) -> dict:
ts = now_ms(node)
return {
"type": msg_type,
"id": f"inject-{ts}-{random.randint(1000, 9999)}",
"ts": ts,
"payload": payload,
"meta": {"from": "client", "traceId": "inject"},
}
class WsInjector(Node):
def __init__(self) -> None:
super().__init__("ws_injector")
self.declare_parameter("url", "ws://localhost:8000/ws")
self.declare_parameter("period_sec", 1.0)
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
self._thread.start()
self._loop.call_soon_threadsafe(lambda: asyncio.create_task(self._run()))
async def _run(self) -> None:
url = self.get_parameter("url").get_parameter_value().string_value
period = self.get_parameter("period_sec").get_parameter_value().double_value
async with websockets.connect(url) as ws:
await ws.send(json.dumps(build_message(self, "join", {"room": "device.status", "role": "ros2"}), ensure_ascii=False))
await ws.send(json.dumps(build_message(self, "join", {"room": "stats.summary", "role": "ros2"}), ensure_ascii=False))
await ws.send(json.dumps(build_message(self, "join", {"room": "sorting.data", "role": "ros2"}), ensure_ascii=False))
while rclpy.ok():
await ws.send(
json.dumps(
build_message(
self,
"device.status",
{
"deviceId": "D-01",
"online": True,
"state": random.choice(["IDLE", "RUNNING", "ALARM", "MAINTENANCE"]),
"updatedAt": now_ms(self),
},
),
ensure_ascii=False,
)
)
await ws.send(
json.dumps(
build_message(
self,
"stats.summary",
{"windowSec": 60, "throughput": round(random.uniform(20, 60), 2), "passRate": 0.98, "updatedAt": now_ms(self)},
),
ensure_ascii=False,
)
)
await ws.send(
json.dumps(
build_message(
self,
"sorting.data",
{
"lineId": "L-02",
"batchId": f"B-{random.randint(100, 999)}",
"okCount": random.randint(0, 50),
"ngCount": random.randint(0, 5),
"updatedAt": now_ms(self),
},
),
ensure_ascii=False,
)
)
await asyncio.sleep(period)
def destroy_node(self):
if self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
super().destroy_node()
def main():
rclpy.init()
node = WsInjector()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
运行方式(示例):
cd <your_ros2_ws>
colcon build
source install/setup.bash
ros2 run industrial_ws_injector ws_injector_node --ros-args -p url:=ws://localhost:8000/ws -p period_sec:=1.0
附录 B:ROS2/rosbridge 的正式路径(只给链路图与映射点)
Web(roslibjs) --WebSocket--> rosbridge_server --DDS--> ROS2 nodes
- topic -> type:例如
/device_status->device.status - ROS2 msg -> payload:字段名统一成
deviceId/updatedAt/...,避免snake_case与camelCase混用
附录 C:什么时候才考虑“浏览器直连 ROS2 WebSocket 服务端”
只在满足以下条件时再考虑方式 B:
- 网络拓扑稳定:ROS2 主机(或虚拟机)在与浏览器同一网段,或已做可靠的端口映射
- 安全策略明确:能做鉴权、来源校验、限流(避免任意浏览器直接访问 ROS2 数据口)
- 协议可落地:若页面走 https,需要同时提供 wss(证书与部署已准备好)