强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第9章:Rust async —— 零成本抽象的未来

第9章:Rust async —— 零成本抽象的未来

9.1 Rust 异步的设计哲学

Rust 的异步设计遵循两个核心原则:

  1. 零成本抽象(Zero-cost Abstraction):异步代码的运行时开销应等同于手写状态机
  2. 安全并发(Fearless Concurrency):编译期保证无数据竞争

为什么不用 GC 和运行时?

语言内存管理运行时异步调度
GoGC运行时调度器(GMP)内置
Python引用计数 + GC事件循环内置 asyncio
JavaGCJVM + 虚拟线程内置
Rust所有权系统无内置运行时库实现

Rust 的选择:语言层面提供 async/await 语法和 Future trait,具体的运行时(如 Tokio、async-std)由生态提供。


9.2 Future Trait

Future 是 Rust 异步编程的核心 trait。

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),      // 完成,返回结果
    Pending,       // 未完成,稍后再来
}

手动实现 Future

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.when {
            Poll::Ready("完成!")
        } else {
            // 注册 waker,让调度器在稍后再次调用 poll
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let future = Delay {
        when: Instant::now() + Duration::from_secs(1),
    };
    println!("等待...");
    let result = future.await;
    println!("{}", result);
}

poll 的工作原理

┌──────────────────────────────────────────┐
│           async 运行时 (Tokio)           │
│                                          │
│  1. 创建 Future                          │
│  2. 调用 poll()                          │
│     ├── Poll::Ready(value) → 完成        │
│     └── Poll::Pending → 注册 Waker      │
│  3. Future 的条件满足时,Waker 被触发     │
│  4. 运行时再次调用 poll()                │
│  5. 重复直到 Poll::Ready                 │
└──────────────────────────────────────────┘

9.3 async/await 语法

use tokio::time::{sleep, Duration};

async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}

async fn process() {
    let data = fetch_data("https://api.example.com").await.unwrap();
    println!("数据: {}", &data[..100]);
}

#[tokio::main]
async fn main() {
    process().await;
}

async 块和 async move

// async 块 — 创建匿名 Future
let future = async {
    println!("执行异步操作");
    42
};

// async move — 转移所有权到 Future 中
let data = vec![1, 2, 3];
let future = async move {
    println!("{:?}", data);  // data 的所有权被移动到这里
};
// data 已经不能使用了

9.4 Pin 与 Unpin

Pin 是 Rust 异步中最令人困惑的概念之一。

为什么需要 Pin?

async fn 被编译器转换为状态机,其中可能包含自引用结构

async fn example() {
    let data = vec![1, 2, 3];
    let reference = &data;  // data 的引用
    some_async_op().await;  // ← 暂停点
    println!("{:?}", reference);  // ← 暂停后恢复,引用仍然有效
}

编译器生成的状态机大致如下:

enum ExampleStateMachine {
    State0 {
        data: Vec<i32>,
    },
    State1 {
        data: Vec<i32>,
        reference: *const Vec<i32>,  // ← 自引用!
        future: SomeAsyncFuture,
    },
}

问题:如果这个结构体被移动(move),reference 指针就失效了!

Pin 的解决方案:Pin 保证被 Pin 的值不会被移动

Pin 和 Unpin 的关系

Trait含义
Pin<P>保证指针 P 指向的值不会被移动
Unpin编译器标记,表示类型可以安全地被移动
!Unpin不能安全移动(大多数 async 生成的状态机)
// 大多数类型自动实现 Unpin
let mut x = 42;
let pinned = Pin::new(&mut x);  // ✅ i32 实现了 Unpin

// 但 async 块/函数生成的 Future 通常不实现 Unpin
// 需要使用 Box::pin 或 pin! 宏
let future = Box::pin(async { 42 });  // Pin<Box<dyn Future>>

9.5 async 运行时:Tokio

为什么需要运行时?

Rust 的 async/await 语法只负责生成 Future,但 Future 不会自行执行。需要运行时来:

  1. 调度 Future(决定何时 poll)
  2. 提供 I/O 驱动(epoll/kqueue/IOCP)
  3. 提供定时器、网络、文件 I/O 等功能

Tokio 基础

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("新连接: {}", addr);

        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                let n = match socket.read(&mut buf).await {
                    Ok(0) => return,  // 连接关闭
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("读取错误: {}", e);
                        return;
                    }
                };
                if let Err(e) = socket.write_all(&buf[..n]).await {
                    eprintln!("写入错误: {}", e);
                    return;
                }
            }
        });
    }
}

Tokio 运行时类型

// 多线程运行时(默认)
#[tokio::main]
async fn main() { }

