27 实践课-多工具组合调用落地与优化

多工具组合调用落地与优化

关联:索引

术语小抄(初学者版)

复习卡片(与 26 对齐,只保留必要要点):


课程思政融入点(口径统一):

1)工具接口异常(工具“没按约定说话/根本没说话”)

2)参数传递错误(工具“被调用了,但喂错了东西”)

  1. 先看“入参”:字段是否齐全、类型是否正确、值域是否合理。
  2. 再看“调用层”:是否有超时/重试/异常捕获;错误码是否可分类。
  3. 最后看“工具层”:工具是否启动、接口是否可达、版本是否一致、字段是否漂移(参考 20)。

1)异常结果过滤:主链路不被污染,但证据必须保留

2)格式标准化:Join 的前提是“所有结果可比、可测、可断言”

常见的“字段漂移”兼容策略(与 20 对齐):

把下面代码保存为 orchestrator_resilience_l1.py,然后运行。代码使用“故障注入桩工具”模拟真实世界的常见失败:异常抛出、超时、返回不合约、参数缺失。

from __future__ import annotations

import asyncio
import json
import random
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

# 代码说明:
# - 目标:演示“多工具组合调用”的落地优化骨架:标准化返回、异常捕获、超时、重试退避、warnings 降级、证据字段。
# - 原则:所有工具输出统一为 {ok, tool, trace_id, ts_ms, data, error},失败必须可分类(error.code)。
# - 备注:这里的 tool_xxx 都是“故障注入桩工具”,真实项目只需要替换工具函数体,尽量保留编排与规范化骨架。

def now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the epoch.

    Every timestamp in the result envelopes goes through this helper so that
    seconds and milliseconds are never mixed when sorting, auditing or
    replaying a run.
    """
    seconds = time.time()
    return int(seconds * 1000)

def ok_result(*, tool: str, trace_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Build the unified success envelope for a single tool call.

    ``ok=True`` only states that this one call succeeded; it does not mean
    the overall business flow is finished.

    Args:
        tool: Name of the tool that produced the result.
        trace_id: Correlation id shared by every step of one pipeline run.
        data: Tool payload, stored as-is under ``data``.

    Returns:
        A dict with the keys ``ok``, ``tool``, ``trace_id``, ``ts_ms``,
        ``data`` and ``error`` (always ``None`` here).
    """
    envelope: Dict[str, Any] = {
        "ok": True,
        "tool": tool,
        "trace_id": trace_id,
        "ts_ms": now_ms(),
        "data": data,
        "error": None,
    }
    return envelope

def err_result(
    *,
    tool: str,
    trace_id: str,
    code: str,
    message: str,
    detail: Optional[Dict[str, Any]] = None,
    retryable: bool = False,
) -> Dict[str, Any]:
    """Build the unified failure envelope for a single tool call.

    Args:
        tool: Name of the tool that failed.
        trace_id: Correlation id shared by every step of one pipeline run.
        code: Machine-readable error code, usable for statistics and
            regression assertions.
        message: Short human-readable reason.
        detail: Optional supporting evidence (missing fields, expected type,
            thresholds, ...). A falsy value is replaced by an empty dict.
        retryable: Whether a retry wrapper may retry this failure.

    Returns:
        A dict with ``ok=False``, an empty ``data`` dict and a structured
        ``error`` object (``code``, ``message``, ``detail``, ``retryable``).
    """
    error_body: Dict[str, Any] = {
        "code": code,
        "message": message,
        "detail": detail if detail else {},
        "retryable": retryable,
    }
    envelope: Dict[str, Any] = {
        "ok": False,
        "tool": tool,
        "trace_id": trace_id,
        "ts_ms": now_ms(),
        "data": {},
        "error": error_body,
    }
    return envelope

def require_fields(obj: Dict[str, Any], fields: List[str]) -> Tuple[bool, List[str]]:
    """Check that every required field is present and non-empty.

    A field counts as missing when it is absent or its value is ``None``,
    an empty string or an empty list. Doing this check up front turns
    would-be ``KeyError`` crashes into a structured, reportable result.

    Args:
        obj: Mapping to inspect.
        fields: Names of the required fields, in reporting order.

    Returns:
        ``(all_present, missing)`` where ``missing`` lists the offending
        field names in the order they were given.
    """
    empty_markers = (None, "", [])
    missing: List[str] = []
    for name in fields:
        if obj.get(name) in empty_markers:
            missing.append(name)
    return (not missing), missing

