第 8 章:并行 — 多件事同时发生时,秩序从哪里来
从一道面试题开始
订单详情页需要同时请求两个接口:
fetchOrder和fetchInventory。两边都拿到之后,合并成Detail返回。两个请求之间没有依赖,应该并发执行。
任何用过 async/await 的人都能写出这段代码。但真正的问题在后面:
- 如果
fetchInventory失败了,fetchOrder要不要取消? - 如果整个请求超时了,两个子任务怎么回收?
- 这里的 "同时" 是在一个线程上交替执行,还是在多个线程上真正并行?
对前两个问题的回答,区分了初级和高级工程师。对第三个问题的回答,区分了 "会用语法" 和 "理解并发模型"。
8.1 语法基础:8 种语言如何让多件事同时发生
Python(asyncio)
import asyncio
async def load_detail(order_id: str) -> Detail:
# 并发等待两个协程
order, inventory = await asyncio.gather(
fetch_order(order_id),
fetch_inventory(order_id),
)
return merge(order, inventory)
# 运行
detail = asyncio.run(load_detail("A1001"))
# 带超时和取消
async def load_detail_safe(order_id: str, timeout: float = 5.0) -> Detail:
try:
order, inventory = await asyncio.wait_for(
asyncio.gather(
fetch_order(order_id),
fetch_inventory(order_id),
),
timeout=timeout
)
return merge(order, inventory)
except asyncio.TimeoutError:
# gather 内的两个任务都会被取消
raiseJavaScript
async function loadDetail(orderId) {
const [order, inventory] = await Promise.all([
fetchOrder(orderId),
fetchInventory(orderId),
]);
return merge(order, inventory);
}
// 带超时
async function loadDetailSafe(orderId, timeout = 5000) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
try {
const [order, inventory] = await Promise.all([
fetchOrder(orderId, { signal: controller.signal }),
fetchInventory(orderId, { signal: controller.signal }),
]);
return merge(order, inventory);
} finally {
clearTimeout(timeoutId);
}
}Java
// CompletableFuture
CompletableFuture<Detail> loadDetail(String orderId) {
var orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder(orderId));
var inventoryFuture = CompletableFuture.supplyAsync(() -> fetchInventory(orderId));
return orderFuture.thenCombine(inventoryFuture, this::merge);
}
// 虚拟线程(Java 21+):写起来像同步代码,跑起来像异步
Detail loadDetailVT(String orderId) throws InterruptedException {
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
var orderFuture = executor.submit(() -> fetchOrder(orderId));
var inventoryFuture = executor.submit(() -> fetchInventory(orderId));
return merge(orderFuture.get(), inventoryFuture.get());
}
}C++
// std::async + std::future
Detail load_detail(const std::string& order_id) {
auto order_future = std::async(std::launch::async, fetch_order, order_id);
auto inventory_future = std::async(std::launch::async, fetch_inventory, order_id);
return merge(order_future.get(), inventory_future.get());
}
// C++20 协程(co_await)
task<Detail> load_detail_coro(const std::string& order_id) {
auto [order, inventory] = co_await when_all(
fetch_order_async(order_id),
fetch_inventory_async(order_id)
);
co_return merge(order, inventory);
}Rust(Tokio)
use tokio;
async fn load_detail(order_id: &str) -> Detail {
let (order, inventory) = tokio::join!(
fetch_order(order_id),
fetch_inventory(order_id),
);
merge(order, inventory)
}
// 带超时和取消
async fn load_detail_safe(order_id: &str) -> Result<Detail, tokio::time::error::Elapsed> {
tokio::time::timeout(Duration::from_secs(5), async {
let (order, inventory) = tokio::join!(
fetch_order(order_id),
fetch_inventory(order_id),
);
merge(order, inventory)
}).await
}
// timeout 到期 → 内部 future 被 drop → 子任务被取消Go
func LoadDetail(orderID string) (Detail, error) {
orderCh := make(chan Order, 1)
inventoryCh := make(chan Inventory, 1)
errCh := make(chan error, 2)
go func() {
order, err := FetchOrder(orderID)
if err != nil {
errCh <- err
return
}
orderCh <- order
}()
go func() {
inventory, err := FetchInventory(orderID)
if err != nil {
errCh <- err
return
}
inventoryCh <- inventory
}()
// 收集结果
var order Order
var inventory Inventory
var errs []error
for i := 0; i < 2; i++ {
select {
case order = <-orderCh:
case inventory = <-inventoryCh:
case err := <-errCh:
errs = append(errs, err)
}
}
if len(errs) > 0 {
return Detail{}, errs[0]
}
return Merge(order, inventory), nil
}Swift
func loadDetail(orderId: String) async throws -> Detail {
async let order = fetchOrder(orderId: orderId)
async let inventory = fetchInventory(orderId: orderId)
return try await merge(order, inventory)
// try await 时如果其中一个失败,另一个自动取消
}
// 带超时
func loadDetailSafe(orderId: String) async throws -> Detail {
try await withThrowingTaskGroup(of: Any.self) { group in
group.addTask { try await fetchOrder(orderId: orderId) }
group.addTask { try await fetchInventory(orderId: orderId) }
// 第一个失败 → group 中其他 task 自动取消
// group 离开作用域 → 所有 task 等待完成或取消
}
}Kotlin
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
val order = async { fetchOrder(orderId) }
val inventory = async { fetchInventory(orderId) }
merge(order.await(), inventory.await())
// coroutineScope 保证:子协程不会泄漏到外部
}
// 带超时
suspend fun loadDetailSafe(orderId: String): Detail = coroutineScope {
withTimeout(5000) {
val order = async { fetchOrder(orderId) }
val inventory = async { fetchInventory(orderId) }
merge(order.await(), inventory.await())
}
// 超时 → coroutineScope 取消 → 所有子协程取消
}语法速查:并发模型
| 能力 | Python | JavaScript | Java | C++ | Rust | Go | Swift | Kotlin |
|---|---|---|---|---|---|---|---|---|
| 并发模型 | asyncio(协程) | 事件循环 + Promise | 线程池 + CompletableFuture | 线程 + async/future | Tokio(async runtime) | goroutine + channel | async/await + TaskGroup | 协程(coroutineScope) |
| 默认 I/O 并发 | asyncio | 原生 | CompletableFuture | std::async | Tokio | goroutine(原生) | async/await(原生) | 协程(原生) |
| 结构化并发 | asyncio.gather | Promise.all(较弱) | 无(手动 join) | 无(手动 get) | tokio::join! | 无(手动 channel/select) | TaskGroup(强制) | coroutineScope(强制) |
| 取消传播 | gather 取消传播 | AbortController | Future.cancel | 无标准机制 | drop = 取消 | context.Context | Task 取消自动传播 | Scope 取消自动传播 |
| 数据竞争保护 | 约定(asyncio 单线程) | 约定(单线程事件循环) | synchronized / Lock | mutex / atomic | 编译期阻止 | channel / sync.Mutex | actor / Sendable | 约定 + @Volatile |
| 多核并行 | multiprocessing | Worker threads | 线程池 | 线程 | tokio::spawn / rayon | goroutine(多核) | async + Task | 协程 + Dispatchers.Default |
8.2 深度对照一:结构化并发——谁在帮你看管子任务
这是现代并发设计最重要的一条分界线。
传统并发:任务像野草
// JavaScript:Promise 不强制父子关系
async function loadDetail(orderId) {
const orderPromise = fetchOrder(orderId);
const inventoryPromise = fetchInventory(orderId);
// 如果这里抛异常,两个 Promise 还在跑
// 没有任何机制自动取消它们
return merge(await orderPromise, await inventoryPromise);
}// Go:goroutine 没有父子关系
func loadDetail(orderID string) {
go func() { fetchOrder(orderID) }() // 启动了
go func() { fetchInventory(orderID) }() // 启动了
// 如果 loadDetail 返回了,两个 goroutine 继续跑
// 没有任何机制自动停止它们
}结构化并发:任务有来处,也有归处
// Kotlin:coroutineScope 是子任务的边界
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
val order = async { fetchOrder(orderId) }
val inventory = async { fetchInventory(orderId) }
// 如果 merge 抛异常 → scope 被取消 → order 和 inventory 都被取消
// scope 结束时 → 所有子协程保证完成或取消 → 不会泄漏
merge(order.await(), inventory.await())
}// Swift:TaskGroup 类似
func loadDetail(orderId: String) async throws -> Detail {
try await withThrowingTaskGroup(of: (Order, Inventory).self) { group in
// group 结束时,所有子 task 自动等待完成
// 如果抛异常,所有子 task 自动被取消
}
}// Rust:tokio::join! 是局部的结构化并发
let (order, inventory) = tokio::join!(
fetch_order(order_id),
fetch_inventory(order_id),
);
// join! 保证:返回时所有分支都完成了(或取消了)关键差异
| 传统并发 | 结构化并发 | |
|---|---|---|
| 任务关系 | 平等——没有父子 | 父子——scope 是父,task 是子 |
| 生命周期 | 难追踪 | scope 结束 = 全部结束 |
| 错误传播 | 手动 | 自动——子错 → scope 错 → 其余子被取消 |
| 资源泄漏 | 容易 | scope 保证不会泄漏 |
| 代表语言 | JS、Go、Python asyncio | Kotlin、Swift、Rust(部分) |
结构化并发是并发编程近年最重要的范式进化。它的核心思想很简单:子任务的生命周期不应该超过父任务的生命周期。 Kotlin 和 Swift 把这个思想做进了语言设计;Go 和 JavaScript 把它交给了程序员(和 linter)。
8.3 深度对照二:取消——谁能叫停已经起飞的任务
合作式取消(Go、Python、C++)
任务不会强制停止——你得定期检查 "我是不是该停了"。
// Go:context 传递取消信号
func FetchOrder(ctx context.Context, orderID string) (Order, error) {
select {
case <-ctx.Done():
return Order{}, ctx.Err() // "我该停了"
case result := <-doFetch(orderID):
return result, nil
}
}Go 不强制你检查 ctx.Done()。如果你忘了,任务就会在父请求已经超时之后继续运行。
结构化取消(Kotlin、Swift)
// Kotlin:coroutineScope 取消自动向下传播
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
withTimeout(5000) { // 超时 → 取消 scope → 取消所有子协程
val order = async { fetchOrder(orderId) }
val inventory = async { fetchInventory(orderId) }
merge(order.await(), inventory.await()) // 抛异常也会取消对方
}
}Kotlin 的协程在挂起点检查取消。你不需要手动传 context——结构化并发保证取消向下传播。
Drop 即取消(Rust)
// Rust:drop future = 取消任务
let future = tokio::time::timeout(Duration::from_secs(5), fetch_data());
// 如果 timeout future 先完成(超时)
// → fetch_data future 被 drop
// → 其内部资源被释放
// → 不能 "忘记" 取消Rust 的方式最彻底:取消不是发信号,而是直接销毁 future。这种方式保证了资源释放,但也要求代码对 "随时可能被 drop" 做好准备(这恰是 Rust async 编程最难的部分)。
8.4 深度对照三:共享状态——谁在保护你不犯错
无保护(需要你自己注意)
# Python asyncio:单线程事件循环,await 之间的代码是原子的
# 但两个 await 之间,其他协程可能运行
count = 0
async def increment():
global count
tmp = count # await 之前 —— 安全
await asyncio.sleep(0) # 这里可能有其他协程也读 count
count = tmp + 1 # await 之后 —— count 可能被改了!Python asyncio 的单线程模型让很多并发问题被隐藏了——数据竞争不会发生(因为是单线程),但逻辑竞争(两个协程读到同一个值然后各自加 1)仍然会发生。
约定保护(Go:CSP,"用通信来共享内存")
// Go:channel 传递数据,而不是共享数据
// "Don't communicate by sharing memory; share memory by communicating."
// 正确做法:通过 channel 传递
func processOrders(orders []Order) {
resultCh := make(chan Result)
for _, o := range orders {
go func(order Order) {
result := process(order)
resultCh <- result // 发送结果,而不是写入共享变量
}(o)
}
// 收集结果
}Go 的 CSP 哲学很优雅,但 Go 不会阻止你 直接共享变量。sync.Mutex 的存在本身就说明:当 channel 不方便时,Go 也允许你去共享内存——只是你要自己承担后果。
编译期阻止(Rust)
// Rust:Send + Sync trait 控制线程安全
use std::thread;
let orders = vec![order1, order2];
let handle = thread::spawn(move || {
// orders 的所有权转移到了新线程里
process_orders(&orders); // 主线程不能再访问 orders
});
// process_orders(&orders); // 编译错误!orders 已移走
// 如果需要共享:
let orders = Arc::new(Mutex::new(vec![order1, order2]));
// Arc = 原子引用计数,Mutex = 互斥锁
// 编译器强制你通过 Mutex 访问共享数据Rust 的 Send 和 Sync trait 是并发安全的类型级保证。一个类型如果不能安全地在线程间传递,编译器就不让你传。这不是约定,是物理定律。
语言级 Actor(Swift)
// Swift 5.5+:actor 提供数据竞争保护
actor OrderCache {
private var orders: [String: Order] = [:]
func get(_ id: String) -> Order? {
return orders[id]
}
func set(_ order: Order) {
orders[order.orderId] = order
}
}
// 从外部访问 actor 必须 await
let cache = OrderCache()
await cache.set(order)Swift 的 actor 模型保证:同一时间只有一个任务能访问 actor 的内部状态。编译器强制执行这个规则——你不需要手动加锁。
8.5 边界探索:跨语言并发陷阱
陷阱一:JavaScript → Go:以为 await 和 goroutine 一样轻松
// JavaScript:Promise.all 自动管理
const [a, b] = await Promise.all([fetchA(), fetchB()]);
// a 和 b 都拿到了,子任务都结束了// Go:goroutine 启动了,但没人在终点等你
go fetchA()
go fetchB()
// fetchA 和 fetchB 可能还没跑完,主函数已经返回了从 JS 切到 Go,最容易出的问题是:goroutine 不会被 await。 你必须自己用 channel、WaitGroup 或 errgroup 来收集结果和等待完成。
陷阱二:Go → Kotlin:goroutine 泄漏变成协程泄漏
Go 的 goroutine 泄漏通常是因为阻塞在 channel 上。Kotlin 的协程泄漏通常是因为 scope 管理不当。从 Go 过来的人容易用 Go 的心智模型("起一个 goroutine 很便宜,多起几个无所谓")操作 Kotlin 协程——但结构化并发要求你给每个协程一个 scope。
陷阱三:Python asyncio → Rust async:单线程心智 vs 多线程心智
Python asyncio 是单线程的——你不用想数据竞争。Rust async 默认可以是多线程的——你需要想 Send、Sync、Mutex。从 Python asyncio 切到 Rust async,你不只换了语法,你换了一整套并发心智模型。
陷阱四:Java → Go:从线程池心智到 goroutine 心智
Java 开发者习惯了线程池(固定大小、拒绝策略、队列满的处理)。Go 的 goroutine 让线程池看似不需要了——但如果你不控制并发度,你仍然需要某种形式的限流(semaphore、worker pool)。go func() 太便宜了,便宜到你忘了控制。
8.6 洞察:并发的难度不在 "跑起来",而在 "收回来"
如果你只用一门语言写并发,你不会发现:
所有并发模型的差异,表面上是语法和 API 的差异。深层上是 "多件事同时进行时,你怎么收回来" 这个问题的不同回答。
Go 用 channel 和 select 收回结果。JavaScript/Python 用 await Promise.all / asyncio.gather。Java 用 CompletableFuture.allOf。Kotlin/Swift 用结构化并发——scope 结束就是收回。Rust 用 join! 宏同时驱动多个 future,drop 就是取消。
但 "收回来" 不仅仅是收集结果。它包括:
- 错误时怎么办——一个子任务失败了,其他子任务要不要取消?(结构化并发说 "要";传统并发说 "你自己决定")
- 超时时怎么办——任务还在跑,但调用者不等了,谁来停止它们?(Go 的 context、Rust 的 drop、Kotlin/Swift 的 scope 取消)
- 资源怎么回收——子任务中打开的文件、持有的锁、分配的内存,在取消时如何善后?
最容易写的并发代码是 "启动一切" 的代码。最难写的并发代码是 "在一切出错时,把一切干净收回" 的代码。
而不同的语言,给了你不同力度的 "收回" 机制。Rust 的 Drop 保证资源回收(最彻底)。Swift/Kotlin 的 scope 保证子任务不泄漏(最优雅)。Go 的 context 让你选择是否合作取消(最灵活但也最容易忘)。JavaScript 的传统 Promise 模式在 "收回" 这件事上几乎不提供任何帮助。
本章小结
并发模型不是语法风格,而是系统治理方式。 JS/Python 擅长 I/O 并发,Java/C++ 控制力强但秩序自己守,Go 让并发平民化但边界要自己设计,Swift/Kotlin 的结构化并发让任务来去有踪。
结构化并发是现代并发设计最重要的进化。 子任务的生命周期被纳入父任务的 scope,错误和取消自动向下传播。Kotlin 和 Swift 做进了语言,Go 和 JS 留给了程序员。
取消不是可选项。 真实系统里,超时、用户取消、上游失败每天都在发生。Go 用 context,Kotlin/Swift 用 scope 取消,Rust 用 drop——每种机制的力度不同。
共享状态的保护力度从 "靠自觉" 到 "编译期强制" 有一条完整的光谱。 哪门语言适合你,取决于你更相信自己的纪律,还是编译器的纪律。
如果你只用一门语言,你不会学到这个:
所有并发问题,最后都是同一个问题:当一个东西同时被多个人碰的时候,系统怎么不陷入混乱。而不同语言的回答,构成了从 "你最好小心点" 到 "你碰不了这个" 的完整谱系。