Skip to content

第 8 章:并行 — 多件事同时发生时,秩序从哪里来

从一道面试题开始

订单详情页需要同时请求两个接口:fetchOrderfetchInventory。两边都拿到之后,合并成 Detail 返回。两个请求之间没有依赖,应该并发执行。

任何用过 async/await 的人都能写出这段代码。但真正的问题在后面:

  • 如果 fetchInventory 失败了,fetchOrder 要不要取消?
  • 如果整个请求超时了,两个子任务怎么回收?
  • 这里的 "同时" 是在一个线程上交替执行,还是在多个线程上真正并行?

对前两个问题的回答,区分了初级和高级工程师。对第三个问题的回答,区分了 "会用语法" 和 "理解并发模型"。


8.1 语法基础:8 种语言如何让多件事同时发生

Python(asyncio)

python
import asyncio

async def load_detail(order_id: str) -> Detail:
    # 并发等待两个协程
    order, inventory = await asyncio.gather(
        fetch_order(order_id),
        fetch_inventory(order_id),
    )
    return merge(order, inventory)

# 运行
detail = asyncio.run(load_detail("A1001"))

# 带超时和取消
async def load_detail_safe(order_id: str, timeout: float = 5.0) -> Detail:
    try:
        order, inventory = await asyncio.wait_for(
            asyncio.gather(
                fetch_order(order_id),
                fetch_inventory(order_id),
            ),
            timeout=timeout
        )
        return merge(order, inventory)
    except asyncio.TimeoutError:
        # gather 内的两个任务都会被取消
        raise

JavaScript

javascript
async function loadDetail(orderId) {
    const [order, inventory] = await Promise.all([
        fetchOrder(orderId),
        fetchInventory(orderId),
    ]);
    return merge(order, inventory);
}

// 带超时
async function loadDetailSafe(orderId, timeout = 5000) {
    const controller = new AbortController();

    const timeoutId = setTimeout(() => controller.abort(), timeout);

    try {
        const [order, inventory] = await Promise.all([
            fetchOrder(orderId, { signal: controller.signal }),
            fetchInventory(orderId, { signal: controller.signal }),
        ]);
        return merge(order, inventory);
    } finally {
        clearTimeout(timeoutId);
    }
}

Java

java
// CompletableFuture
CompletableFuture<Detail> loadDetail(String orderId) {
    var orderFuture = CompletableFuture.supplyAsync(() -> fetchOrder(orderId));
    var inventoryFuture = CompletableFuture.supplyAsync(() -> fetchInventory(orderId));
    return orderFuture.thenCombine(inventoryFuture, this::merge);
}

// 虚拟线程(Java 21+):写起来像同步代码,跑起来像异步
Detail loadDetailVT(String orderId) throws InterruptedException {
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        var orderFuture = executor.submit(() -> fetchOrder(orderId));
        var inventoryFuture = executor.submit(() -> fetchInventory(orderId));
        return merge(orderFuture.get(), inventoryFuture.get());
    }
}

C++

cpp
// std::async + std::future
Detail load_detail(const std::string& order_id) {
    auto order_future = std::async(std::launch::async, fetch_order, order_id);
    auto inventory_future = std::async(std::launch::async, fetch_inventory, order_id);
    return merge(order_future.get(), inventory_future.get());
}

// C++20 协程(co_await)
task<Detail> load_detail_coro(const std::string& order_id) {
    auto [order, inventory] = co_await when_all(
        fetch_order_async(order_id),
        fetch_inventory_async(order_id)
    );
    co_return merge(order, inventory);
}

Rust(Tokio)

rust
use tokio;

async fn load_detail(order_id: &str) -> Detail {
    let (order, inventory) = tokio::join!(
        fetch_order(order_id),
        fetch_inventory(order_id),
    );
    merge(order, inventory)
}

// 带超时和取消
async fn load_detail_safe(order_id: &str) -> Result<Detail, tokio::time::error::Elapsed> {
    tokio::time::timeout(Duration::from_secs(5), async {
        let (order, inventory) = tokio::join!(
            fetch_order(order_id),
            fetch_inventory(order_id),
        );
        merge(order, inventory)
    }).await
}
// timeout 到期 → 内部 future 被 drop → 子任务被取消

Go

