第三章:AI 层 -- 统一的 LLM 调用抽象
"抽象不是简化复杂性,而是管理复杂性。"
3.1 问题:25 个提供商,一个接口
想象一下:你的系统需要同时支持 MySQL、PostgreSQL、Oracle、MongoDB、Redis。你会怎么做?
答案是抽象层——定义一个统一的接口,让上层代码不关心底层实现。
Pi 面临同样的问题:它需要支持 Anthropic、OpenAI、Google、Mistral、DeepSeek 等 25+ 个 AI 提供商,每个都有不同的 API 格式、认证方式、流式响应协议。Pi 的解决方案是 packages/ai 包。
📌 源码定位 ───────────────────────────────────── 核心文件:
packages/ai/src/types.ts-- 类型定义packages/ai/src/api-registry.ts-- Provider 注册表packages/ai/src/stream.ts-- 统一入口函数─────────────────────────────────────────────────
3.2 核心类型:一切的基础
Model -- 模型描述
// packages/ai/src/types.ts
interface Model<TApi extends Api> {
id: string; // "claude-sonnet-4-20250514"
name: string; // "Claude Sonnet 4"
api: TApi; // "anthropic-messages"
provider: Provider; // "anthropic"
baseUrl: string; // "https://api.anthropic.com"
reasoning: boolean; // 是否支持推理/思考
input: ("text" | "image")[]; // 支持的输入类型
cost: {
input: number; // $/百万 token
output: number;
cacheRead: number;
cacheWrite: number;
};
contextWindow: number; // 最大上下文窗口
maxTokens: number; // 最大输出 token
headers?: Record<string, string>;
compat?: ...; // 兼容性配置(条件类型)
}⚡ Java 对照 ───────────────────────────────────── 这就像一个 JPA 实体类,描述了"数据库连接"的所有元信息。Model<Api> 的泛型参数 TApi 决定了使用哪种通信协议(类似于 JDBC URL 的协议部分决定使用哪个驱动)。 ─────────────────────────────────────────────────
Message -- 消息体系
这是整个项目最核心的类型之一:
// 三种消息角色
type Message = UserMessage | AssistantMessage | ToolResultMessage;
interface UserMessage {
role: "user";
content: string | (TextContent | ImageContent)[];
timestamp: number;
}
interface AssistantMessage {
role: "assistant";
content: (TextContent | ThinkingContent | ToolCall)[];
api: Api;
provider: Provider;
model: string;
usage: Usage;
stopReason: "stop" | "length" | "toolUse" | "error" | "aborted";
errorMessage?: string;
timestamp: number;
}
interface ToolResultMessage {
role: "toolResult";
toolCallId: string;
toolName: string;
content: (TextContent | ImageContent)[];
details?: unknown;
isError: boolean;
timestamp: number;
}★ Insight ─────────────────────────────────────消息流的生命周期:UserMessage → LLM → AssistantMessage(可能包含 ToolCall)→ 执行工具 → ToolResultMessage → LLM → AssistantMessage → ... 这个循环就是 Agent 的本质。每条消息都有 timestamp,因为会话需要持久化到磁盘,时间戳是恢复顺序的关键。 ─────────────────────────────────────────────────
Content Types -- 内容类型
interface TextContent {
type: "text";
text: string;
}
interface ThinkingContent {
type: "thinking";
thinking: string;
redacted?: boolean; // 被安全过滤器编辑时为 true
}
interface ImageContent {
type: "image";
data: string; // base64 编码
mimeType: string; // "image/png"
}
interface ToolCall {
type: "toolCall";
id: string; // 唯一 ID,用于匹配 ToolResult
name: string; // 工具名,如 "read", "bash"
arguments: Record<string, any>; // 工具参数
}⚡ Java 对照 ─────────────────────────────────────
// Java 等价:sealed interface + record
sealed interface Content permits TextContent, ThinkingContent, ImageContent, ToolCall {}
record TextContent(String text) implements Content {}
record ThinkingContent(String thinking, boolean redacted) implements Content {}
record ImageContent(byte[] data, String mimeType) implements Content {}
record ToolCall(String id, String name, Map<String, Object> arguments) implements Content {}─────────────────────────────────────────────────
Context -- LLM 请求上下文
interface Context {
systemPrompt?: string;
messages: Message[];
tools?: Tool[];
}这就是每次调用 LLM 时传入的全部信息。没有任何提供商特定的字段——所有差异都被 Provider 层吸收了。
3.3 Provider 注册表:ServiceLoader 模式
Pi 使用全局注册表模式来管理 AI 提供商,这和 Java 的 ServiceLoader 非常相似。
// packages/ai/src/api-registry.ts
const apiProviderRegistry = new Map<string, RegisteredApiProvider>();
export function registerApiProvider<TApi extends Api, TOptions extends StreamOptions>(
provider: ApiProvider<TApi, TOptions>,
sourceId?: string,
): void {
apiProviderRegistry.set(provider.api, { provider, sourceId });
}
export function getApiProvider(api: Api): ApiProviderInternal | undefined {
return apiProviderRegistry.get(api);
}⚡ Java 对照 ─────────────────────────────────────
| TypeScript | Java |
|---|---|
new Map<string, Provider>() | Map<String, Provider> 或 ServiceLoader |
registerApiProvider(provider) | ServiceLoader.register(provider) |
getApiProvider(api) | ServiceLoader.load(Api.class).findFirst() |
| 模块导入时自动注册 | META-INF/services 自动发现 |
─────────────────────────────────────────────────
Provider 的注册发生在 packages/ai/src/providers/register-builtins.ts 中,通过动态 import 按需加载:
// 简化的注册逻辑
export async function registerBuiltInProviders() {
const { registerAnthropicProvider } = await import("./anthropic.js");
registerAnthropicProvider();
const { registerOpenAIProvider } = await import("./openai.js");
registerOpenAIProvider();
// ... 18 个 provider
}★ Insight ─────────────────────────────────────为什么用动态 import 而不是静态 import? 因为每个 Provider 都有自己的 SDK 依赖(@anthropic-ai/sdk、openai、@google/genai 等)。如果静态导入,启动时就要加载所有 SDK,即使你只用其中一个。动态 import 实现了懒加载——只有实际使用某个 Provider 时才会加载对应的 SDK。这和 Java 中 OSGi 的 bundle 懒加载是同一个思路。 ─────────────────────────────────────────────────
3.4 streamSimple:统一入口
这是你作为 Agent 开发者最常接触的函数:
// packages/ai/src/stream.ts
export function streamSimple<TApi extends Api>(
model: Model<TApi>,
context: Context,
options?: SimpleStreamOptions,
): AssistantMessageEventStream {
const provider = resolveApiProvider(model.api);
return provider.streamSimple(model, context, options);
}调用链:
你的代码
→ streamSimple(model, context, options)
→ resolveApiProvider(model.api) // 从注册表找到 Provider
→ provider.streamSimple(...) // 调用具体实现
→ Anthropic SDK / OpenAI SDK // 底层 HTTP 调用★ Insight ─────────────────────────────────────AssistantMessageEventStream 是一个异步迭代器。 它不是一次性返回结果,而是像 Java 的 Flow.Publisher 一样,逐块推送数据。你可以用 for await (const event of stream) 来消费它。这种流式设计对 Agent 至关重要——用户可以看到 LLM 的回复逐字出现,而不是等几秒钟后一次性显示。 ─────────────────────────────────────────────────
3.5 流式事件协议
LLM 的流式响应遵循一个标准事件协议:
type AssistantMessageEvent =
| { type: "start"; partial: AssistantMessage }
| { type: "text_start"; contentIndex: number; partial: AssistantMessage }
| { type: "text_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
| { type: "text_end"; contentIndex: number; content: string; partial: AssistantMessage }
| { type: "thinking_start"; contentIndex: number; partial: AssistantMessage }
| { type: "thinking_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
| { type: "thinking_end"; contentIndex: number; content: string; partial: AssistantMessage }
| { type: "toolcall_start"; contentIndex: number; partial: AssistantMessage }
| { type: "toolcall_delta"; contentIndex: number; delta: string; partial: AssistantMessage }
| { type: "toolcall_end"; contentIndex: number; toolCall: ToolCall; partial: AssistantMessage }
| { type: "done"; reason: StopReason; message: AssistantMessage }
| { type: "error"; reason: StopReason; error: AssistantMessage };事件流的生命周期:
start → [text_start → text_delta* → text_end] → [toolcall_start → toolcall_delta* → toolcall_end] → done
\___ 可以有多个文本块 ___/ \___ 可以有多个工具调用 ___/★ Insight ─────────────────────────────────────每个事件都携带 partial: AssistantMessage。 这意味着你随时可以拿到"当前累积的完整消息",而不仅仅是增量。这种设计让 UI 更新变得简单——你不需要自己拼接 delta,直接用 partial 渲染即可。 ─────────────────────────────────────────────────
3.6 一个完整的调用示例
让我们跟踪一次完整的 LLM 调用:
import { streamSimple } from "@mariozechner/pi-ai";
// 1. 构建上下文
const context: Context = {
systemPrompt: "You are a helpful coding assistant.",
messages: [
{ role: "user", content: "Read the file package.json", timestamp: Date.now() }
],
tools: [
{
name: "read",
description: "Read a file from disk",
parameters: Type.Object({ path: Type.String() })
}
]
};
// 2. 选择模型
const model = { id: "claude-sonnet-4-20250514", api: "anthropic-messages", ... };
// 3. 流式调用
const stream = streamSimple(model, context, { reasoning: "medium" });
// 4. 消费事件
for await (const event of stream) {
switch (event.type) {
case "text_delta":
process.stdout.write(event.delta); // 逐字输出
break;
case "toolcall_end":
console.log(`\nTool call: ${event.toolCall.name}(${JSON.stringify(event.toolCall.arguments)})`);
break;
case "done":
console.log(`\nDone. Tokens used: ${event.message.usage.totalTokens}`);
break;
}
}3.7 Provider 实现模式
每个 Provider 都遵循相同的接口:
// 以 Anthropic 为例(简化)
export function registerAnthropicProvider() {
registerApiProvider({
api: "anthropic-messages",
stream: (model, context, options) => {
// 1. 将标准 Context 转换为 Anthropic 格式
const anthropicPayload = convertToAnthropicFormat(context);
// 2. 调用 Anthropic SDK
const response = anthropic.messages.stream({ ...anthropicPayload });
// 3. 将 Anthropic 事件转换为标准 AssistantMessageEvent
return new AssistantMessageEventStream(async function* () {
for await (const chunk of response) {
yield convertToStandardEvent(chunk);
}
});
},
streamSimple: (model, context, options) => {
// 添加推理级别等选项处理
return streamWithReasoning(model, context, options);
}
});
}★ Insight ─────────────────────────────────────每个 Provider 的核心工作就是"翻译":把 Pi 的标准格式翻译成提供商的原生格式,再把提供商的响应翻译回 Pi 的标准格式。这和 JDBC 驱动把标准 SQL 翻译成数据库方言是完全一样的模式。如果你要添加一个新的 AI 提供商,你只需要实现这个翻译层。 ─────────────────────────────────────────────────
3.8 本章小结
调用者代码
│
▼
streamSimple(model, context, options)
│
▼
┌─────────────────────────────┐
│ API Provider Registry │ ← 全局 Map,ServiceLoader 模式
│ Map<Api, ApiProvider> │
└──────────────┬──────────────┘
│
┌──────────┼──────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Anthropic│ │ OpenAI │ │ Google │ ← 18 个 Provider 实现
│Provider │ │Provider│ │Provider│
└────────┘ └────────┘ └────────┘
│ │ │
▼ ▼ ▼
SDK SDK SDK ← 各提供商的原生 SDK你需要记住的:
streamSimple(model, context, options)是唯一入口Context={ systemPrompt, messages, tools }是标准请求格式AssistantMessageEventStream是流式响应- Provider 注册表是全局 Map,通过
registerApiProvider()注册