26 实践课-多工具组合调用逻辑设计

多工具组合调用逻辑设计

关联:索引

术语小抄(初学者版)

作业:布置(见文末)


  1. 为什么“视觉识别失败”时不能直接尝试机械臂控制?
  2. 如果并行调用了两个工具,一个成功一个失败,你的系统应当返回成功还是失败?为什么?

1)串行:适用于“强依赖链”

分拣主链路属于强依赖:

2)并行:适用于“弱依赖 + 可合并”

并行适用的典型情况:

本讲全程统一的“规范化返回格式”(建议你们组后续所有工具/编排都对齐):

{
  "ok": true,
  "trace_id": "a1b2c3d4",
  "tool": "vision_detect",
  "ts_ms": 1710000000000,
  "data": {},
  "error": null
}

解释与自检要点:

把下面代码保存为 orchestrator_demo.py,然后运行。示例工具用的是“可运行的桩函数”,目的是把编排结构与返回规范先跑通,后续再替换成你们真实工具(视觉模型、解析器、ROS2 控制)。

from __future__ import annotations

# 这是一个“多工具编排层”的最小可运行示例:
# - 用 3 个核心工具模拟分拣主链路:视觉识别 → 指令解析 → 机械臂控制(串行)
# - 在控制工具前加入门禁:safety_check(置信度阈值)
# - 所有工具都返回统一的结构(规范化返回),便于 Join、测试、审计、回归
# 你后续替换成真实工具(视觉模型/解析器/ROS2 控制)时,尽量只替换工具函数体,保留返回结构与编排骨架。

import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

def now_ms() -> int:
    # 统一生成毫秒时间戳:用于日志排序、审计对齐、问题回放定位
    return int(time.time() * 1000)