// 等价于
fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap()
        .block_on(async { })
}

// 单线程运行时(current_thread)
#[tokio::main(flavor = "current_thread")]
async fn main() { }

9.6 并发组合

tokio::join! — 并发等待

async fn fetch_user() -> User { /* ... */ }
async fn fetch_orders() -> Vec<Order> { /* ... */ }

#[tokio::main]
async fn main() {
    // 同时执行两个 Future
    let (user, orders) = tokio::join!(
        fetch_user(),
        fetch_orders(),
    );
}

tokio::select! — 竞速

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        result = fetch_from_primary() => {
            println!("主服务: {:?}", result);
        }
        result = fetch_from_backup() => {
            println!("备份服务: {:?}", result);
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("超时!");
        }
    }
}

tokio::spawn — 并发任务

#[tokio::main]
async fn main() {
    let mut handles = vec![];

    for i in 0..100 {
        let handle = tokio::spawn(async move {
            // 每个任务独立执行
            expensive_io(i).await
        });
        handles.push(handle);
    }

    let results: Vec<_> = futures::future::join_all(handles).await;
}

错误处理

use tokio::task::JoinError;

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // 可能 panic
        panic!("出错了!");
    });

    match handle.await {
        Ok(result) => println!("成功: {:?}", result),
        Err(e) if e.is_panic() => println!("任务 panic 了"),
        Err(e) if e.is_cancelled() => println!("任务被取消"),
        Err(e) => println!("其他错误: {}", e),
    }
}

9.7 async 与所有权

Rust 的所有权系统在异步代码中带来独特的挑战。

// ❌ 编译错误:借用在 await 点之后失效
async fn buggy(data: &Vec<i32>) -> i32 {
    let reference = &data[0];
    some_async_op().await;  // ← 暂停点
    *reference  // 编译器可能拒绝
}

// ✅ 解决方案一:使用 async move
async fn correct(data: Vec<i32>) -> i32 {
    let reference = &data[0];
    some_async_op().await;
    *reference  // data 仍然有效
}

// ✅ 解决方案二:在 await 前拷贝
async fn also_correct(data: &Vec<i32>) -> i32 {
    let value = data[0];  // 拷贝
    some_async_op().await;
    value
}

9.8 Stream — 异步迭代器

use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    // 创建一个 Stream
    let mut stream = tokio_stream::iter(vec![1, 2, 3, 4, 5]);

    // 异步迭代
    while let Some(value) = stream.next().await {
        println!("收到: {}", value);
    }

    // 使用 map/filter
    let doubled: Vec<_> = tokio_stream::iter(vec![1, 2, 3, 4, 5])
        .map(|x| x * 2)
        .filter(|x| *x > 4)
        .collect()
        .await;
    println!("{:?}", doubled);  // [6, 8, 10]
}

9.9 性能对比

异步运行时对比

运行时线程模型I/O 驱动特点
Tokio多线程工作窃取epoll/kqueue/IOCP生态最丰富,生产首选
async-std多线程类似 TokioAPI 类似标准库
smol多线程极简轻量级,适合嵌入
glommio每核一个线程io_uring针对 io_uring 优化

Rust vs Go vs Node.js 性能

基准测试(简单的 HTTP echo 服务器,QPS):

语言/框架QPS内存延迟 P99
Rust (Tokio + Axum)~300,000~5MB~1ms
Go (net/http)~250,000~10MB~1.5ms
Node.js (Express)~50,000~50MB~5ms
Python (FastAPI + uvloop)~30,000~30MB~8ms

注意:以上数据为典型场景估算,实际性能取决于具体工作负载。


9.10 业务场景:高性能 HTTP API

use axum::{routing::get, Json, Router};
use serde::Serialize;

#[derive(Serialize)]
struct HealthResponse {
    status: String,
    uptime: u64,
}

async fn health() -> Json<HealthResponse> {
    Json(HealthResponse {
        status: "ok".to_string(),
        uptime: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    })
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/health", get(health))
        .route("/api/users", get(list_users))
        .route("/api/users/:id", get(get_user));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080")
        .await
        .unwrap();

    axum::serve(listener, app).await.unwrap();
}

9.11 本章小结

要点说明
Future traitpoll 方法 + Poll 枚举,惰性执行
async/await编译器转换为状态机
Pin防止自引用结构被移动
Tokio最流行的异步运行时,工作窃取调度
join!/select!并发组合和竞速选择
Stream异步迭代器
零成本抽象编译期优化,运行时无开销

下一章预告:Java 21 的虚拟线程如何用全新的方式解决异步编程的复杂性?


扩展阅读