25 断线续传、数据缓存与优化

断线续传、数据缓存与优化

关联:索引

要解决的问题

章节内容(本讲核心):

与前置知识衔接(避免重复):

项目工坊(本讲交付):

学生任务(当堂必做):

作业(布置):

  1. 实现自动重连 + 断线续传。
  2. 设计 3 个异常测试用例并验证通过。
  3. 撰写测试报告。

从上一讲项目到本讲目标(过渡说明)

上一讲《WebSocket 异常处理与断线检测》中,我们已经在 11_websocket_reconnect/ 工程里完成了:

  1. 先把 12_websocket_resume/ 跑起来,确认重连与日志都正常;
  2. 按本讲的 Outbox → ReliableSender → ACK 三个小步骤,逐步把“自动重连”升级为“断线可续传”;

本讲统一口径:断线续传的最小协议(可落地、可扩展)

本讲把“续传”拆成两条链路(你必须分开设计,否则一定混乱):

1) 顶层 Envelope(本讲最小版)

{
  "schema_version": "1.0.0",
  "client_id": "web-01",
  "msg_id": "M-1776200000123-000001",
  "ts_ms": 1776200000123,
  "event": "client.msg",
  "seq": 1,
  "payload": {
    "action": "arm.stop",
    "params": {
      "reason": "user_click"
    }
  }
}

逐项解释与自检要点:

2) ACK(服务端回执,确认“已处理到哪里”)

{
  "schema_version": "1.0.0",
  "client_id": "web-01",
  "ts_ms": 1776200000456,
  "event": "server.ack",
  "ack_seq": 12,
  "ack_msg_id": "M-1776200000123-000012"
}

逐项解释与自检要点:

3) Hello(重连后,先同步游标,再补发)

{
  "schema_version": "1.0.0",
  "client_id": "web-01",
  "ts_ms": 1776200000789,
  "event": "client.hello",
  "resume": {
    "last_acked_seq": 12,
    "want_server_resume": true,
    "last_server_seq": 50
  },
  "cap": {
    "compression": ["gzip"]
  }
}

逐项解释与自检要点:


一、核心理论:把“可靠性”变成可计算的状态

本讲只做“最小可靠”但必须具备 4 个点:

推荐的消息流(客户端 → 服务端):

  1. 客户端生成消息(seq+1)→ 写入 outbox(持久化)→ 尝试发送。
  2. 服务端收到消息:
  1. 客户端收到 ack_seq:删除 seq <= ack_seq 的 outbox 项,并更新 last_acked_seq

二、实操:客户端 Outbox(队列 + 持久化 + 限长 + 过期)

1) Outbox 类型与持久化工具

// ClientMsg:客户端发送给服务端的“可靠消息”顶层结构
// 约束点:
// - 字段名固定(跨端统一口径)
// - event 固定为 client.msg(服务端用它分支处理)
// - seq 单调递增(断线续传的核心)
export type ClientMsg<TPayload extends Record<string, unknown>> = {
  schema_version: '1.0.0' // 协议版本:未来升级时用于兼容判断
  client_id: string // 客户端身份:服务端按它维护 last_processed_seq
  msg_id: string // 消息唯一 ID:用于排查问题(不要只靠 seq)
  ts_ms: number // 客户端生成时间:用于 TTL/诊断
  event: 'client.msg' // 消息类型:本讲可靠消息固定为 client.msg
  seq: number // 客户端序列号:从 1 开始递增
  payload: TPayload // 业务体:可靠性层不关心内容,但必须原样传递
}

// OutboxItem:Outbox 内部持久化的条目
// 设计点:
// - json 直接存序列化后的文本,避免 localStorage 持久化时出现循环引用
export type OutboxItem = {
  seq: number // 用于排序/补发
  msg_id: string // 用于日志定位
  ts_ms: number // 用于过期清理
  json: string // 完整的 ClientMsg 序列化结果
}

// OutboxSnapshot:Outbox 的整体快照(一次性写入 localStorage)
// 关键点:
// - last_acked_seq 是“服务端确认游标”,必须单调递增
// - next_seq 是“下一条要分配的序号”,不要轻易回退,否则会导致服务端误判重复
export type OutboxSnapshot = {
  client_id: string
  last_acked_seq: number
  next_seq: number
  items: OutboxItem[]
}