def ok_result(*, tool: str, trace_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
    # 规范化成功返回:
    # - ok=true 表示“这一步工具调用成功”,不等价于“业务完成”
    # - tool/trace_id/ts_ms 是审计与串联证据链的关键字段
    return {"ok": True, "tool": tool, "trace_id": trace_id, "ts_ms": now_ms(), "data": data, "error": None}

def err_result(*, tool: str, trace_id: str, code: str, message: str, detail: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    # 规范化失败返回:
    # - code:机器可判断的错误码(用于统计/报警/回归断言)
    # - message:给人看的简短原因
    # - detail:补充信息(缺哪些字段、阈值是多少等),便于复现与修复
    return {
        "ok": False,
        "tool": tool,
        "trace_id": trace_id,
        "ts_ms": now_ms(),
        "data": {},
        "error": {"code": code, "message": message, "detail": detail or {}},
    }

def require_fields(obj: Dict[str, Any], fields: List[str]) -> Tuple[bool, List[str]]:
    # 统一的“必填字段检查”:把业务字段缺失变成可解释、可复现的错误
    missing = [f for f in fields if obj.get(f) in (None, "", [])]
    return (len(missing) == 0), missing

async def tool_vision_detect(*, image_path: str, trace_id: str) -> Dict[str, Any]:
    # 工具 1:视觉识别(桩)
    # 真实场景中这里会做:图像读取/预处理/模型推理/结果后处理
    tool = "vision_detect"
    p = (image_path or "").strip()
    if not p:
        # 输入不合规:直接拒绝,避免后续工具“带病执行”
        return err_result(tool=tool, trace_id=trace_id, code="INPUT_EMPTY", message="image_path is empty")

    # 用 sleep 模拟“工具耗时”;真实项目里可以替换为实际推理时间
    await asyncio.sleep(0.05)

    # 这里用非常简化的规则模拟识别结果(只用于课堂演示编排结构)
    item_type = "apple" if "apple" in p.lower() else "unknown"
    confidence = 0.92 if item_type == "apple" else 0.30
    return ok_result(tool=tool, trace_id=trace_id, data={"item_type": item_type, "confidence": confidence})

async def tool_parse_instruction(*, instruction: str, trace_id: str) -> Dict[str, Any]:
    # 工具 2:指令解析(桩)
    # 真实场景中这里会做:意图识别/槽位抽取/约束提取/参数标准化
    tool = "parse_instruction"
    ins = (instruction or "").strip()
    if not ins:
        return err_result(tool=tool, trace_id=trace_id, code="INPUT_EMPTY", message="instruction is empty")

    await asyncio.sleep(0.03)
    lower = ins.lower()

    # 解析“目标格口”:
    # - 只匹配明确模式,避免出现 "place" 这种英文词导致误判(含字母 a/b)
    if ("a格口" in lower) or ("bin_a" in lower) or ("slot_a" in lower):
        target_bin = "bin_a"
    elif ("b格口" in lower) or ("bin_b" in lower) or ("slot_b" in lower):
        target_bin = "bin_b"
    else:
        target_bin = "bin_unknown"

    # 解析“动作类型”:
    # - 工业控制类动作必须有限枚举,unknown 一律不允许控制执行
    action = "place" if ("place" in lower or "放" in ins) else "unknown"
    return ok_result(tool=tool, trace_id=trace_id, data={"action": action, "target_bin": target_bin})

async def tool_arm_control(*, action: str, target_bin: str, item_type: str, trace_id: str) -> Dict[str, Any]:
    # 工具 3:机械臂控制(桩)
    # 真实场景中这里会对接 ROS2/PLC/设备 SDK;课堂里用桩函数强调两件事:
    # 1)控制类工具必须最后执行
    # 2)执行前必须做门禁与参数二次校验(宁可拒绝也不误控)
    tool = "arm_control"

    ok1, missing = require_fields({"action": action, "target_bin": target_bin, "item_type": item_type}, ["action", "target_bin", "item_type"])
    if not ok1:
        return err_result(tool=tool, trace_id=trace_id, code="INPUT_MISSING", message="missing required fields", detail={"missing": missing})

    await asyncio.sleep(0.04)

    # 安全拒绝策略:只要关键参数不确定,就不下发控制
    if target_bin == "bin_unknown" or action == "unknown" or item_type == "unknown":
        return err_result(tool=tool, trace_id=trace_id, code="SAFETY_REJECT", message="unsafe to execute control with unknown parameters")

    # 模拟“控制执行回执”:工业场景里这个回执必须可追溯、可审计
    return ok_result(tool=tool, trace_id=trace_id, data={"ack": True, "executed": {"action": action, "target_bin": target_bin, "item_type": item_type}})

async def tool_safety_check(*, confidence: float, trace_id: str) -> Dict[str, Any]:
    # 门禁工具(示例):用置信度做阈值门禁
    # 真实场景可扩展:权限校验、二次确认、速度上限、危险动作白名单等
    tool = "safety_check"
    await asyncio.sleep(0.01)
    allow_control = float(confidence) >= 0.85
    return ok_result(tool=tool, trace_id=trace_id, data={"allow_control": allow_control, "min_confidence": 0.85})

@dataclass
class PipelineOutput:
    # 编排层输出结构:
    # - steps:保留每一步工具的“规范化返回”,形成证据链
    # - final:对外给业务的关键结果(成功时给关键字段,失败时给 reason)
    ok: bool
    trace_id: str
    steps: List[Dict[str, Any]]
    final: Dict[str, Any]
    warnings: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        return {
            "ok": self.ok,
            "trace_id": self.trace_id,
            "steps": self.steps,
            "final": self.final,
            "warnings": self.warnings,
        }

async def orchestrate_sorting(*, image_path: str, instruction: str) -> PipelineOutput:
    # 编排主函数(串行主链路):
    # - 严格按顺序执行:vision → parse → safety → control
    # - 任一步失败:立刻停止并返回(fail-fast),避免“错误扩散”
    trace_id = uuid.uuid4().hex[:8]
    steps: List[Dict[str, Any]] = []
    warnings: List[Dict[str, Any]] = []

    # Step 1:视觉识别
    vision = await tool_vision_detect(image_path=image_path, trace_id=trace_id)
    steps.append(vision)
    if not vision["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "vision_failed"}, warnings=warnings)

    # Step 2:指令解析(依赖视觉/业务上下文时,也可以把 vision 结果作为输入的一部分传入)
    parse = await tool_parse_instruction(instruction=instruction, trace_id=trace_id)
    steps.append(parse)
    if not parse["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "parse_failed"}, warnings=warnings)

    # Step 3:门禁(控制前必经)
    # 这里的门禁只用置信度演示;真实系统还应加入权限/场景/参数范围等门禁
    safety = await tool_safety_check(confidence=float(vision["data"]["confidence"]), trace_id=trace_id)
    steps.append(safety)
    if not safety["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "safety_check_failed"}, warnings=warnings)
    if not bool(safety["data"]["allow_control"]):
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "confidence_too_low"}, warnings=warnings)

    # Step 4:机械臂控制(必须最后执行)
    # 通过“上游输出 → 下游入参”的方式传参,避免出现字段名不一致与不可追踪的隐式依赖
    control = await tool_arm_control(
        action=str(parse["data"]["action"]),
        target_bin=str(parse["data"]["target_bin"]),
        item_type=str(vision["data"]["item_type"]),
        trace_id=trace_id,
    )
    steps.append(control)
    if not control["ok"]:
        return PipelineOutput(ok=False, trace_id=trace_id, steps=steps, final={"reason": "control_failed"}, warnings=warnings)

    # 最终对外结果:只保留业务关键字段(其余证据在 steps 里)
    final = {
        "item_type": vision["data"]["item_type"],
        "target_bin": parse["data"]["target_bin"],
        "action": parse["data"]["action"],
        "executed": control["data"]["executed"],
    }
    return PipelineOutput(ok=True, trace_id=trace_id, steps=steps, final=final, warnings=warnings)

async def demo_parallel_extension() -> Dict[str, Any]:
    # 并行扩展示例(只演示“并行骨架”):
    # - vision 与 parse 在这里没有数据依赖,因此可并行
    # - 注意:并行情况下 steps 的 append 顺序可能不稳定(调度导致),不要依赖 steps 的顺序做严格断言
    trace_id = uuid.uuid4().hex[:8]
    steps: List[Dict[str, Any]] = []

    async def _vision():
        r = await tool_vision_detect(image_path="sample_apple.jpg", trace_id=trace_id)
        steps.append(r)
        return r

    async def _parse():
        r = await tool_parse_instruction(instruction="把苹果放到A格口", trace_id=trace_id)
        steps.append(r)
        return r

    # gather:并行等待两个分支结束;真实项目里可以加入超时、取消策略、关键分支失败就中止等逻辑
    v, p = await asyncio.gather(_vision(), _parse())
    return {"trace_id": trace_id, "ok": bool(v["ok"] and p["ok"]), "joined": {"vision": v, "parse": p}, "steps": steps}

def main():
    # 运行串行主链路 demo
    out = asyncio.run(orchestrate_sorting(image_path="apple_001.jpg", instruction="把苹果放到A格口"))
    print(json.dumps(out.to_dict(), ensure_ascii=False, indent=2))

    # 运行并行扩展示例 demo
    joined = asyncio.run(demo_parallel_extension())
    print(json.dumps(joined, ensure_ascii=False, indent=2))

if __name__ == "__main__":
    main()

解释与自检要点:

运行命令(在含 orchestrator_demo.py 的目录执行):

python orchestrator_demo.py

解释与自检要点:

目标(分组):

步骤(建议):

  1. 选定 3 个工具:视觉识别 / 指令解析 / 机械臂控制(或你们自选题的 3 个等价工具)。
  2. 写出串行流程:上游失败立刻停止,返回结构化错误。
  3. 加入一个门禁工具:safety_check(或 authorize)放在控制工具之前。

审计清单(拿到 AI 代码后先检查这 6 项):

  1. 工具名与字段名是否与你们注册表/工具实现一致(避免“拼错就全崩”)。

  2. 是否把控制类工具放最后,并在前面做门禁(权限/置信度/二次确认)。

  3. 失败是否结构化(error.code/message/detail),是否可定位(trace_id)。

  4. 并行分支的 Join 是否有明确规则(哪些必须成功,哪些可选)。

  5. 是否把不可控的自然语言直接传给控制工具(危险做法)。

  6. 是否有最小测试(至少:正常 2、异常 2)。

  7. 调用序列匹配率:标注期望的工具调用顺序 vs 实际 steps[*].tool

  8. 关键结果匹配率:标注期望字段(如 item_type/target_bin/action)vs 实际 final

标注数据最小格式(JSONL:一行一个样本),保存为 labels.jsonl

{"id":"case-001","image_path":"apple_001.jpg","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect","parse_instruction","safety_check","arm_control"],"final":{"item_type":"apple","target_bin":"bin_a","action":"place"}}}
{"id":"case-002","image_path":"","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect"],"final":{"reason":"vision_failed"}}}
{"id":"case-003","image_path":"unknown_001.jpg","instruction":"把苹果放到A格口","expect":{"tool_sequence":["vision_detect","parse_instruction","safety_check"],"final":{"reason":"confidence_too_low"}}}