def _to_obj(x: Any) -> Dict[str, Any]:
    """Coerce a raw tool output into a dict.

    Tools may return either a dict or a JSON string. A JSON string must
    decode to a JSON object: arrays, numbers, strings and ``null`` are valid
    JSON but are not a usable result envelope, so they are rejected here
    instead of failing later with an unrelated error.

    Args:
        x: Raw tool output.

    Returns:
        The dict itself (not copied) or the decoded JSON object.

    Raises:
        TypeError: If ``x`` is neither a dict nor a string, or if the JSON
            text decodes to something other than an object.
        json.JSONDecodeError: If ``x`` is a string that is not valid JSON.
    """
    if isinstance(x, dict):
        return x
    if isinstance(x, str):
        parsed = json.loads(x)
        if not isinstance(parsed, dict):
            raise TypeError(f"tool output JSON must be an object, got {type(parsed).__name__}")
        return parsed
    raise TypeError(f"unsupported tool output type: {type(x)}")

def normalize_tool_result(raw: Any, *, tool: str, trace_id: str) -> Dict[str, Any]:
    """Normalize an arbitrary tool output into the unified result envelope.

    The returned dict always has the keys ``ok``, ``tool``, ``trace_id``,
    ``ts_ms``, ``data`` and ``error`` so that joins, regression tests and
    audits can share one set of assertions.

    Compatibility rewrites applied (each one is recorded in a ``_meta``
    dict so readers can tell what the tool really returned):
      * ``success`` is mapped to ``ok`` when ``ok`` is absent.
      * ``ts`` is mapped to ``ts_ms``; numeric values below 10_000_000_000
        are treated as seconds and multiplied by 1000.
      * a missing ``trace_id`` is injected from the caller.
      * an unusable timestamp is replaced by the current time.

    The input object is never mutated: all rewrites happen on copies.

    Args:
        raw: Raw tool output (dict or JSON string).
        tool: Tool name used when the output does not carry one.
        trace_id: Trace id used when the output does not carry one.

    Returns:
        A success envelope (``_meta`` stored under ``data``) or a failure
        envelope (``_meta`` stored under ``error.detail``).
    """
    try:
        parsed = _to_obj(raw)
        if not isinstance(parsed, dict):
            raise TypeError(f"tool output must be an object, got {type(parsed).__name__}")
    except Exception as e:
        return err_result(tool=tool, trace_id=trace_id, code="RESP_NOT_JSON", message="tool response is not valid json", detail={"err": str(e)})

    # Work on a shallow copy so the tool's own dict is left untouched.
    obj: Dict[str, Any] = dict(parsed)

    # meta records every compat conversion / injection performed below.
    meta: Dict[str, Any] = {}
    if "ok" not in obj and "success" in obj:
        # Field drift: success -> ok (only ``ok`` is exposed downstream).
        obj["ok"] = bool(obj["success"])
        meta["ok_compat_from_success"] = True

    if "ts_ms" not in obj and "ts" in obj:
        # Timestamp drift: ts (seconds or milliseconds) -> ts_ms (milliseconds).
        ts = obj["ts"]
        try:
            if isinstance(ts, (int, float)) and ts < 10_000_000_000:
                obj["ts_ms"] = int(ts * 1000)
                meta["ts_converted_from_s"] = True
            else:
                obj["ts_ms"] = int(ts)
        except (TypeError, ValueError, OverflowError):
            # Unusable ts: leave ts_ms unset so the fallback below fills it in.
            meta["ts_unparseable"] = True

    if "trace_id" not in obj:
        # Tool did not echo the trace id: inject it at the orchestration layer.
        obj["trace_id"] = trace_id
        meta["trace_id_injected"] = True

    # Guarantee a stable shape for tool / ts_ms / data.
    obj["tool"] = obj.get("tool") or tool
    try:
        obj["ts_ms"] = int(obj.get("ts_ms") or now_ms())
    except (TypeError, ValueError, OverflowError):
        # A malformed timestamp must not crash normalization.
        obj["ts_ms"] = now_ms()
        meta["ts_unparseable"] = True
    obj["data"] = obj.get("data") if isinstance(obj.get("data"), dict) else {}

    if bool(obj.get("ok")):
        # Success: expose the conversions under data._meta as optional evidence.
        data = dict(obj["data"])
        if meta:
            data["_meta"] = meta
        return {"ok": True, "tool": obj["tool"], "trace_id": obj["trace_id"], "ts_ms": obj["ts_ms"], "data": data, "error": None}

    raw_err = obj.get("error")
    if not isinstance(raw_err, dict):
        # Failed, but error is not an object: replace with a structured error.
        err: Dict[str, Any] = {"code": "RESP_ERROR_SHAPE", "message": "tool error is not an object", "detail": {"raw_error": str(raw_err)}}
    elif "code" not in raw_err or "message" not in raw_err:
        # Failed, but error.code / error.message is missing.
        err = {"code": "RESP_ERROR_SHAPE", "message": "missing error.code/message", "detail": {"raw_error": raw_err}}
    else:
        # Copy so the additions below do not leak into the tool's dict.
        err = dict(raw_err)

    detail = err.get("detail")
    if detail is None:
        detail = {}
    elif isinstance(detail, dict):
        detail = dict(detail)
    else:
        # Keep a non-object detail as evidence instead of crashing on it.
        detail = {"raw_detail": detail}
    if meta:
        # Failure: expose the conversions under error.detail._meta.
        existing_meta = detail.get("_meta")
        merged_meta = dict(existing_meta) if isinstance(existing_meta, dict) else {}
        merged_meta.update(meta)
        detail["_meta"] = merged_meta
    err["detail"] = detail
    err.setdefault("retryable", False)
    return {"ok": False, "tool": obj["tool"], "trace_id": obj["trace_id"], "ts_ms": obj["ts_ms"], "data": {}, "error": err}