// 统一毫秒时间戳
export function nowMs(): number {
  return Date.now()
}

// 消息 ID 生成策略(课堂版):时间戳 + 6 位序号
export function makeMsgId(tsMs: number, seq: number): string {
  const seq6 = String(seq).padStart(6, '0')
  return `M-${tsMs}-${seq6}`
}

// 容错 JSON 解析:localStorage 里有脏数据也不应把页面搞崩
export function safeParseJson<T>(text: string): T | null {
  try {
    return JSON.parse(text) as T
  } catch {
    return null
  }
}

逐段解释与自检要点:

2) Outbox 实现:限长 + 过期 + 断电恢复

export type OutboxOptions = {
  storage_key: string // localStorage key:按 clientId 隔离
  max_items: number // 队列最多保留多少条(防止无限增长)
  ttl_ms: number // 单条消息的生存时间(毫秒)
  max_item_bytes: number // 单条消息最大字节数(UTF-8),防止超大对象写爆存储
}

export class Outbox {
  private snapshot: OutboxSnapshot
  private opt: OutboxOptions

  constructor(clientId: string, opt?: Partial<OutboxOptions>) {
    this.opt = {
      storage_key: `ws_outbox_${clientId}`,
      max_items: 300,
      ttl_ms: 30 * 60 * 1000,
      max_item_bytes: 256 * 1024,
      ...opt
    }

    // 启动时尝试从 localStorage 恢复快照;失败则使用新快照
    const loaded = this.load()
    this.snapshot =
      loaded ??
      ({
        client_id: clientId,
        last_acked_seq: 0,
        next_seq: 1,
        items: []
      } satisfies OutboxSnapshot)

    this.gc()
    this.persist()
  }

  get clientId(): string {
    return this.snapshot.client_id
  }

  get lastAckedSeq(): number {
    return this.snapshot.last_acked_seq
  }

  // ACK 游标只允许前进(回退会导致客户端重复补发旧消息)
  setLastAckedSeq(ackSeq: number) {
    if (!Number.isFinite(ackSeq) || ackSeq < 0) return
    if (ackSeq <= this.snapshot.last_acked_seq) return
    this.snapshot.last_acked_seq = ackSeq
    // 删除所有“已被服务端确认处理”的消息
    this.snapshot.items = this.snapshot.items.filter((x) => x.seq > ackSeq)
    this.persist()
  }

  enqueue<TPayload extends Record<string, unknown>>(payload: TPayload): OutboxItem {
    const ts = nowMs()
    const seq = this.snapshot.next_seq
    const msg: ClientMsg<TPayload> = {
      schema_version: '1.0.0',
      client_id: this.snapshot.client_id,
      msg_id: makeMsgId(ts, seq),
      ts_ms: ts,
      event: 'client.msg',
      seq,
      payload
    }

    // 先序列化再入队:保证“断电/刷新”后仍可恢复补发
    const json = JSON.stringify(msg)
    const bytes = new TextEncoder().encode(json).length
    if (bytes > this.opt.max_item_bytes) {
      // 超大消息直接拒绝入队:避免 localStorage 写爆导致整站异常
      throw new Error(`outbox item too large: ${bytes} > ${this.opt.max_item_bytes}`)
    }
    const item: OutboxItem = { seq, msg_id: msg.msg_id, ts_ms: ts, json }
    this.snapshot.items.push(item)
    this.snapshot.next_seq = seq + 1

    // 入队后做清理(过期/限长/已确认),并持久化
    this.gc()
    this.persist()
    return item
  }

  // 返回“待补发”的消息列表(按 seq 升序)
  listPending(): OutboxItem[] {
    const ack = this.snapshot.last_acked_seq
    return this.snapshot.items.filter((x) => x.seq > ack).sort((a, b) => a.seq - b.seq)
  }

  private load(): OutboxSnapshot | null {
    const raw = localStorage.getItem(this.opt.storage_key)
    if (!raw) return null
    const parsed = safeParseJson<OutboxSnapshot>(raw)
    if (!parsed) return null
    // 课堂最小实现只做最轻量的校验;更严谨可以逐字段校验结构
    return parsed
  }