解释与自检要点:

把下面代码保存为 evaluate_pipeline.py,与 orchestrator_demo.pylabels.jsonl 放在同一目录后运行。

from __future__ import annotations

# 这是一个“标注数据驱动验证”的最小脚本:
# - 输入:labels.jsonl(每行一个样本,包含 expect.tool_sequence 与 expect.final)
# - 过程:逐条运行编排 orchestrate_sorting(),对比“实际 steps 与 final”是否满足标注期望
# - 输出:总体匹配率 + 每条样本的问题清单(便于定位与回归)

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple

import asyncio

from orchestrator_demo import orchestrate_sorting

def load_jsonl(path: Path) -> List[Dict[str, Any]]:
    # 读取 JSONL:一行一个 JSON 对象,便于你不断追加标注样本
    lines = path.read_text(encoding="utf-8").splitlines()
    out: List[Dict[str, Any]] = []
    for ln in lines:
        s = ln.strip()
        if s:
            out.append(json.loads(s))
    return out

def tool_sequence(steps: List[Dict[str, Any]]) -> List[str]:
    # 从编排输出的 steps 中提取“实际调用序列”(每一步的 tool 字段)
    # 串行链路:该序列通常稳定,可直接与标注做严格相等对比
    # 并行链路:该序列可能受调度影响不稳定,建议改为校验“必经子序列/相对顺序约束/关键工具集合”
    return [str(s.get("tool", "")) for s in steps if s.get("tool")]

