Skip to content

第六章:Agent Core -- 引擎的心脏

"简单是可靠的先决条件。" -- Edsger Dijkstra


6.1 只有 5 个文件

packages/agent 是整个项目最精炼的部分。它只有 5 个文件,总共约 1600 行代码,却定义了 Agent 的全部核心行为。

packages/agent/src/
├── types.ts         366 行   接口契约
├── agent.ts         543 行   状态管理
├── agent-loop.ts    684 行   核心循环
├── proxy.ts          ~50 行   远程代理
└── index.ts          ~10 行   导出

★ Insight ─────────────────────────────────────为什么只有 5 个文件? 因为这一层只做一件事——驱动 LLM 交互循环。它不管 UI,不管会话持久化,不管工具怎么实现。单一职责原则的极致体现。在 Java 中,这就是一个只有 Service 接口 + 一个实现类的最小 Spring Bean。 ─────────────────────────────────────────────────

6.2 Agent 类:状态的守护者

Agent 类是整个运行时的核心。它管理三样东西:状态事件队列

typescript
// packages/agent/src/agent.ts(简化)
export class Agent {
    // 私有状态
    private _state: MutableAgentState;
    private readonly listeners = new Set<Listener>();
    private readonly steeringQueue: PendingMessageQueue;
    private readonly followUpQueue: PendingMessageQueue;
    private activeRun?: ActiveRun;

    // 公开配置
    public convertToLlm: (messages: AgentMessage[]) => Message[];
    public transformContext?: (messages: AgentMessage[]) => Promise<AgentMessage[]>;
    public streamFn: StreamFn;
    public beforeToolCall?: BeforeToolCallHook;
    public afterToolCall?: AfterToolCallHook;
    public toolExecution: ToolExecutionMode;  // "sequential" | "parallel"
}

AgentState -- 不可变快照

typescript
interface AgentState {
    readonly systemPrompt: string;
    readonly model: Model<any>;
    readonly thinkingLevel: ThinkingLevel;
    set tools(tools: AgentTool<any>[]);    // setter 会复制数组
    get tools(): AgentTool<any>[];
    set messages(messages: AgentMessage[]); // setter 会复制数组
    get messages(): AgentMessage[];
    readonly isStreaming: boolean;
    readonly streamingMessage?: AgentMessage;
    readonly pendingToolCalls: ReadonlySet<string>;
    readonly errorMessage?: string;
}

⚡ Java 对照 ───────────────────────────────────── 这就像一个不可变的值对象(Immutable Value Object),但带有一些运行时状态。setter 中的 slice() 复制就像 Java 中 List.copyOf(list) ——外部修改不会影响内部状态。这和 Java 的防御性拷贝(Defensive Copying)原则完全一致。 ─────────────────────────────────────────────────

★ Insight ─────────────────────────────────────为什么 toolsmessages 用 getter/setter 而不是直接的 public 字段? 因为赋值时需要复制。当你写 agent.state.messages = newMessages 时,setter 内部执行 messages = nextMessages.slice(),创建一个浅拷贝。这样外部对 newMessages 的后续修改不会影响 Agent 内部状态。这是不可变性模式的一种实践。 ─────────────────────────────────────────────────

6.3 Agent 的核心方法

Agent 只有 3 个核心操作:

prompt() -- 开始新对话

typescript
async prompt(input: string | AgentMessage | AgentMessage[]): Promise<void> {
    if (this.activeRun) {
        throw new Error("Agent is already processing. Use steer() or followUp().");
    }
    const messages = this.normalizePromptInput(input);
    await this.runPromptMessages(messages);
}

steer() -- 中途注入消息

typescript
steer(message: AgentMessage): void {
    this.steeringQueue.enqueue(message);
}

followUp() -- 等待结束后追加

typescript
followUp(message: AgentMessage): void {
    this.followUpQueue.enqueue(message);
}