  private gc() {
    const now = nowMs()
    // 1) 过期清理:超过 ttl_ms 的条目直接丢弃
    this.snapshot.items = this.snapshot.items.filter((x) => now - x.ts_ms <= this.opt.ttl_ms)
    // 2) 限长清理:只保留最近 max_items 条(丢弃最旧)
    if (this.snapshot.items.length > this.opt.max_items) {
      const keep = this.snapshot.items.slice(this.snapshot.items.length - this.opt.max_items)
      this.snapshot.items = keep
    }
    // 3) 再次按 ACK 游标删除已确认消息(双保险)
    const ack = this.snapshot.last_acked_seq
    this.snapshot.items = this.snapshot.items.filter((x) => x.seq > ack)
  }

  private persist() {
    localStorage.setItem(this.opt.storage_key, JSON.stringify(this.snapshot))
  }
}

逐段解释与自检要点:

三、实操:可靠发送层(连接打开就补发,收到 ACK 就删除)

下面代码不实现重连本身(复用上一讲的自动重连/心跳),只实现“可靠发送层”核心逻辑:Outbox + flush + ACK。

推荐在客户端工程中新建一个组合式工具文件:

export type ServerAck = {
  schema_version: '1.0.0'
  client_id: string // ACK 属于哪个客户端(防止多个 clientId 混淆)
  ts_ms: number // 服务端生成时间戳
  event: 'server.ack' // 事件名固定
  ack_seq: number // 服务端已处理到的最大 seq(可靠性核心字段)
  ack_msg_id?: string // 可选:用于日志定位
}

export type WsLike = {
  send: (text: string) => void // WebSocket.send
  readyState: number // WebSocket.readyState(1 表示 OPEN)
}

// ReliableSender:把 Outbox + ACK 组合成“可靠发送”能力
// 核心点:
// - outbox 负责存储“未确认消息”
// - inFlight + windowSize 负责“补发限流”,避免断线积压后一次性打爆服务端
export class ReliableSender {
  private outbox: Outbox
  private inFlight = new Set<number>()
  private windowSize: number

  constructor(outbox: Outbox, windowSize = 8) {
    this.outbox = outbox
    this.windowSize = Math.max(1, windowSize)
  }

  sendBusiness<TPayload extends Record<string, unknown>>(ws: WsLike | null, payload: TPayload): OutboxItem {
    // 永远“先入队再发送”:断线时也不会丢
    const item = this.outbox.enqueue(payload)
    this.tryFlush(ws)
    return item
  }

  onAck(ack: ServerAck) {
    // 只处理当前 client_id 的 ACK
    if (ack.client_id !== this.outbox.clientId) return
    // 推进 outbox 游标,并删除已确认消息
    this.outbox.setLastAckedSeq(ack.ack_seq)
    // 清理所有 seq <= ack_seq 的 inFlight(它们已经被服务端确认处理)
    for (const seq of Array.from(this.inFlight)) {
      if (seq <= ack.ack_seq) this.inFlight.delete(seq)
    }
  }

  tryFlush(ws: WsLike | null) {
    // 仅在连接 OPEN 时允许发送
    if (!ws || ws.readyState !== 1) return
    const pending = this.outbox.listPending()
    for (const item of pending) {
      // 避免重复发送同一条 seq(例如 ACK 还没到时多次触发 tryFlush)
      if (this.inFlight.has(item.seq)) continue
      // 窗口限流:达到窗口大小就暂停,等待 ACK 推进后再继续
      if (this.inFlight.size >= this.windowSize) break
      ws.send(item.json)
      this.inFlight.add(item.seq)
    }
  }
}

逐段解释与自检要点:

四、服务端增量:ACK + 去重(保证幂等)

下面是 FastAPI WebSocket 的最小逻辑:记住每个 client_id 已处理到的最大 seq;收到重复 seq 就只回 ACK,不重复执行业务。

import json
import time
from dataclasses import dataclass, field
from typing import Any

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

def now_ms() -> int:
    # 统一毫秒时间戳(与前端协议字段 ts_ms 对齐)
    return int(time.time() * 1000)