async def call_with_timeout(coro, *, timeout_s: float, tool: str, trace_id: str) -> Dict[str, Any]:
    """Await a tool coroutine under a timeout and always return an envelope.

    Timeouts and raised exceptions are converted into structured failure
    results so the orchestration layer never crashes on them. Normalization
    runs inside the same ``try``, so an exception raised while normalizing
    is also reported as ``EXCEPTION``.

    Args:
        coro: Awaitable produced by the tool call.
        timeout_s: Maximum time to wait, in seconds.
        tool: Tool name used in the resulting envelope.
        trace_id: Trace id used in the resulting envelope.

    Returns:
        The normalized tool result, or a retryable ``TIMEOUT`` /
        ``EXCEPTION`` failure envelope.
    """
    try:
        raw_output = await asyncio.wait_for(coro, timeout=timeout_s)
        result = normalize_tool_result(raw_output, tool=tool, trace_id=trace_id)
    except asyncio.TimeoutError:
        result = err_result(
            tool=tool,
            trace_id=trace_id,
            code="TIMEOUT",
            message=f"tool timeout after {timeout_s}s",
            retryable=True,
        )
    except Exception as exc:
        result = err_result(
            tool=tool,
            trace_id=trace_id,
            code="EXCEPTION",
            message="tool raised exception",
            detail={"err": repr(exc)},
            retryable=True,
        )
    return result