★ Insight ─────────────────────────────────────steer 和 followUp 的区别是什么?

  • steer(转向):Agent 正在执行工具时,你插话说"别忘了检查错误处理"。Agent 会在当前工具批次完成后立即处理你的消息。
  • followUp(后续):Agent 完成了所有工作后,你说"还有,帮我写个测试"。只有当 Agent 即将停下来时才会处理。

这就像一个会议中的两种打断方式:steer 是"等一下,我想补充一点",followUp 是"等你说完,我还有个问题"。 ─────────────────────────────────────────────────

6.4 subscribe() -- 事件订阅

typescript
subscribe(listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void): () => void {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);  // 返回取消订阅函数
}

使用方式

typescript
const agent = new Agent({ ... });

// 订阅事件
const unsubscribe = agent.subscribe(async (event, signal) => {
    switch (event.type) {
        case "message_update":
            process.stdout.write(event.assistantMessageEvent.delta || "");
            break;
        case "tool_execution_start":
            console.log(`\n[Tool] ${event.toolName}(${JSON.stringify(event.args)})`);
            break;
        case "agent_end":
            console.log("\n[Done]");
            break;
    }
});

// 开始对话
await agent.prompt("Read package.json");

// 取消订阅
unsubscribe();

⚡ Java 对照 ─────────────────────────────────────

TypeScriptJava
subscribe(listener)addEventListener(listener)@EventListener
返回 () => void 取消函数返回 Subscription 或调用 removeListener()
Set<Listener>CopyOnWriteArraySet<Listener>
await listener(event)listener.onEvent(event)(同步)或 CompletableFuture

─────────────────────────────────────────────────

6.5 生命周期管理

Agent 的每次 prompt() 调用都遵循严格的生命周期:

prompt("...")


runWithLifecycle()
    ├── 创建 AbortController
    ├── 设置 isStreaming = true

    ├── ▼ executor(signal) ──────────────────────────┐
    │   │                                              │
    │   │   runAgentLoop()                             │
    │   │   ├── emit("agent_start")                    │
    │   │   ├── emit("turn_start")                     │
    │   │   ├── emit("message_start")                  │
    │   │   ├── streamAssistantResponse()              │
    │   │   │   ├── emit("message_update") × N         │
    │   │   │   └── emit("message_end")                │
    │   │   ├── executeToolCalls()                     │
    │   │   │   ├── emit("tool_execution_start")       │
    │   │   │   ├── emit("tool_execution_update") × N  │
    │   │   │   └── emit("tool_execution_end")         │
    │   │   ├── [可能多轮 turn]                        │
    │   │   └── emit("agent_end")                      │
    │   │                                              │
    │   └──────────────────────────────────────────────┘

    ├── 设置 isStreaming = false
    └── 清理 activeRun

★ Insight ─────────────────────────────────────AbortController 是取消机制的核心。 在 Java 中你可能用 Thread.interrupt()CancellationToken。TypeScript 的 AbortController 提供了标准化的取消协议——调用 controller.abort() 后,signal.aborted 变为 true,所有监听 signal 的异步操作都应该退出。Pi 在每个 prompt() 调用开始时创建一个新的 AbortController,确保并行调用之间互不干扰。 ─────────────────────────────────────────────────

6.6 Agent Loop:核心循环

这是整个项目最重要的算法。让我逐行解析:

typescript
// packages/agent/src/agent-loop.ts(简化并注释)
async function runLoop(
    currentContext: AgentContext,
    newMessages: AgentMessage[],
    config: AgentLoopConfig,
    signal: AbortSignal,
    emit: AgentEventSink,
): Promise<void> {

    // 外层循环:处理 follow-up 消息
    while (true) {

        // 内层循环:处理工具调用和 steering 消息
        let hasMoreToolCalls = true;
        let pendingMessages = await config.getSteeringMessages?.() || [];

        while (hasMoreToolCalls || pendingMessages.length > 0) {

            // 1. 注入 pending 消息(steering)
            if (pendingMessages.length > 0) {
                for (const msg of pendingMessages) {
                    await emit({ type: "message_start", message: msg });
                    await emit({ type: "message_end", message: msg });
                    currentContext.messages.push(msg);
                }
                pendingMessages = [];
            }

            // 2. 调用 LLM,流式获取响应
            const message = await streamAssistantResponse(currentContext, config, signal, emit);
            newMessages.push(message);

            // 3. 如果出错,终止
            if (message.stopReason === "error" || message.stopReason === "aborted") {
                await emit({ type: "agent_end", messages: newMessages });
                return;
            }

            // 4. 检查工具调用
            const toolCalls = message.content.filter(c => c.type === "toolCall");
            hasMoreToolCalls = false;

            if (toolCalls.length > 0) {
                // 5. 执行工具
                const batch = await executeToolCalls(currentContext, message, config, signal, emit);
                hasMoreToolCalls = !batch.terminate;  // 如果所有工具都要求终止,停止

                // 6. 将工具结果加入上下文
                for (const result of batch.messages) {
                    currentContext.messages.push(result);
                }
            }

            // 7. 检查新的 steering 消息
            pendingMessages = await config.getSteeringMessages?.() || [];
        }

        // 8. 内层循环结束,检查 follow-up
        const followUpMessages = await config.getFollowUpMessages?.() || [];
        if (followUpMessages.length > 0) {
            pendingMessages = followUpMessages;  // 作为下一轮的 pending
            continue;  // 继续外层循环
        }

        break;  // 没有更多消息,退出
    }

    await emit({ type: "agent_end", messages: newMessages });
}

流程图

┌─────────────────────────────────────────────────┐
│              while(true) 外层循环                │
│                                                  │
│  ┌─────────────────────────────────────────────┐ │
│  │         while(有工具调用或pending消息)       │ │
│  │                                              │ │
│  │  1. 注入 steering 消息                       │ │
│  │  2. 调用 LLM                                │ │
│  │  3. 如果 stopReason=toolUse → 执行工具       │ │
│  │     工具结果 → 加入上下文 → 继续内层循环     │ │
│  │  4. 如果 stopReason=stop → 退出内层循环      │ │
│  │                                              │ │
│  └─────────────────────────────────────────────┘ │
│                                                  │
│  5. 检查 follow-up 队列                          │
│     有消息 → 作为 pending → 继续外层循环         │
│     无消息 → 退出                                │
│                                                  │
└─────────────────────────────────────────────────┘

★ Insight ─────────────────────────────────────双层循环是 Agent 的精髓。 外层循环处理"用户追加的后续问题",内层循环处理"LLM 的工具调用链"。大多数时候你只需要内层循环——LLM 调用工具、拿到结果、再决定下一步。但当用户在 Agent 工作时追加了新问题(通过 followUp),外层循环确保这些问题最终会被处理。 ─────────────────────────────────────────────────

6.7 工具执行:顺序 vs 并行

当 LLM 一次请求多个工具调用时,Pi 支持两种执行策略:

typescript
type ToolExecutionMode = "sequential" | "parallel";

顺序执行sequential):

ToolCall 1 → execute → result 1 → ToolCall 2 → execute → result 2

并行执行parallel,默认):

ToolCall 1 ─→ execute ─┐
ToolCall 2 ─→ execute ─┼→ results → 按顺序组装
ToolCall 3 ─→ execute ─┘
typescript
// 并行执行的核心逻辑
async function executeToolCallsParallel(...): Promise&lt;ExecutedToolCallBatch&gt; {
    const prepared: FinalizedToolCallEntry[] = [];

    // 1. 顺序准备(验证参数、检查权限)
    for (const toolCall of toolCalls) {
        const preparation = await prepareToolCall(...);
        prepared.push(preparation);
    }

    // 2. 并行执行
    const results = await Promise.all(
        prepared.map(entry => typeof entry === "function" ? entry() : Promise.resolve(entry))
    );

    // 3. 按原始顺序组装结果
    return { messages: results.map(createToolResultMessage), terminate: ... };
}