def make_ack(client_id: str, ack_seq: int, ack_msg_id: str | None) -> dict[str, Any]:
    # server.ack:告诉客户端“我处理到哪个 seq 了”
    # - ack_seq 是可靠性核心字段(客户端据此删除 outbox)
    data: dict[str, Any] = {
        "schema_version": "1.0.0",
        "client_id": client_id,
        "ts_ms": now_ms(),
        "event": "server.ack",
        "ack_seq": ack_seq,
    }
    if ack_msg_id is not None:
        data["ack_msg_id"] = ack_msg_id
    return data

def parse_json(text: str) -> dict[str, Any] | None:
    # 容错解析:收到非 JSON 直接忽略
    try:
        data = json.loads(text)
    except Exception:
        return None
    return data if isinstance(data, dict) else None

@dataclass
class ClientState:
    # last_processed_seq:服务端确认“已按序处理完成”的最大 seq
    # last_seen_at_ms:便于做清理/超时策略(课堂不展开)
    last_processed_seq: int = 0
    last_seen_at_ms: int = field(default_factory=now_ms)

CLIENTS: dict[str, ClientState] = {}

@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket) -> None:
    await ws.accept()
    # 当前连接绑定的 client_id(收到 hello 或第一条 client.msg 后确定)
    client_id: str | None = None
    try:
        while True:
            text = await ws.receive_text()
            data = parse_json(text)
            if data is None:
                continue

            event = data.get("event")
            if event == "client.hello":
                # 重连后的第一件事:对齐游标
                client_id = data.get("client_id")
                if not isinstance(client_id, str) or client_id == "":
                    await ws.close(code=4401)
                    return
                CLIENTS.setdefault(client_id, ClientState())
                state = CLIENTS[client_id]
                await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, None), ensure_ascii=False))
                continue

            if event != "client.msg":
                continue

            if client_id is None:
                client_id = data.get("client_id")
            if not isinstance(client_id, str) or client_id == "":
                await ws.close(code=4401)
                return

            seq = data.get("seq")
            msg_id = data.get("msg_id")
            if not isinstance(seq, int) or seq < 1:
                await ws.close(code=4400)
                return

            state = CLIENTS.setdefault(client_id, ClientState())
            state.last_seen_at_ms = now_ms()

            if seq <= state.last_processed_seq:
                # 重复消息:只回 ACK,不重复执行业务逻辑(幂等)
                await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, str(msg_id) if msg_id else None), ensure_ascii=False))
                continue

            if seq != state.last_processed_seq + 1:
                # 跳号:说明中间缺消息;课堂最小实现为“回当前 ACK”,客户端会按序补发
                await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, None), ensure_ascii=False))
                continue

            # 正常按序:处理本条消息,并推进服务端游标
            state.last_processed_seq = seq
            await ws.send_text(json.dumps(make_ack(client_id, state.last_processed_seq, str(msg_id) if msg_id else None), ensure_ascii=False))
    except WebSocketDisconnect:
        return

逐段解释与自检要点:

启动命令(复用既有工程习惯)

# 启动 FastAPI WebSocket 服务(开发模式)
uvicorn app:app --reload --port 8000

逐段解释与自检要点:

练习(至少 2 题)

  1. 为 Outbox 增加“手动清空”能力,但必须保留 last_acked_seq/next_seq 的合理性。
    提示:清空 items 不等于把 next_seq 归零;归零会导致重复 seq,引发服务端幂等误判。
  2. 把服务端的“缺口处理”从“拒绝处理”升级为“暂存乱序消息并等待缺口补齐”。
    提示:设置上限(最多缓存 50 条乱序),超过上限直接丢弃并回当前 ACK,避免内存被打爆。

一、数据压缩(应用层):可协商、可降级、可观测

1) 前端:gzip 压缩/解压(优先用原生 CompressionStream,缺失则降级)

代码在配套工程中的推荐文件路径:

// CompressionAlg:应用层压缩算法枚举
export type CompressionAlg = 'none' | 'gzip'

// EncodedPayload:把“压缩后的二进制”包装为可 JSON 传输的结构
export type EncodedPayload = {
  alg: CompressionAlg
  b64: string
}

// bytes -> base64(浏览器环境)
function bytesToBase64(bytes: Uint8Array): string {
  let binary = ''
  const len = bytes.length
  for (let i = 0; i < len; i++) binary += String.fromCharCode(bytes[i])
  return btoa(binary)
}