go
func LoadDetail(orderID string) (Detail, error) {
    orderCh := make(chan Order, 1)
    inventoryCh := make(chan Inventory, 1)
    errCh := make(chan error, 2)

    go func() {
        order, err := FetchOrder(orderID)
        if err != nil {
            errCh <- err
            return
        }
        orderCh <- order
    }()

    go func() {
        inventory, err := FetchInventory(orderID)
        if err != nil {
            errCh <- err
            return
        }
        inventoryCh <- inventory
    }()

    // 收集结果
    var order Order
    var inventory Inventory
    var errs []error

    for i := 0; i < 2; i++ {
        select {
        case order = <-orderCh:
        case inventory = <-inventoryCh:
        case err := <-errCh:
            errs = append(errs, err)
        }
    }

    if len(errs) > 0 {
        return Detail{}, errs[0]
    }
    return Merge(order, inventory), nil
}

Swift

swift
func loadDetail(orderId: String) async throws -> Detail {
    async let order = fetchOrder(orderId: orderId)
    async let inventory = fetchInventory(orderId: orderId)
    return try await merge(order, inventory)
    // try await 时如果其中一个失败,另一个自动取消
}

// 带超时
func loadDetailSafe(orderId: String) async throws -> Detail {
    try await withThrowingTaskGroup(of: Any.self) { group in
        group.addTask { try await fetchOrder(orderId: orderId) }
        group.addTask { try await fetchInventory(orderId: orderId) }

        // 第一个失败 → group 中其他 task 自动取消
        // group 离开作用域 → 所有 task 等待完成或取消
    }
}

Kotlin

kotlin
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
    val order = async { fetchOrder(orderId) }
    val inventory = async { fetchInventory(orderId) }
    merge(order.await(), inventory.await())
    // coroutineScope 保证:子协程不会泄漏到外部
}

// 带超时
suspend fun loadDetailSafe(orderId: String): Detail = coroutineScope {
    withTimeout(5000) {
        val order = async { fetchOrder(orderId) }
        val inventory = async { fetchInventory(orderId) }
        merge(order.await(), inventory.await())
    }
    // 超时 → coroutineScope 取消 → 所有子协程取消
}

语法速查:并发模型

能力PythonJavaScriptJavaC++RustGoSwiftKotlin
并发模型asyncio(协程)事件循环 + Promise线程池 + CompletableFuture线程 + async/futureTokio(async runtime)goroutine + channelasync/await + TaskGroup协程(coroutineScope)
默认 I/O 并发asyncio原生CompletableFuturestd::asyncTokiogoroutine(原生)async/await(原生)协程(原生)
结构化并发asyncio.gatherPromise.all(较弱)无(手动 join)无(手动 get)tokio::join!无(手动 channel/select)TaskGroup(强制)coroutineScope(强制)
取消传播gather 取消传播AbortControllerFuture.cancel无标准机制drop = 取消context.ContextTask 取消自动传播Scope 取消自动传播
数据竞争保护约定(asyncio 单线程)约定(单线程事件循环)synchronized / Lockmutex / atomic编译期阻止channel / sync.Mutexactor / Sendable约定 + @Volatile
多核并行multiprocessingWorker threads线程池线程tokio::spawn / rayongoroutine(多核)async + Task协程 + Dispatchers.Default

8.2 深度对照一:结构化并发——谁在帮你看管子任务

这是现代并发设计最重要的一条分界线。

传统并发:任务像野草

javascript
// JavaScript:Promise 不强制父子关系
async function loadDetail(orderId) {
    const orderPromise = fetchOrder(orderId);
    const inventoryPromise = fetchInventory(orderId);
    // 如果这里抛异常,两个 Promise 还在跑
    // 没有任何机制自动取消它们
    return merge(await orderPromise, await inventoryPromise);
}
go
// Go:goroutine 没有父子关系
func loadDetail(orderID string) {
    go func() { fetchOrder(orderID) }()     // 启动了
    go func() { fetchInventory(orderID) }() // 启动了
    // 如果 loadDetail 返回了,两个 goroutine 继续跑
    // 没有任何机制自动停止它们
}

结构化并发:任务有来处,也有归处

kotlin
// Kotlin:coroutineScope 是子任务的边界
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
    val order = async { fetchOrder(orderId) }
    val inventory = async { fetchInventory(orderId) }
    // 如果 merge 抛异常 → scope 被取消 → order 和 inventory 都被取消
    // scope 结束时 → 所有子协程保证完成或取消 → 不会泄漏
    merge(order.await(), inventory.await())
}
swift
// Swift:TaskGroup 类似
func loadDetail(orderId: String) async throws -> Detail {
    try await withThrowingTaskGroup(of: (Order, Inventory).self) { group in
        // group 结束时,所有子 task 自动等待完成
        // 如果抛异常,所有子 task 自动被取消
    }
}
rust
// Rust:tokio::join! 是局部的结构化并发
let (order, inventory) = tokio::join!(
    fetch_order(order_id),
    fetch_inventory(order_id),
);
// join! 保证:返回时所有分支都完成了(或取消了)

关键差异