★ Insight ─────────────────────────────────────为什么准备阶段是顺序的,执行阶段是并行的? 准备阶段需要验证参数和检查 beforeToolCall 钩子,这些操作可能有副作用(比如记录日志),需要按顺序执行。而实际的工具执行(读文件、执行命令)通常是独立的 I/O 操作,可以安全并行。这就是 Promise.all 的典型使用场景,类似于 Java 的 CompletableFuture.allOf()─────────────────────────────────────────────────

6.8 beforeToolCall / afterToolCall 钩子

这是 Agent 最重要的扩展点:

typescript
interface AgentLoopConfig {
    // 工具执行前的拦截器
    beforeToolCall?: (
        context: BeforeToolCallContext,
        signal?: AbortSignal
    ) => Promise&lt;BeforeToolCallResult | undefined&gt;;

    // 工具执行后的拦截器
    afterToolCall?: (
        context: AfterToolCallContext,
        signal?: AbortSignal
    ) => Promise&lt;AfterToolCallResult | undefined&gt;;
}

beforeToolCall 可以:

  • 阻止工具执行(返回 { block: true }
  • 记录审计日志
  • 修改参数

afterToolCall 可以:

  • 替换工具结果
  • 将成功结果改为错误(isError: true
  • 请求终止(terminate: true

⚡ Java 对照 ───────────────────────────────────── 这就是 Java 中 AOP(面向切面编程) 的函数式版本。beforeToolCall = @BeforeafterToolCall = @After。在 Spring 中你可能用 @Around 注解来做类似的事。Pi 用回调函数代替了注解,更加灵活。 ─────────────────────────────────────────────────

6.9 PendingMessageQueue:消息队列

typescript
class PendingMessageQueue {
    private messages: AgentMessage[] = [];

    constructor(public mode: QueueMode) {}  // "all" | "one-at-a-time"

    enqueue(message: AgentMessage): void { ... }
    hasItems(): boolean { ... }

    drain(): AgentMessage[] {
        if (this.mode === "all") {
            const drained = this.messages.slice();
            this.messages = [];
            return drained;
        }
        // one-at-a-time: 每次只返回一条
        const first = this.messages[0];
        this.messages = this.messages.slice(1);
        return first ? [first] : [];
    }
}

★ Insight ─────────────────────────────────────"all" vs "one-at-a-time" 模式的区别"all" 模式一次性排空所有消息,适合批量处理;"one-at-a-time" 模式每次只返回一条,适合需要逐条处理的场景。默认的 steering 模式是 "one-at-a-time"——如果你快速输入了多条消息,它们会被逐条注入,而不是一次性全部注入。这让 Agent 能更及时地响应用户的每一条输入。 ─────────────────────────────────────────────────

6.10 本章小结

┌─────────────────────────────────────────────────────────────┐
│                        Agent 类                              │
│                                                              │
│  ┌──────────┐  ┌──────────────┐  ┌──────────────────────┐  │
│  │  State    │  │   Events     │  │   Message Queues     │  │
│  │          │  │              │  │                      │  │
│  │ messages │  │ subscribe()  │  │ steeringQueue        │  │
│  │ tools    │  │ emit()       │  │ followUpQueue        │  │
│  │ model    │  │              │  │                      │  │
│  └──────────┘  └──────────────┘  └──────────────────────┘  │
│                                                              │
│  ┌──────────────────────────────────────────────────────────┐│
│  │                    Agent Loop                            ││
│  │                                                          ││
│  │  while(true) {                                           ││
│  │    while(有工具调用 || 有pending消息) {                   ││
│  │      注入steering → 调用LLM → 执行工具 → 结果入上下文    ││
│  │    }                                                     ││
│  │    检查follow-up → 有则继续外层循环                      ││
│  │  }                                                       ││
│  └──────────────────────────────────────────────────────────┘│
│                                                              │
│  扩展点: beforeToolCall / afterToolCall                      │
└─────────────────────────────────────────────────────────────┘

第五章:类型系统 | 第七章:工具系统 -- 7 大内置工具 →

基于 MIT 许可证发布