// base64 -> bytes(浏览器环境)
function base64ToBytes(b64: string): Uint8Array {
  const binary = atob(b64)
  const bytes = new Uint8Array(binary.length)
  for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i)
  return bytes
}

async function gzipCompress(text: string): Promise<Uint8Array> {
  if (typeof CompressionStream === 'undefined') {
    throw new Error('CompressionStream not supported')
  }
  const cs = new CompressionStream('gzip')
  const input = new TextEncoder().encode(text)
  // 注意:部分 TS lib 定义下,BlobPart 对 Uint8Array 的类型较挑剔
  // 课堂工程用“拷贝到 ArrayBuffer”避免类型报错
  const inputAb = new ArrayBuffer(input.byteLength)
  new Uint8Array(inputAb).set(input)
  const stream = new Blob([inputAb]).stream().pipeThrough(cs)
  const outAb = await new Response(stream).arrayBuffer()
  return new Uint8Array(outAb)
}

async function gzipDecompress(bytes: Uint8Array): Promise<string> {
  if (typeof DecompressionStream === 'undefined') {
    throw new Error('DecompressionStream not supported')
  }
  const ds = new DecompressionStream('gzip')
  const inputAb = new ArrayBuffer(bytes.byteLength)
  new Uint8Array(inputAb).set(bytes)
  const stream = new Blob([inputAb]).stream().pipeThrough(ds)
  const outAb = await new Response(stream).arrayBuffer()
  return new TextDecoder().decode(outAb)
}

export async function encodeJsonWithCompression(obj: unknown, prefer: CompressionAlg): Promise<EncodedPayload> {
  const text = JSON.stringify(obj)
  // prefer 由 hello.cap 协商得到;如果不是 gzip 就直接返回 none
  if (prefer !== 'gzip') return { alg: 'none', b64: bytesToBase64(new TextEncoder().encode(text)) }
  try {
    const gz = await gzipCompress(text)
    return { alg: 'gzip', b64: bytesToBase64(gz) }
  } catch {
    // 不支持 gzip 或压缩失败时自动降级,保证功能可用
    return { alg: 'none', b64: bytesToBase64(new TextEncoder().encode(text)) }
  }
}

export async function decodeJsonWithCompression(payload: EncodedPayload): Promise<unknown> {
  const bytes = base64ToBytes(payload.b64)
  const text = payload.alg === 'gzip' ? await gzipDecompress(bytes) : new TextDecoder().decode(bytes)
  return JSON.parse(text) as unknown
}

逐段解释与自检要点:

2) 后端:gzip/base64 解码(标准库实现)

代码在配套工程中的位置:

import base64
import gzip
import json
from typing import Any

def decode_payload(alg: str, b64: str) -> Any:
    # 1) base64 文本 -> 原始 bytes
    raw = base64.b64decode(b64.encode("utf-8"))
    # 2) gzip 解压(若 alg != gzip 则视为未压缩)
    if alg == "gzip":
        raw = gzip.decompress(raw)
    # 3) bytes -> utf-8 文本 -> JSON 对象
    text = raw.decode("utf-8")
    return json.loads(text)

逐段解释与自检要点:

本讲只做“最常见风险的最低成本防线”,不把安全讲成“大工程”:

1) 服务端:token 与 origin 检查(最小示例)

代码在配套工程中的位置:

from urllib.parse import parse_qs

def extract_token(query_string: str) -> str | None:
    # query_string 形如 "token=devtoken&x=1"
    qs = parse_qs(query_string)
    token = qs.get("token", [None])[0]
    # 返回非空字符串,否则返回 None
    return token if isinstance(token, str) and token != "" else None

def is_allowed_origin(origin: str | None) -> bool:
    # 浏览器 WebSocket 会自动带 Origin;非浏览器客户端可能没有
    if origin is None:
        return False
    allowed = {"http://localhost:5173", "http://127.0.0.1:5173"}
    return origin in allowed

逐段解释与自检要点:

推荐使用方式(伪代码表达意图):

2) 限长:防止大消息直接把服务端内存顶爆

前端大小估算(最小实现):

代码在配套工程中的推荐文件路径:

// 输入一个字符串,估算它按 UTF-8 编码后的字节数(用于“限长”与“压缩阈值”判断)
export function estimateUtf8Bytes(text: string): number {
  return new TextEncoder().encode(text).length
}

逐段解释与自检要点:

三、性能优化思路:先稳,再快(网络/解析/渲染三段)

把性能问题拆成三段定位:

1) 环形缓冲(只保留最近 N 条日志/消息)

代码在配套工程中的推荐文件路径:

// RingBuffer:只保留最近 N 条数据的“限长数组”
// 用途:日志窗口、最近消息列表(防止页面越跑越慢)
export class RingBuffer<T> {
  private buf: T[] = []
  private cap: number

  constructor(capacity: number) {
    // 至少 1 个容量
    this.cap = Math.max(1, capacity)
  }

  push(x: T) {
    this.buf.push(x)
    if (this.buf.length > this.cap) {
      // 超过容量时,丢弃最旧的若干条
      this.buf.splice(0, this.buf.length - this.cap)
    }
  }

  toArray(): T[] {
    // 返回拷贝,避免外部修改内部数组
    return this.buf.slice()
  }
}

逐段解释与自检要点:

2) UI 渲染节流:把 100 次更新变成 10 次(思路示例)

代码在配套工程中的推荐文件路径:

// createRafBatcher:把高频 push 合并到“下一帧”批量处理
// 目标:降低响应式更新次数,避免 100 条消息触发 100 次渲染
export function createRafBatcher<T>(apply: (batch: T[]) => void) {
  let pending: T[] = []
  let scheduled = false

  return (x: T) => {
    // 先缓存到 pending
    pending.push(x)
    if (scheduled) return
    scheduled = true
    requestAnimationFrame(() => {
      scheduled = false
      // 一帧内的所有数据合并成一个 batch
      const batch = pending
      pending = []
      apply(batch)
    })
  }
}

逐段解释与自检要点:

练习(至少 2 题)

  1. 在 ReliableSender 上增加“失败重试次数限制”:同一 seq 如果连续发送失败超过 N 次,标记为 dead-letter 并停止补发。
    提示:dead-letter 也要持久化,否则刷新后又会继续补发。
  2. 设计一套“压缩启用阈值”:小于 1KB 不压缩,大于 1KB 才压缩,并记录压缩前后字节数用于观察效果。
    提示:记录字段可以写进日志,不必改协议。

建议的联调步骤(最小可复现)

  1. 打开页面并连接成功(保持上一讲的心跳与重连能力)。
  2. 断网(系统断网/浏览器离线)后连续触发业务消息 10 次。
  3. 恢复网络:观察 hello → ack → flush 补发 → ack 推进 的完整链路。
  4. 在补发过程中重启服务端:观察客户端重连后继续补发,最终 ack_seq 到达目标值。

大模型任务:AI 生成断线续传方案与代码(可直接复制使用)

把下面提示词复制给 AI,并要求它输出“协议 + 代码 + 自检点”,且必须与你的工程栈一致(Vue3 + TS + FastAPI):

你是资深全栈工程师。请为 WebSocket 实现“断线续传 + 消息缓存 + 去重幂等”方案,要求:
1) 客户端:Vue3 + TypeScript,提供 Outbox(持久化/限长/过期)、窗口限流补发、ACK 处理、重连后 hello 同步游标;
2) 服务端:FastAPI WebSocket,按 client_id 维护 last_processed_seq,支持重复消息去重并回 ACK;
3) 协议:schema_version、client_id、msg_id、ts_ms、event、seq、payload;包含 client.hello 与 server.ack;
4) 可靠性:断网/服务端重启/重复发送/乱序场景都要说明处理策略;
5) 输出:给出关键代码(TS 与 Python)与每段代码的自检点;
6) 禁止:不要引入额外第三方库(除非给出无库版本),不要省略关键字段。

校验点(你必须人工检查):


课后作业(布置)

  1. 实现自动重连 + 断线续传。
  2. 设计 3 个异常测试用例并验证通过。
  3. 撰写测试报告。

建议的 3 个异常用例(可直接用,也可自行替换):

测试报告最小结构(不要求长,但必须可复现):


自检清单(提交前必做)