传统并发结构化并发
任务关系平等——没有父子父子——scope 是父,task 是子
生命周期难追踪scope 结束 = 全部结束
错误传播手动自动——子错 → scope 错 → 其余子被取消
资源泄漏容易scope 保证不会泄漏
代表语言JS、Go、Python asyncioKotlin、Swift、Rust(部分)

结构化并发是并发编程近年最重要的范式进化。它的核心思想很简单:子任务的生命周期不应该超过父任务的生命周期。 Kotlin 和 Swift 把这个思想做进了语言设计;Go 和 JavaScript 把它交给了程序员(和 linter)。


8.3 深度对照二:取消——谁能叫停已经起飞的任务

合作式取消(Go、Python、C++)

任务不会强制停止——你得定期检查 "我是不是该停了"。

go
// Go:context 传递取消信号
func FetchOrder(ctx context.Context, orderID string) (Order, error) {
    select {
    case <-ctx.Done():
        return Order{}, ctx.Err()  // "我该停了"
    case result := <-doFetch(orderID):
        return result, nil
    }
}

Go 不强制你检查 ctx.Done()。如果你忘了,任务就会在父请求已经超时之后继续运行。

结构化取消(Kotlin、Swift)

kotlin
// Kotlin:coroutineScope 取消自动向下传播
suspend fun loadDetail(orderId: String): Detail = coroutineScope {
    withTimeout(5000) {  // 超时 → 取消 scope → 取消所有子协程
        val order = async { fetchOrder(orderId) }
        val inventory = async { fetchInventory(orderId) }
        merge(order.await(), inventory.await())  // 抛异常也会取消对方
    }
}

Kotlin 的协程在挂起点检查取消。你不需要手动传 context——结构化并发保证取消向下传播。

Drop 即取消(Rust)

rust
// Rust:drop future = 取消任务
let future = tokio::time::timeout(Duration::from_secs(5), fetch_data());
// 如果 timeout future 先完成(超时)
// → fetch_data future 被 drop
// → 其内部资源被释放
// → 不能 "忘记" 取消

Rust 的方式最彻底:取消不是发信号,而是直接销毁 future。这种方式保证了资源释放,但也要求代码对 "随时可能被 drop" 做好准备(这恰是 Rust async 编程最难的部分)。


8.4 深度对照三:共享状态——谁在保护你不犯错

无保护(需要你自己注意)

python
# Python asyncio:单线程事件循环,await 之间的代码是原子的
# 但两个 await 之间,其他协程可能运行
count = 0

async def increment():
    global count
    tmp = count        # await 之前 —— 安全
    await asyncio.sleep(0)  # 这里可能有其他协程也读 count
    count = tmp + 1    # await 之后 —— count 可能被改了!

Python asyncio 的单线程模型让很多并发问题被隐藏了——数据竞争不会发生(因为是单线程),但逻辑竞争(两个协程读到同一个值然后各自加 1)仍然会发生。

约定保护(Go:CSP,"用通信来共享内存")

go
// Go:channel 传递数据,而不是共享数据
// "Don't communicate by sharing memory; share memory by communicating."

// 正确做法:通过 channel 传递
func processOrders(orders []Order) {
    resultCh := make(chan Result)
    for _, o := range orders {
        go func(order Order) {
            result := process(order)
            resultCh <- result  // 发送结果,而不是写入共享变量
        }(o)
    }
    // 收集结果
}

Go 的 CSP 哲学很优雅,但 Go 不会阻止你 直接共享变量。sync.Mutex 的存在本身就说明:当 channel 不方便时,Go 也允许你去共享内存——只是你要自己承担后果。

编译期阻止(Rust)

rust
// Rust:Send + Sync trait 控制线程安全
use std::thread;

let orders = vec![order1, order2];
let handle = thread::spawn(move || {
    // orders 的所有权转移到了新线程里
    process_orders(&orders);  // 主线程不能再访问 orders
});
// process_orders(&orders);  // 编译错误!orders 已移走

// 如果需要共享:
let orders = Arc::new(Mutex::new(vec![order1, order2]));
// Arc = 原子引用计数,Mutex = 互斥锁
// 编译器强制你通过 Mutex 访问共享数据

Rust 的 SendSync trait 是并发安全的类型级保证。一个类型如果不能安全地在线程间传递,编译器就不让你传。这不是约定,是物理定律。

语言级 Actor(Swift)

swift
// Swift 5.5+:actor 提供数据竞争保护
actor OrderCache {
    private var orders: [String: Order] = [:]

    func get(_ id: String) -> Order? {
        return orders[id]
    }

    func set(_ order: Order) {
        orders[order.orderId] = order
    }
}

// 从外部访问 actor 必须 await
let cache = OrderCache()
await cache.set(order)