async def retry_call(
    *,
    call_fn,
    tool: str,
    trace_id: str,
    timeout_s: float,
    retries: int,
    base_backoff_ms: int,
) -> Dict[str, Any]:
    """Call a tool with a timeout and conditional retries.

    Only failures flagged ``error.retryable`` are retried. The wait between
    attempts is ``base_backoff_ms * 2**attempt`` plus 0-80 ms of random
    jitter, so concurrent callers do not all retry at the same instant.
    Control-type tools should normally be called with ``retries=0``.

    Args:
        call_fn: Zero-argument callable returning a fresh awaitable per attempt.
        tool: Tool name used in failure envelopes.
        trace_id: Trace id used in failure envelopes.
        timeout_s: Per-attempt timeout in seconds.
        retries: Number of extra attempts after the first one.
        base_backoff_ms: Base delay for the exponential backoff.

    Returns:
        The first successful envelope, otherwise the envelope of the last
        attempt made. If no attempt is made at all (``retries`` below
        zero), an ``UNKNOWN`` failure envelope is returned.
    """
    for attempt in range(retries + 1):
        result = await call_with_timeout(call_fn(), timeout_s=timeout_s, tool=tool, trace_id=trace_id)
        if result["ok"]:
            return result

        error_info = result.get("error") or {}
        can_retry = bool(error_info.get("retryable"))
        attempts_exhausted = attempt == retries
        if attempts_exhausted or not can_retry:
            return result

        jitter_ms = random.randint(0, 80)
        delay_ms = base_backoff_ms * (2**attempt) + jitter_ms
        await asyncio.sleep(delay_ms / 1000)

    # Reached only when the loop body never ran.
    return err_result(tool=tool, trace_id=trace_id, code="UNKNOWN", message="unexpected retry loop end")

async def tool_parse_instruction(*, instruction: str, trace_id: str) -> Dict[str, Any]:
    """Stub tool 1: parse an instruction, with fault injection.

    Behaviour of the stub:
      * a non-string instruction yields a ``TYPE_ERROR`` failure;
      * a blank instruction yields an ``INPUT_EMPTY`` failure;
      * an instruction containing "字段错" yields a deliberately
        off-contract payload, used to exercise result normalization;
      * anything else yields a fixed ``place`` / ``bin_a`` result.

    A real project would replace the body with intent recognition, slot
    extraction or parameter normalization.
    """
    tool_name = "parse_instruction"

    if not isinstance(instruction, str):
        type_evidence = {"expect": "str", "got": type(instruction).__name__}
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="TYPE_ERROR",
            message="instruction must be a string",
            detail=type_evidence,
            retryable=False,
        )

    if instruction.strip() == "":
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="INPUT_EMPTY",
            message="instruction is empty",
            retryable=False,
        )

    await asyncio.sleep(0.05)

    if "字段错" in instruction:
        # Intentionally off-contract response.
        return {"success": False, "error": "bad shape"}

    parsed = {"action": "place", "target_bin": "bin_a"}
    return ok_result(tool=tool_name, trace_id=trace_id, data=parsed)

async def tool_query_kb(*, query: str, trace_id: str) -> Dict[str, Any]:
    """Stub tool 2: knowledge-base search, with fault injection.

    Behaviour of the stub:
      * a non-string query yields a ``TYPE_ERROR`` failure;
      * a query containing "超时" sleeps an extra 5 seconds;
      * a query containing "抛异常" raises ``RuntimeError``;
      * anything else yields a single fixed hit.

    A real project would replace the body with a vector store, database or
    rule-base lookup. The orchestrator in this file treats this tool as a
    non-critical branch that may be degraded.
    """
    tool_name = "kb_search"

    if not isinstance(query, str):
        type_evidence = {"expect": "str", "got": type(query).__name__}
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="TYPE_ERROR",
            message="query must be a string",
            detail=type_evidence,
            retryable=False,
        )

    await asyncio.sleep(0.15)

    wants_timeout = "超时" in query
    if wants_timeout:
        await asyncio.sleep(5)

    wants_crash = "抛异常" in query
    if wants_crash:
        raise RuntimeError("kb backend crashed")

    hits = [{"id": "doc-01", "score": 0.76}]
    return ok_result(tool=tool_name, trace_id=trace_id, data={"hits": hits})

async def tool_arm_control(*, action: str, target_bin: str, trace_id: str) -> Dict[str, Any]:
    """Stub tool 3: device control (example).

    Control tools should run last and validate their input strictly.
    Checks, in order:
      * ``action`` and ``target_bin`` must be present and non-empty,
        otherwise ``INPUT_MISSING``;
      * ``action`` must be ``"place"``, otherwise ``VALUE_ERROR``;
      * ``target_bin == "bin_unknown"`` is refused with ``SAFETY_REJECT``.

    On success a fixed acknowledgement with ``cmd_id`` is returned.
    """
    tool_name = "arm_control"

    supplied = {"action": action, "target_bin": target_bin}
    complete, absent = require_fields(supplied, ["action", "target_bin"])
    if not complete:
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="INPUT_MISSING",
            message="missing required fields",
            detail={"missing": absent},
        )

    if action not in {"place"}:
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="VALUE_ERROR",
            message="unsupported action",
            detail={"action": action, "allow": ["place"]},
            retryable=False,
        )

    await asyncio.sleep(0.05)

    if target_bin == "bin_unknown":
        return err_result(
            tool=tool_name,
            trace_id=trace_id,
            code="SAFETY_REJECT",
            message="unsafe target_bin",
            retryable=False,
        )

    receipt = {"ack": True, "cmd_id": "c-001"}
    return ok_result(tool=tool_name, trace_id=trace_id, data=receipt)