def match_subset(expect: Dict[str, Any], actual: Dict[str, Any]) -> Tuple[bool, List[str]]:
    # 子集匹配:只校验标注里写明的关键字段
    # 好处:系统可演进(final 增加新字段不影响旧标注),也避免把“非关键细节”写死
    mismatches: List[str] = []
    for k, v in expect.items():
        if actual.get(k) != v:
            mismatches.append(f"{k}: expect={v!r}, actual={actual.get(k)!r}")
    return (len(mismatches) == 0), mismatches

@dataclass
class EvalRow:
    # 每条样本的评估结果(用于输出问题清单与课堂验收)
    sample_id: str
    ok: bool
    seq_ok: bool
    final_ok: bool
    issues: List[str]

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.sample_id,
            "ok": self.ok,
            "seq_ok": self.seq_ok,
            "final_ok": self.final_ok,
            "issues": self.issues,
        }

async def eval_one(sample: Dict[str, Any]) -> EvalRow:
    # 评估单条样本:
    # - 跑一次编排得到实际输出
    # - 对比标注期望的 tool_sequence 与 final(关键字段子集)
    # - 汇总成 issues(问题清单),用于定位错误原因
    sample_id = str(sample.get("id", ""))
    image_path = str(sample.get("image_path", ""))
    instruction = str(sample.get("instruction", ""))
    expect = sample.get("expect") or {}

    # 执行编排:这里把 sample 的输入字段与 orchestrate_sorting() 的参数一一对齐
    out = await orchestrate_sorting(image_path=image_path, instruction=instruction)
    out_dict = out.to_dict()

    # 1)调用序列匹配:用于检查“顺序控制逻辑是否正确”
    actual_seq = tool_sequence(out_dict["steps"])
    expect_seq = list(expect.get("tool_sequence") or [])
    seq_ok = actual_seq == expect_seq

    # 2)关键结果匹配:用于检查“整合后的关键字段是否正确”
    expect_final = dict(expect.get("final") or {})
    actual_final = dict(out_dict.get("final") or {})
    final_ok, mismatches = match_subset(expect_final, actual_final)

    # 汇总问题清单:既包含序列不一致,也包含关键字段不一致
    issues: List[str] = []
    if not seq_ok:
        issues.append(f"tool_sequence mismatch: expect={expect_seq}, actual={actual_seq}")
    issues.extend(mismatches)

    ok = bool(seq_ok and final_ok)
    return EvalRow(sample_id=sample_id, ok=ok, seq_ok=seq_ok, final_ok=final_ok, issues=issues)

async def main():
    # 主入口:逐条样本评估 → 统计匹配率 → 打印 JSON 报告
    samples = load_jsonl(Path("labels.jsonl"))
    rows: List[EvalRow] = []
    for s in samples:
        rows.append(await eval_one(s))

    # 统计三类指标:
    # - overall_ok_rate:序列与结果都匹配的比例(最严格)
    # - sequence_match_rate:只看流程/顺序是否对
    # - final_match_rate:只看关键结果是否对
    total = len(rows)
    ok_cnt = sum(1 for r in rows if r.ok)
    seq_cnt = sum(1 for r in rows if r.seq_ok)
    final_cnt = sum(1 for r in rows if r.final_ok)

    report = {
        "total": total,
        "overall_ok_rate": 0.0 if total == 0 else ok_cnt / total,
        "sequence_match_rate": 0.0 if total == 0 else seq_cnt / total,
        "final_match_rate": 0.0 if total == 0 else final_cnt / total,
        "rows": [r.to_dict() for r in rows],
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))

if __name__ == "__main__":
    asyncio.run(main())

解释与自检要点:

运行命令(在含 3 个文件的目录执行):

python evaluate_pipeline.py

解释与自检要点:

七、课程思政(融入点)


课后作业(布置)

1)提交自选题场景的工具组合调用逻辑图(含调用顺序、结果整合规则)。

2)提交工具调用代码(AI 生成 + 人工优化版)、标注数据验证结果(含问题清单)。

3)撰写 200 字左右说明:阐述工具调用逻辑设计的思考与优化方向(建议包含:门禁策略、Join 规则、验证指标、下一步改进)。


输出前自检清单(本已执行)