Swift 的 actor 模型保证:同一时间只有一个任务能访问 actor 的内部状态。编译器强制执行这个规则——你不需要手动加锁。


8.5 边界探索:跨语言并发陷阱

陷阱一:JavaScript → Go:以为 await 和 goroutine 一样轻松

javascript
// JavaScript:Promise.all 自动管理
const [a, b] = await Promise.all([fetchA(), fetchB()]);
// a 和 b 都拿到了,子任务都结束了
go
// Go:goroutine 启动了,但没人在终点等你
go fetchA()
go fetchB()
// fetchA 和 fetchB 可能还没跑完,主函数已经返回了

从 JS 切到 Go,最容易出的问题是:goroutine 不会被 await 你必须自己用 channel、WaitGroup 或 errgroup 来收集结果和等待完成。

陷阱二:Go → Kotlin:goroutine 泄漏变成协程泄漏

Go 的 goroutine 泄漏通常是因为阻塞在 channel 上。Kotlin 的协程泄漏通常是因为 scope 管理不当。从 Go 过来的人容易用 Go 的心智模型("起一个 goroutine 很便宜,多起几个无所谓")操作 Kotlin 协程——但结构化并发要求你给每个协程一个 scope。

陷阱三:Python asyncio → Rust async:单线程心智 vs 多线程心智

Python asyncio 是单线程的——你不用想数据竞争。Rust async 默认可以是多线程的——你需要想 SendSyncMutex。从 Python asyncio 切到 Rust async,你不只换了语法,你换了一整套并发心智模型。

陷阱四:Java → Go:从线程池心智到 goroutine 心智

Java 开发者习惯了线程池(固定大小、拒绝策略、队列满的处理)。Go 的 goroutine 让线程池看似不需要了——但如果你不控制并发度,你仍然需要某种形式的限流(semaphore、worker pool)。go func() 太便宜了,便宜到你忘了控制。


8.6 洞察:并发的难度不在 "跑起来",而在 "收回来"

如果你只用一门语言写并发,你不会发现:

所有并发模型的差异,表面上是语法和 API 的差异。深层上是 "多件事同时进行时,你怎么收回来" 这个问题的不同回答。

Go 用 channel 和 select 收回结果。JavaScript/Python 用 await Promise.all / asyncio.gather。Java 用 CompletableFuture.allOf。Kotlin/Swift 用结构化并发——scope 结束就是收回。Rust 用 join! 宏同时驱动多个 future,drop 就是取消。

但 "收回来" 不仅仅是收集结果。它包括:

  1. 错误时怎么办——一个子任务失败了,其他子任务要不要取消?(结构化并发说 "要";传统并发说 "你自己决定")
  2. 超时时怎么办——任务还在跑,但调用者不等了,谁来停止它们?(Go 的 context、Rust 的 drop、Kotlin/Swift 的 scope 取消)
  3. 资源怎么回收——子任务中打开的文件、持有的锁、分配的内存,在取消时如何善后?

最容易写的并发代码是 "启动一切" 的代码。最难写的并发代码是 "在一切出错时,把一切干净收回" 的代码。

而不同的语言,给了你不同力度的 "收回" 机制。Rust 的 Drop 保证资源回收(最彻底)。Swift/Kotlin 的 scope 保证子任务不泄漏(最优雅)。Go 的 context 让你选择是否合作取消(最灵活但也最容易忘)。JavaScript 的传统 Promise 模式在 "收回" 这件事上几乎不提供任何帮助。


本章小结

  • 并发模型不是语法风格,而是系统治理方式。 JS/Python 擅长 I/O 并发,Java/C++ 控制力强但秩序自己守,Go 让并发平民化但边界要自己设计,Swift/Kotlin 的结构化并发让任务来去有踪。

  • 结构化并发是现代并发设计最重要的进化。 子任务的生命周期被纳入父任务的 scope,错误和取消自动向下传播。Kotlin 和 Swift 做进了语言,Go 和 JS 留给了程序员。

  • 取消不是可选项。 真实系统里,超时、用户取消、上游失败每天都在发生。Go 用 context,Kotlin/Swift 用 scope 取消,Rust 用 drop——每种机制的力度不同。

  • 共享状态的保护力度从 "靠自觉" 到 "编译期强制" 有一条完整的光谱。 哪门语言适合你,取决于你更相信自己的纪律,还是编译器的纪律。


如果你只用一门语言,你不会学到这个:

所有并发问题,最后都是同一个问题:当一个东西同时被多个人碰的时候,系统怎么不陷入混乱。而不同语言的回答,构成了从 "你最好小心点" 到 "你碰不了这个" 的完整谱系。