@dataclass
class PipelineOutput:
    """Result of one orchestrated pipeline run."""

    # Whether the pipeline as a whole succeeded.
    ok: bool
    # Correlation id shared by every step of this run.
    trace_id: str
    # Result envelope of each executed step, in execution order.
    steps: List[Dict[str, Any]]
    # Business-facing outcome, or the failure reason and error.
    final: Dict[str, Any]
    # Evidence of failed steps that did not abort the run.
    warnings: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict (values are not copied)."""
        field_names = ("ok", "trace_id", "steps", "final", "warnings")
        return {name: getattr(self, name) for name in field_names}

async def orchestrate(*, instruction: str, kb_query: str) -> PipelineOutput:
    """Run the parse -> kb_search -> arm_control pipeline for one request.

    * Critical branch: parse and control. A failure in either aborts the
      run immediately (fail-fast) with ``ok=False``.
    * Non-critical branch: kb_search. A failure is recorded in
      ``warnings`` and the run continues.

    The parse result is checked for ``action`` and ``target_bin`` before it
    is used. A parse step that reports success but lacks those keys is
    reported as a structured ``parse_failed`` result instead of raising
    ``KeyError`` out of the pipeline.

    Args:
        instruction: Instruction text passed to the parse tool.
        kb_query: Query text passed to the knowledge-base tool.

    Returns:
        A ``PipelineOutput`` whose ``steps`` hold every executed step's
        envelope and whose ``final`` holds either the business fields or
        the failure ``reason`` and ``error``.
    """
    trace_id = uuid.uuid4().hex[:8]
    steps: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    # Step 1 (critical): parse.
    parse = await retry_call(
        call_fn=lambda: tool_parse_instruction(instruction=instruction, trace_id=trace_id),
        tool="parse_instruction",
        trace_id=trace_id,
        timeout_s=1.0,
        retries=0,
        base_backoff_ms=100,
    )
    steps.append(parse)
    if not parse["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "parse_failed", "error": parse["error"]}, warnings=warnings)

    # Contract check: later steps index these keys directly, so verify them
    # here and fail fast with a structured error if the parse tool omitted any.
    parsed = parse["data"]
    absent = [key for key in ("action", "target_bin") if key not in parsed]
    if absent:
        contract_err = err_result(
            tool="parse_instruction",
            trace_id=trace_id,
            code="RESP_MISSING_FIELDS",
            message="parse result is missing required fields",
            detail={"missing": absent},
        )
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "parse_failed", "error": contract_err["error"]}, warnings=warnings)
    action = str(parsed["action"])
    target_bin = str(parsed["target_bin"])

    # Step 2 (non-critical): search. Failure is tolerated but evidence is kept.
    kb = await retry_call(
        call_fn=lambda: tool_query_kb(query=kb_query, trace_id=trace_id),
        tool="kb_search",
        trace_id=trace_id,
        timeout_s=0.3,
        retries=2,
        base_backoff_ms=120,
    )
    steps.append(kb)
    if not kb["ok"]:
        warnings.append({"tool": kb["tool"], "trace_id": kb["trace_id"], "ts_ms": kb["ts_ms"], "error": kb["error"]})

    # Step 3 (critical): control. Not retried in this example.
    control = await retry_call(
        call_fn=lambda: tool_arm_control(action=action, target_bin=target_bin, trace_id=trace_id),
        tool="arm_control",
        trace_id=trace_id,
        timeout_s=1.0,
        retries=0,
        base_backoff_ms=100,
    )
    steps.append(control)
    if not control["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "control_failed", "error": control["error"]}, warnings=warnings)

    # final carries only business fields; the evidence chain is in steps/warnings.
    # cmd_id is read with .get() so an ack without it cannot raise KeyError.
    final = {"action": parsed["action"], "target_bin": parsed["target_bin"], "cmd_id": control["data"].get("cmd_id")}
    if kb["ok"]:
        final["kb_hit_count"] = len(kb["data"].get("hits") or [])
    return PipelineOutput(ok=True, trace_id=trace_id, steps=steps, final=final, warnings=warnings)

def main():
    """Run the demo cases and print each pipeline result as JSON.

    Cases:
      1. Everything succeeds.
      2. The parse tool returns an off-contract payload: normalization
         turns it into a structured error and the run fails fast, so the
         kb query of this case is never executed.
      3. The kb tool times out (TIMEOUT, degraded into warnings).
      4. The kb tool raises (EXCEPTION, degraded into warnings). Case 2
         stops before the kb step, so this separate case is what actually
         exercises the EXCEPTION path.
    """
    demo_cases = [
        ("把苹果放到A格口", "查询分拣规则"),
        ("字段错:把苹果放到A格口", "抛异常"),
        ("把苹果放到A格口", "超时"),
        ("把苹果放到A格口", "抛异常"),
    ]
    for instruction, kb_query in demo_cases:
        out = asyncio.run(orchestrate(instruction=instruction, kb_query=kb_query))
        print(json.dumps(out.to_dict(), ensure_ascii=False, indent=2))

# Run the demo only when executed as a script.
if __name__ == "__main__":
    main()

逐段解释与自检要点:

运行命令(在含 orchestrator_resilience_l1.py 的目录执行):

python orchestrator_resilience_l1.py

命令解释与自检要点:

目标(分组):

步骤(建议):

  1. 选定你们链路中的 2 个“高频失败点工具”(例如 KB 检索、ROS 控制、数据库查询)。
  2. 给每个工具加一层统一 wrapper:异常捕获、超时、规范化返回(字段漂移兼容)。
  3. 在编排 Join 处实施过滤:关键分支失败 fail-fast;非关键分支失败写入 warnings。
  4. 写 2 条错例:字段缺失、类型错误或单位错误,并确保错误码能被归类(INPUT_MISSING/TYPE_ERROR/UNIT_ERROR 等)。
  5. 给“参数错误提示”定口径:至少能回答三件事——缺哪个字段、期望类型是什么、允许范围/枚举有哪些(写进 error.detail)。

1)先做“策略表”:不同类型工具,容错策略不同

2)容错模式五件套(落地可写成代码)

  1. 超时(Timeout)
  2. 有条件重试(Retry with backoff)
  3. 降级(Fallback / Degrade)
  4. 熔断(Circuit Breaker)
  5. 补偿/回滚(Compensation)

下面代码片段可直接粘贴进你们的编排层(把 call_fn 换成真实工具调用即可)。

import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional

@dataclass
class CircuitBreaker:
    """Minimal circuit-breaker state machine.

    States:
      * CLOSED: requests are allowed.
      * OPEN: requests are refused until ``reset_timeout_s`` has elapsed.
      * HALF_OPEN: a trial request is allowed; success closes the breaker,
        failure re-opens it.

    Elapsed time is measured with ``time.monotonic()`` so that system
    clock adjustments cannot shorten or extend the open period.
    """

    # Number of consecutive failures that opens the breaker.
    failure_threshold: int
    # Seconds to stay OPEN before a trial request is allowed.
    reset_timeout_s: float
    state: str = "CLOSED"  # CLOSED | OPEN | HALF_OPEN
    consecutive_failures: int = 0
    # time.monotonic() reading taken when the breaker last opened.
    opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """Return whether the tool may be called right now.

        When the breaker is OPEN and the reset timeout has elapsed, this
        call moves it to HALF_OPEN and allows the request.
        """
        if self.state == "CLOSED":
            return True
        if self.state == "OPEN":
            if self.opened_at is None:
                return False
            if (time.monotonic() - self.opened_at) >= self.reset_timeout_s:
                # Timeout elapsed: switch to HALF_OPEN and let a probe through.
                self.state = "HALF_OPEN"
                return True
            return False
        return True

    def record_success(self) -> None:
        """Record a successful call: reset the failure count and close."""
        self.state = "CLOSED"
        self.consecutive_failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        """Record a failed call and open the breaker when warranted.

        The breaker opens when the consecutive-failure count reaches the
        threshold, and also whenever a HALF_OPEN probe fails, whatever the
        counter value is.
        """
        self.consecutive_failures += 1
        probe_failed = self.state == "HALF_OPEN"
        if probe_failed or self.consecutive_failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = time.monotonic()

def call_with_circuit_breaker(
    *,
    breaker: "CircuitBreaker",
    call_fn: Callable[[], Dict],
    on_open_fallback: Callable[[], Dict],
) -> Dict:
    """Call a tool through a circuit breaker.

    Args:
        breaker: Breaker guarding the tool.
        call_fn: Zero-argument callable performing the real tool call and
            returning a result dict with an ``ok`` key.
        on_open_fallback: Zero-argument callable producing the fallback
            result used when the breaker refuses the request.

    Returns:
        The fallback result when the breaker refuses the request,
        otherwise whatever ``call_fn`` returned.

    Raises:
        Exception: Anything raised by ``call_fn`` is re-raised unchanged,
            after being counted as a failure. Without that bookkeeping a
            crashing probe would leave the breaker HALF_OPEN indefinitely.
    """
    if not breaker.allow_request():
        # Breaker refuses the request: skip the tool and use the fallback.
        return on_open_fallback()

    try:
        out = call_fn()
    except Exception:
        breaker.record_failure()
        raise

    if bool(out.get("ok")):
        breaker.record_success()
    else:
        breaker.record_failure()
    return out

逐段解释与自检要点:

AI 容错方案生成模板(可直接复制):

你是工业级编排与容错工程师。请基于我的现状,生成“可落地的容错设计方案 + 最小代码改动”,要求:
1)先输出策略表:每个工具属于查询/计算/控制哪类?允许超时/重试/降级/熔断吗?理由是什么?
2)给出错误码分类建议(不超过 12 个 code),并说明每个 code 的触发条件与建议处理方式(fail-fast / retry / fallback)。
3)给出最小补丁代码:实现超时、重试退避、warnings、(可选)熔断;不要引入新依赖。
4)给出 4 条回归用例(2 正常 + 2 异常),每条必须能复验(输入→期望行为→断言点)。

现有工具列表与签名:{粘贴}
现有编排代码片段:{粘贴}
失败证据(含 trace_id 的输出):{粘贴}

AI 输出审计清单(必须做):

建议的最小异常标注格式(JSONL:一行一个样本),保存为 anomaly_labels.jsonl

{"id":"a-001","input":{"instruction":"把苹果放到A格口","kb_query":"超时"},"expect":{"behavior":"DEGRADE","warnings_contains":["TIMEOUT"],"final_required_keys":["action","target_bin"]}}
{"id":"a-002","input":{"instruction":"","kb_query":"查询分拣规则"},"expect":{"behavior":"FAIL_FAST","error_code":"INPUT_EMPTY","failed_step":"parse_instruction"}}
{"id":"a-003","input":{"instruction":"字段错:把苹果放到A格口","kb_query":"查询分拣规则"},"expect":{"behavior":"FAIL_FAST","error_code":"RESP_ERROR_SHAPE","failed_step":"parse_instruction"}}
{"id":"a-004","input":{"instruction":"把苹果放到A格口","kb_query":"抛异常"},"expect":{"behavior":"DEGRADE","warnings_contains":["EXCEPTION"],"final_required_keys":["cmd_id"]}}

字段解释与自检要点:

扩展样本(可选,完成“熔断接入”后再加进 JSONL):

{"id":"a-005","input":{"instruction":"把苹果放到A格口","kb_query":"查询分拣规则"},"expect":{"behavior":"CIRCUIT_OPEN","warnings_contains":["CIRCUIT_OPEN"]}}

可选:异常标注数据的回归脚本骨架(标准库即可)

import json

def load_jsonl(path: str):
    """Lazily yield one decoded JSON value per non-blank line of a file.

    JSONL keeps one sample per line, which makes it easy to keep appending
    new anomaly cases to the regression set. Lines are stripped before
    decoding; lines that are empty after stripping are skipped.

    Args:
        path: Path of the UTF-8 encoded JSONL file.

    Yields:
        The decoded value of each non-blank line, in file order.
    """
    with open(path, "r", encoding="utf-8") as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        yield from (json.loads(text) for text in stripped_lines if text)

def has_warning(warnings, codes):
    """Return True when every expected error code appears in the warnings.

    The ``error.code`` of each warning is collected into a set and the
    expected codes are checked for inclusion, which avoids the false
    positives of substring matching.

    Args:
        warnings: Iterable of warning dicts (``None`` is treated as empty).
        codes: Iterable of expected codes (``None`` is treated as empty).

    Returns:
        True if the expected codes are a subset of the observed codes.
    """
    expected = set(codes) if codes else set()
    observed = set()
    for warning in warnings or []:
        error_info = warning.get("error") or {}
        observed.add(error_info.get("code"))
    return expected <= observed

def main():
    """Replay every labelled anomaly sample and assert the expected behaviour.

    Assumes the first-lesson code is saved as ``orchestrator_resilience_l1.py``
    and exposes ``async orchestrate(instruction, kb_query)`` whose result has
    ``to_dict()``. Samples are read from ``anomaly_labels.jsonl``.

    Supported ``expect.behavior`` values:
      * ``FAIL_FAST``: the run must fail; ``error_code`` (if given) must
        match ``final.error.code`` and ``failed_step`` (if given) must
        match the tool of the last executed step.
      * ``DEGRADE`` / ``CIRCUIT_OPEN``: ``warnings_contains`` (if given)
        must all appear as warning codes and every key listed in
        ``final_required_keys`` must be present in ``final``.

    Raises:
        AssertionError: When a sample does not behave as labelled.
        ValueError: When a sample uses an unknown ``behavior``.
    """
    from orchestrator_resilience_l1 import orchestrate
    import asyncio

    for case in load_jsonl("anomaly_labels.jsonl"):
        cid = case["id"]
        inp = case["input"]
        exp = case["expect"]

        # Run one sample; the output has ok/final/warnings/steps/trace_id.
        out = asyncio.run(orchestrate(instruction=inp.get("instruction", ""), kb_query=inp.get("kb_query", ""))).to_dict()
        ok = bool(out.get("ok"))
        final = out.get("final") or {}
        warnings = out.get("warnings") or []
        steps = out.get("steps") or []

        behavior = exp.get("behavior")
        if behavior == "FAIL_FAST":
            # Critical branch failed: the whole run must fail with the labelled code.
            assert ok is False, (cid, "should fail-fast")
            if "error_code" in exp:
                got = ((final.get("error") or {}).get("code"))
                assert got == exp["error_code"], (cid, got, exp["error_code"])
            if "failed_step" in exp:
                # A fail-fast run stops at the failing step, so it is the last one recorded.
                last_tool = steps[-1].get("tool") if steps else None
                assert last_tool == exp["failed_step"], (cid, last_tool, exp["failed_step"])
        elif behavior in ("DEGRADE", "CIRCUIT_OPEN"):
            # Non-critical branch failed or was short-circuited: evidence must
            # be in warnings and final must still carry the key fields.
            if "warnings_contains" in exp:
                assert has_warning(warnings, exp["warnings_contains"]), (cid, warnings)
            for k in exp.get("final_required_keys") or []:
                assert k in final, (cid, "missing final key", k)
        else:
            raise ValueError(f"unknown behavior: {behavior} | case={cid}")

    print("all anomaly labels passed")

# Run the regression only when executed as a script.
if __name__ == "__main__":
    main()

逐段解释与自检要点:

落地建议(每组至少做到 1 条):


课后作业(布置)

1)提交调试后的工具调用代码、运行成功截图(含多工具调用完整流程)。

2)提交容错设计方案与对应的代码实现,附 150 字左右说明,阐述优化效果。

3)提交标注异常场景数据的使用记录,说明如何基于数据完善逻辑。

Markdown 与代码自检清单(提交前自查)