15 FastAPI WebSocket 服务端搭建与 Vue3 客户端封装

FastAPI WebSocket 服务端搭建与 Vue3 客户端封装

关联:索引

要解决的问题

章节内容(本讲核心):

与前置知识衔接(避免重复):

  1. 没有连接列表:只能服务“当前一个”,无法广播、无法统计在线
  2. 不清理断开的连接:广播时会报错,导致整个循环中断
  3. 不解析消息:业务扩展靠字符串拼接,排错困难、协议难升级

项目目标:不从零新建项目,直接基于 02_websocket_client_advanced/server/app.py 升级出“连接管理 + 广播 + 解析路由”。

推荐文件路径:02_websocket_client_advanced/server/app.py

# 标准库:负责 JSON 序列化与时间戳
import json
import time
from typing import Any

# FastAPI:WebSocket 端点与断开异常
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

def now_ms() -> int:
    # 统一毫秒时间戳:用于消息 id/ts,便于排查“消息先后顺序”
    return int(time.time() * 1000)

def make_msg(msg_type: str, payload: Any, msg_id: str | None = None) -> dict[str, Any]:
    # 统一 Envelope:type/id/ts/payload,前后端按同一结构解析与渲染
    return {"type": msg_type, "id": msg_id or str(now_ms()), "ts": now_ms(), "payload": payload}

def try_parse_json(text: str) -> dict[str, Any] | None:
    # “降级解析”:不是 JSON 就返回 None,避免服务端因为解析失败而中断连接
    try:
        data = json.loads(text)
    except Exception:
        return None
    # 只接受对象结构(dict),数组/字符串等一律视为无效协议
    return data if isinstance(data, dict) else None

class ConnectionManager:
    def __init__(self) -> None:
        # 用 set 保存连接:自动去重;连接对象本身可作为集合元素
        self._connections: set[WebSocket] = set()

    async def connect(self, ws: WebSocket) -> None:
        # WebSocket 必须先 accept,后续才能收发
        await ws.accept()
        self._connections.add(ws)

    def disconnect(self, ws: WebSocket) -> None:
        # 断开时从连接池剔除:避免广播遍历到“死连接”
        self._connections.discard(ws)

    async def send_json(self, ws: WebSocket, data: dict[str, Any]) -> None:
        # 统一用文本帧发送 JSON(前端 WebSocket.onmessage 收到 string)
        await ws.send_text(json.dumps(data, ensure_ascii=False))

    async def broadcast_json(self, data: dict[str, Any]) -> None:
        # 广播容错:某个连接发送失败,不影响其他连接
        dead: list[WebSocket] = []
        for ws in self._connections:
            try:
                await self.send_json(ws, data)
            except Exception:
                dead.append(ws)
        # 广播结束后统一清理失败连接,避免影响本次遍历
        for ws in dead:
            self.disconnect(ws)

manager = ConnectionManager()

@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
    # 连接建立:加入连接池,并给客户端一个“可见证据”(system)
    await manager.connect(ws)
    await manager.send_json(ws, make_msg("system", {"info": "connected"}))
    try:
        while True:
            # 等待客户端下一条文本帧(这里以 text 为主;二进制不在本讲范围)
            text = await ws.receive_text()

            data = try_parse_json(text)
            if data is None:
                # 客户端没按协议发 JSON:服务端降级为“原样回显”,用于快速联调
                await manager.send_json(ws, make_msg("chat", {"echo": text}))
                continue

            # 从 Envelope 中取必要字段:type/id/payload
            t = data.get("type")
            msg_id = data.get("id")
            payload = data.get("payload")

            if t == "ping":
                # 心跳:用同一个 msg_id 回 pong,前端方便对上是哪次 ping
                await manager.send_json(ws, make_msg("pong", {}, msg_id))
            elif t == "chat":
                # chat:回显 payload(这里不做业务校验,先让联调闭环)
                await manager.send_json(ws, make_msg("chat", {"echo": payload}, msg_id))
            elif t == "broadcast":
                # broadcast:把消息广播给所有在线连接(包括触发者)
                await manager.broadcast_json(make_msg("chat", {"from": "server", "payload": payload}))
            else:
                # 未知 type:返回 system ok,避免直接报错断开
                await manager.send_json(ws, make_msg("system", {"info": "ok"}))
    except WebSocketDisconnect:
        # 客户端正常断开:务必清理连接池
        manager.disconnect(ws)
    except Exception:
        # 其他异常:也要清理,保证连接池不会越来越脏
        manager.disconnect(ws)

要点固化(只记“新增”):

启动命令(复用上一课):

uvicorn app:app --reload --port 8000

测试策略(侧重“增量改造后的回归验证”):

  1. 多个页面都要连接:复制粘贴,改一处漏三处
  2. 状态判断散落:OPEN 才能 send 的规则容易遗漏
  3. 生命周期遗漏:页面切换后连接没关、定时器没停
  4. 日志不可追:消息收发没有统一格式,排错靠猜

本节只强调“新增的封装边界”,不重复讲 readyState 与事件机制:

项目目标:基于上一课的 02_websocket_client_advanced,把组件里的连接逻辑抽到 useWebSocket,组件只保留 UI + 业务参数。

推荐文件路径:02_websocket_client_advanced/client/02_websocket_client_advanced/src/composables/useWebSocket.ts

import { onBeforeUnmount, ref } from 'vue'
import { buildMessage, mapReadyState } from '../utils/ws'
import type { ReadyStateText } from '../utils/ws'

export function useWebSocket() {
  // 连接状态(文本化):用于 UI 直接展示
  const status = ref<ReadyStateText>('CLOSED')
  // 收发日志:用于“证据可见”,联调时非常关键
  const logs = ref<string[]>([])

  // 当前 WebSocket 实例:connect/disconnect 会更新它
  let ws: WebSocket | null = null

  function log(s: string) {
    logs.value.push(s)
  }

  function refreshStatus() {
    // 把原生 readyState 映射成可读文本
    status.value = mapReadyState(ws ?? undefined)
  }

  function connect(url: string) {
    // 避免重复连接:先断开旧连接,再建新连接
    if (ws) disconnect()

    ws = new WebSocket(url)
    refreshStatus()

    ws.onopen = () => {
      refreshStatus()
      log('✅ connected')
    }
    ws.onmessage = (e) => {
      // 这里先记录“原始字符串”,后续需要再做结构化解析
      log(`📥 ${typeof e.data === 'string' ? e.data : String(e.data)}`)
    }
    ws.onerror = () => {
      refreshStatus()
      log('❌ error')
    }
    ws.onclose = () => {
      refreshStatus()
      log('❌ closed')
    }
  }

  function disconnect() {
    if (!ws) return
    // CONNECTING / OPEN 才需要 close;CLOSING/CLOSED 不重复操作
    if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) ws.close()
    ws = null
    refreshStatus()
  }

  function sendChat(text: string) {
    // 输入清理:避免空消息
    const t = text.trim()
    if (!t) return false
    // 只允许 OPEN 才 send:避免报错与“发了但其实没发出去”
    if (!ws || ws.readyState !== WebSocket.OPEN) return false
    // 统一按 Envelope 发送:服务端按 type 分支处理
    ws.send(JSON.stringify(buildMessage('chat', { text: t })))
    log(`📤 ${t}`)
    return true
  }

  function sendPing() {
    if (!ws || ws.readyState !== WebSocket.OPEN) return false
    // 心跳包:让服务端回 pong,用来验证链路是否稳定
    ws.send(JSON.stringify(buildMessage('ping', {})))
    return true
  }

  // 组件卸载时自动断开:避免页面切换后“后台还连着”
  onBeforeUnmount(disconnect)

  return { status, logs, connect, disconnect, sendChat, sendPing }
}

推荐组件改造示例:02_websocket_client_advanced/client/02_websocket_client_advanced/src/components/WebSocketAdvanced.vue(重构后)

<template>
  <div>
    <h3>WebSocket(封装后)</h3>
    <!-- 连接状态:联调用的第一证据 -->
    <div>状态:{{ status }}</div>
    <input v-model="msg" type="text" placeholder="输入消息" />
    <button @click="send">发送</button>
    <button @click="sendPing">ping</button>
    <button @click="disconnect">断开</button>
    <!-- 日志区:联调与排错的第二证据(收发原始数据) -->
    <div style="margin-top: 12px; white-space: pre-line;">{{ logs.join('\\n') }}</div>
  </div>
</template>

<script setup lang="ts">
import { onMounted, ref } from 'vue'
import { useWebSocket } from '../composables/useWebSocket'

// 从 composable 获取“连接能力”,组件只做 UI 与交互
const { status, logs, connect, disconnect, sendChat, sendPing } = useWebSocket()
const msg = ref('')

// 页面加载即连接:课堂 demo 更直观;真实项目可改成按钮触发
onMounted(() => connect('ws://localhost:8000/ws'))

function send() {
  // sendChat 内部已做 OPEN 判断与空字符串拦截
  if (sendChat(msg.value)) msg.value = ''
}
</script>

要点固化:

测试点 操作与验证 预期结果
服务端启动 启动 uvicorn 终端无报错,ws 接口可用
客户端连接 打开页面(mounted 自动连接) status 变为 OPEN,logs 出现 ✅ connected
文本消息收发 输入文本并发送 logs 出现回包内容(chat echo)
ping/pong(选做) 点击 ping logs 出现 pong(或服务端回包)
broadcast(选做) 另开 2 个页面并触发广播 其他页面也收到推送,且断开页面不会带崩广播
断开清理 点击断开/关闭页面 status 变为 CLOSED,连接释放(服务端连接池清理)

作业(布置)

  1. 在 Vue3 中完成 WebSocket 封装,实现连接、发送、接收(useWebSocket)
  2. 实现实时数据展示与连接状态显示(至少包含状态文本与消息列表/最新数据)
  3. 提供运行截图与封装说明(说明包含:返回的状态/方法、OPEN 发送拦截、卸载清理策略)