第六章:Agent Core -- 引擎的心脏
"简单是可靠的先决条件。" -- Edsger Dijkstra
6.1 只有 5 个文件
packages/agent 是整个项目最精炼的部分。它只有 5 个文件,总共约 1600 行代码,却定义了 Agent 的全部核心行为。
packages/agent/src/
├── types.ts 366 行 接口契约
├── agent.ts 543 行 状态管理
├── agent-loop.ts 684 行 核心循环
├── proxy.ts ~50 行 远程代理
└── index.ts ~10 行 导出★ Insight ─────────────────────────────────────为什么只有 5 个文件? 因为这一层只做一件事——驱动 LLM 交互循环。它不管 UI,不管会话持久化,不管工具怎么实现。单一职责原则的极致体现。在 Java 中,这就是一个只有 Service 接口 + 一个实现类的最小 Spring Bean。 ─────────────────────────────────────────────────
6.2 Agent 类:状态的守护者
Agent 类是整个运行时的核心。它管理三样东西:状态、事件、队列。
// packages/agent/src/agent.ts(简化)
export class Agent {
// 私有状态
private _state: MutableAgentState;
private readonly listeners = new Set<Listener>();
private readonly steeringQueue: PendingMessageQueue;
private readonly followUpQueue: PendingMessageQueue;
private activeRun?: ActiveRun;
// 公开配置
public convertToLlm: (messages: AgentMessage[]) => Message[];
public transformContext?: (messages: AgentMessage[]) => Promise<AgentMessage[]>;
public streamFn: StreamFn;
public beforeToolCall?: BeforeToolCallHook;
public afterToolCall?: AfterToolCallHook;
public toolExecution: ToolExecutionMode; // "sequential" | "parallel"
}AgentState -- 不可变快照
interface AgentState {
readonly systemPrompt: string;
readonly model: Model<any>;
readonly thinkingLevel: ThinkingLevel;
set tools(tools: AgentTool<any>[]); // setter 会复制数组
get tools(): AgentTool<any>[];
set messages(messages: AgentMessage[]); // setter 会复制数组
get messages(): AgentMessage[];
readonly isStreaming: boolean;
readonly streamingMessage?: AgentMessage;
readonly pendingToolCalls: ReadonlySet<string>;
readonly errorMessage?: string;
}⚡ Java 对照 ───────────────────────────────────── 这就像一个不可变的值对象(Immutable Value Object),但带有一些运行时状态。setter 中的 slice() 复制就像 Java 中 List.copyOf(list) ——外部修改不会影响内部状态。这和 Java 的防御性拷贝(Defensive Copying)原则完全一致。 ─────────────────────────────────────────────────
★ Insight ─────────────────────────────────────为什么 tools 和 messages 用 getter/setter 而不是直接的 public 字段? 因为赋值时需要复制。当你写 agent.state.messages = newMessages 时,setter 内部执行 messages = nextMessages.slice(),创建一个浅拷贝。这样外部对 newMessages 的后续修改不会影响 Agent 内部状态。这是不可变性模式的一种实践。 ─────────────────────────────────────────────────
6.3 Agent 的核心方法
Agent 只有 3 个核心操作:
prompt() -- 开始新对话
async prompt(input: string | AgentMessage | AgentMessage[]): Promise<void> {
if (this.activeRun) {
throw new Error("Agent is already processing. Use steer() or followUp().");
}
const messages = this.normalizePromptInput(input);
await this.runPromptMessages(messages);
}steer() -- 中途注入消息
steer(message: AgentMessage): void {
this.steeringQueue.enqueue(message);
}followUp() -- 等待结束后追加
followUp(message: AgentMessage): void {
this.followUpQueue.enqueue(message);
}★ Insight ─────────────────────────────────────steer 和 followUp 的区别是什么?
steer(转向):Agent 正在执行工具时,你插话说"别忘了检查错误处理"。Agent 会在当前工具批次完成后立即处理你的消息。followUp(后续):Agent 完成了所有工作后,你说"还有,帮我写个测试"。只有当 Agent 即将停下来时才会处理。
这就像一个会议中的两种打断方式:steer 是"等一下,我想补充一点",followUp 是"等你说完,我还有个问题"。 ─────────────────────────────────────────────────
6.4 subscribe() -- 事件订阅
subscribe(listener: (event: AgentEvent, signal: AbortSignal) => Promise<void> | void): () => void {
this.listeners.add(listener);
return () => this.listeners.delete(listener); // 返回取消订阅函数
}使用方式:
const agent = new Agent({ ... });
// 订阅事件
const unsubscribe = agent.subscribe(async (event, signal) => {
switch (event.type) {
case "message_update":
process.stdout.write(event.assistantMessageEvent.delta || "");
break;
case "tool_execution_start":
console.log(`\n[Tool] ${event.toolName}(${JSON.stringify(event.args)})`);
break;
case "agent_end":
console.log("\n[Done]");
break;
}
});
// 开始对话
await agent.prompt("Read package.json");
// 取消订阅
unsubscribe();⚡ Java 对照 ─────────────────────────────────────
| TypeScript | Java |
|---|---|
subscribe(listener) | addEventListener(listener) 或 @EventListener |
返回 () => void 取消函数 | 返回 Subscription 或调用 removeListener() |
Set<Listener> | CopyOnWriteArraySet<Listener> |
await listener(event) | listener.onEvent(event)(同步)或 CompletableFuture |
─────────────────────────────────────────────────
6.5 生命周期管理
Agent 的每次 prompt() 调用都遵循严格的生命周期:
prompt("...")
│
▼
runWithLifecycle()
├── 创建 AbortController
├── 设置 isStreaming = true
│
├── ▼ executor(signal) ──────────────────────────┐
│ │ │
│ │ runAgentLoop() │
│ │ ├── emit("agent_start") │
│ │ ├── emit("turn_start") │
│ │ ├── emit("message_start") │
│ │ ├── streamAssistantResponse() │
│ │ │ ├── emit("message_update") × N │
│ │ │ └── emit("message_end") │
│ │ ├── executeToolCalls() │
│ │ │ ├── emit("tool_execution_start") │
│ │ │ ├── emit("tool_execution_update") × N │
│ │ │ └── emit("tool_execution_end") │
│ │ ├── [可能多轮 turn] │
│ │ └── emit("agent_end") │
│ │ │
│ └──────────────────────────────────────────────┘
│
├── 设置 isStreaming = false
└── 清理 activeRun★ Insight ─────────────────────────────────────AbortController 是取消机制的核心。 在 Java 中你可能用 Thread.interrupt() 或 CancellationToken。TypeScript 的 AbortController 提供了标准化的取消协议——调用 controller.abort() 后,signal.aborted 变为 true,所有监听 signal 的异步操作都应该退出。Pi 在每个 prompt() 调用开始时创建一个新的 AbortController,确保并行调用之间互不干扰。 ─────────────────────────────────────────────────
6.6 Agent Loop:核心循环
这是整个项目最重要的算法。让我逐行解析:
// packages/agent/src/agent-loop.ts(简化并注释)
async function runLoop(
currentContext: AgentContext,
newMessages: AgentMessage[],
config: AgentLoopConfig,
signal: AbortSignal,
emit: AgentEventSink,
): Promise<void> {
// 外层循环:处理 follow-up 消息
while (true) {
// 内层循环:处理工具调用和 steering 消息
let hasMoreToolCalls = true;
let pendingMessages = await config.getSteeringMessages?.() || [];
while (hasMoreToolCalls || pendingMessages.length > 0) {
// 1. 注入 pending 消息(steering)
if (pendingMessages.length > 0) {
for (const msg of pendingMessages) {
await emit({ type: "message_start", message: msg });
await emit({ type: "message_end", message: msg });
currentContext.messages.push(msg);
}
pendingMessages = [];
}
// 2. 调用 LLM,流式获取响应
const message = await streamAssistantResponse(currentContext, config, signal, emit);
newMessages.push(message);
// 3. 如果出错,终止
if (message.stopReason === "error" || message.stopReason === "aborted") {
await emit({ type: "agent_end", messages: newMessages });
return;
}
// 4. 检查工具调用
const toolCalls = message.content.filter(c => c.type === "toolCall");
hasMoreToolCalls = false;
if (toolCalls.length > 0) {
// 5. 执行工具
const batch = await executeToolCalls(currentContext, message, config, signal, emit);
hasMoreToolCalls = !batch.terminate; // 如果所有工具都要求终止,停止
// 6. 将工具结果加入上下文
for (const result of batch.messages) {
currentContext.messages.push(result);
}
}
// 7. 检查新的 steering 消息
pendingMessages = await config.getSteeringMessages?.() || [];
}
// 8. 内层循环结束,检查 follow-up
const followUpMessages = await config.getFollowUpMessages?.() || [];
if (followUpMessages.length > 0) {
pendingMessages = followUpMessages; // 作为下一轮的 pending
continue; // 继续外层循环
}
break; // 没有更多消息,退出
}
await emit({ type: "agent_end", messages: newMessages });
}流程图:
┌─────────────────────────────────────────────────┐
│ while(true) 外层循环 │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ while(有工具调用或pending消息) │ │
│ │ │ │
│ │ 1. 注入 steering 消息 │ │
│ │ 2. 调用 LLM │ │
│ │ 3. 如果 stopReason=toolUse → 执行工具 │ │
│ │ 工具结果 → 加入上下文 → 继续内层循环 │ │
│ │ 4. 如果 stopReason=stop → 退出内层循环 │ │
│ │ │ │
│ └─────────────────────────────────────────────┘ │
│ │
│ 5. 检查 follow-up 队列 │
│ 有消息 → 作为 pending → 继续外层循环 │
│ 无消息 → 退出 │
│ │
└─────────────────────────────────────────────────┘★ Insight ─────────────────────────────────────双层循环是 Agent 的精髓。 外层循环处理"用户追加的后续问题",内层循环处理"LLM 的工具调用链"。大多数时候你只需要内层循环——LLM 调用工具、拿到结果、再决定下一步。但当用户在 Agent 工作时追加了新问题(通过 followUp),外层循环确保这些问题最终会被处理。 ─────────────────────────────────────────────────
6.7 工具执行:顺序 vs 并行
当 LLM 一次请求多个工具调用时,Pi 支持两种执行策略:
type ToolExecutionMode = "sequential" | "parallel";顺序执行(sequential):
ToolCall 1 → execute → result 1 → ToolCall 2 → execute → result 2并行执行(parallel,默认):
ToolCall 1 ─→ execute ─┐
ToolCall 2 ─→ execute ─┼→ results → 按顺序组装
ToolCall 3 ─→ execute ─┘// 并行执行的核心逻辑
async function executeToolCallsParallel(...): Promise<ExecutedToolCallBatch> {
const prepared: FinalizedToolCallEntry[] = [];
// 1. 顺序准备(验证参数、检查权限)
for (const toolCall of toolCalls) {
const preparation = await prepareToolCall(...);
prepared.push(preparation);
}
// 2. 并行执行
const results = await Promise.all(
prepared.map(entry => typeof entry === "function" ? entry() : Promise.resolve(entry))
);
// 3. 按原始顺序组装结果
return { messages: results.map(createToolResultMessage), terminate: ... };
}★ Insight ─────────────────────────────────────为什么准备阶段是顺序的,执行阶段是并行的? 准备阶段需要验证参数和检查 beforeToolCall 钩子,这些操作可能有副作用(比如记录日志),需要按顺序执行。而实际的工具执行(读文件、执行命令)通常是独立的 I/O 操作,可以安全并行。这就是 Promise.all 的典型使用场景,类似于 Java 的 CompletableFuture.allOf()。 ─────────────────────────────────────────────────
6.8 beforeToolCall / afterToolCall 钩子
这是 Agent 最重要的扩展点:
interface AgentLoopConfig {
// 工具执行前的拦截器
beforeToolCall?: (
context: BeforeToolCallContext,
signal?: AbortSignal
) => Promise<BeforeToolCallResult | undefined>;
// 工具执行后的拦截器
afterToolCall?: (
context: AfterToolCallContext,
signal?: AbortSignal
) => Promise<AfterToolCallResult | undefined>;
}beforeToolCall 可以:
- 阻止工具执行(返回
{ block: true }) - 记录审计日志
- 修改参数
afterToolCall 可以:
- 替换工具结果
- 将成功结果改为错误(
isError: true) - 请求终止(
terminate: true)
⚡ Java 对照 ───────────────────────────────────── 这就是 Java 中 AOP(面向切面编程) 的函数式版本。beforeToolCall = @Before,afterToolCall = @After。在 Spring 中你可能用 @Around 注解来做类似的事。Pi 用回调函数代替了注解,更加灵活。 ─────────────────────────────────────────────────
6.9 PendingMessageQueue:消息队列
class PendingMessageQueue {
private messages: AgentMessage[] = [];
constructor(public mode: QueueMode) {} // "all" | "one-at-a-time"
enqueue(message: AgentMessage): void { ... }
hasItems(): boolean { ... }
drain(): AgentMessage[] {
if (this.mode === "all") {
const drained = this.messages.slice();
this.messages = [];
return drained;
}
// one-at-a-time: 每次只返回一条
const first = this.messages[0];
this.messages = this.messages.slice(1);
return first ? [first] : [];
}
}★ Insight ─────────────────────────────────────"all" vs "one-at-a-time" 模式的区别:"all" 模式一次性排空所有消息,适合批量处理;"one-at-a-time" 模式每次只返回一条,适合需要逐条处理的场景。默认的 steering 模式是 "one-at-a-time"——如果你快速输入了多条消息,它们会被逐条注入,而不是一次性全部注入。这让 Agent 能更及时地响应用户的每一条输入。 ─────────────────────────────────────────────────
6.10 本章小结
┌─────────────────────────────────────────────────────────────┐
│ Agent 类 │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ State │ │ Events │ │ Message Queues │ │
│ │ │ │ │ │ │ │
│ │ messages │ │ subscribe() │ │ steeringQueue │ │
│ │ tools │ │ emit() │ │ followUpQueue │ │
│ │ model │ │ │ │ │ │
│ └──────────┘ └──────────────┘ └──────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐│
│ │ Agent Loop ││
│ │ ││
│ │ while(true) { ││
│ │ while(有工具调用 || 有pending消息) { ││
│ │ 注入steering → 调用LLM → 执行工具 → 结果入上下文 ││
│ │ } ││
│ │ 检查follow-up → 有则继续外层循环 ││
│ │ } ││
│ └──────────────────────────────────────────────────────────┘│
│ │
│ 扩展点: beforeToolCall / afterToolCall │
└─────────────────────────────────────────────────────────────┘