异步与协程精讲 / 第9章:Rust async —— 零成本抽象的未来
第9章:Rust async —— 零成本抽象的未来
9.1 Rust 异步的设计哲学
Rust 的异步设计遵循两个核心原则:
- 零成本抽象(Zero-cost Abstraction):异步代码的运行时开销应等同于手写状态机
- 安全并发(Fearless Concurrency):编译期保证无数据竞争
为什么不用 GC 和运行时?
| 语言 | 内存管理 | 运行时 | 异步调度 |
|---|---|---|---|
| Go | GC | 运行时调度器(GMP) | 内置 |
| Python | 引用计数 + GC | 事件循环 | 内置 asyncio |
| Java | GC | JVM + 虚拟线程 | 内置 |
| Rust | 所有权系统 | 无内置运行时 | 库实现 |
Rust 的选择:语言层面提供
async/await语法和Futuretrait,具体的运行时(如 Tokio、async-std)由生态提供。
9.2 Future Trait
Future 是 Rust 异步编程的核心 trait。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 完成,返回结果
Pending, // 未完成,稍后再来
}
手动实现 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
Poll::Ready("完成!")
} else {
// 注册 waker,让调度器在稍后再次调用 poll
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let future = Delay {
when: Instant::now() + Duration::from_secs(1),
};
println!("等待...");
let result = future.await;
println!("{}", result);
}
poll 的工作原理
┌──────────────────────────────────────────┐
│ async 运行时 (Tokio) │
│ │
│ 1. 创建 Future │
│ 2. 调用 poll() │
│ ├── Poll::Ready(value) → 完成 │
│ └── Poll::Pending → 注册 Waker │
│ 3. Future 的条件满足时,Waker 被触发 │
│ 4. 运行时再次调用 poll() │
│ 5. 重复直到 Poll::Ready │
└──────────────────────────────────────────┘
9.3 async/await 语法
use tokio::time::{sleep, Duration};
async fn fetch_data(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
async fn process() {
let data = fetch_data("https://api.example.com").await.unwrap();
println!("数据: {}", &data[..100]);
}
#[tokio::main]
async fn main() {
process().await;
}
async 块和 async move
// async 块 — 创建匿名 Future
let future = async {
println!("执行异步操作");
42
};
// async move — 转移所有权到 Future 中
let data = vec![1, 2, 3];
let future = async move {
println!("{:?}", data); // data 的所有权被移动到这里
};
// data 已经不能使用了
9.4 Pin 与 Unpin
Pin 是 Rust 异步中最令人困惑的概念之一。
为什么需要 Pin?
async fn 被编译器转换为状态机,其中可能包含自引用结构:
async fn example() {
let data = vec![1, 2, 3];
let reference = &data; // data 的引用
some_async_op().await; // ← 暂停点
println!("{:?}", reference); // ← 暂停后恢复,引用仍然有效
}
编译器生成的状态机大致如下:
enum ExampleStateMachine {
State0 {
data: Vec<i32>,
},
State1 {
data: Vec<i32>,
reference: *const Vec<i32>, // ← 自引用!
future: SomeAsyncFuture,
},
}
问题:如果这个结构体被移动(move),reference 指针就失效了!
Pin 的解决方案:Pin 保证被 Pin 的值不会被移动。
Pin 和 Unpin 的关系
| Trait | 含义 |
|---|---|
Pin<P> | 保证指针 P 指向的值不会被移动 |
Unpin | 编译器标记,表示类型可以安全地被移动 |
!Unpin | 不能安全移动(大多数 async 生成的状态机) |
// 大多数类型自动实现 Unpin
let mut x = 42;
let pinned = Pin::new(&mut x); // ✅ i32 实现了 Unpin
// 但 async 块/函数生成的 Future 通常不实现 Unpin
// 需要使用 Box::pin 或 pin! 宏
let future = Box::pin(async { 42 }); // Pin<Box<dyn Future>>
9.5 async 运行时:Tokio
为什么需要运行时?
Rust 的 async/await 语法只负责生成 Future,但 Future 不会自行执行。需要运行时来:
- 调度 Future(决定何时 poll)
- 提供 I/O 驱动(epoll/kqueue/IOCP)
- 提供定时器、网络、文件 I/O 等功能
Tokio 基础
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (mut socket, addr) = listener.accept().await?;
println!("新连接: {}", addr);
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(0) => return, // 连接关闭
Ok(n) => n,
Err(e) => {
eprintln!("读取错误: {}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[..n]).await {
eprintln!("写入错误: {}", e);
return;
}
}
});
}
}
Tokio 运行时类型
// 多线程运行时(默认)
#[tokio::main]
async fn main() { }
// 等价于
fn main() {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
.block_on(async { })
}
// 单线程运行时(current_thread)
#[tokio::main(flavor = "current_thread")]
async fn main() { }
9.6 并发组合
tokio::join! — 并发等待
async fn fetch_user() -> User { /* ... */ }
async fn fetch_orders() -> Vec<Order> { /* ... */ }
#[tokio::main]
async fn main() {
// 同时执行两个 Future
let (user, orders) = tokio::join!(
fetch_user(),
fetch_orders(),
);
}
tokio::select! — 竞速
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
result = fetch_from_primary() => {
println!("主服务: {:?}", result);
}
result = fetch_from_backup() => {
println!("备份服务: {:?}", result);
}
_ = sleep(Duration::from_secs(5)) => {
println!("超时!");
}
}
}
tokio::spawn — 并发任务
#[tokio::main]
async fn main() {
let mut handles = vec![];
for i in 0..100 {
let handle = tokio::spawn(async move {
// 每个任务独立执行
expensive_io(i).await
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles).await;
}
错误处理
use tokio::task::JoinError;
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// 可能 panic
panic!("出错了!");
});
match handle.await {
Ok(result) => println!("成功: {:?}", result),
Err(e) if e.is_panic() => println!("任务 panic 了"),
Err(e) if e.is_cancelled() => println!("任务被取消"),
Err(e) => println!("其他错误: {}", e),
}
}
9.7 async 与所有权
Rust 的所有权系统在异步代码中带来独特的挑战。
// ❌ 编译错误:借用在 await 点之后失效
async fn buggy(data: &Vec<i32>) -> i32 {
let reference = &data[0];
some_async_op().await; // ← 暂停点
*reference // 编译器可能拒绝
}
// ✅ 解决方案一:使用 async move
async fn correct(data: Vec<i32>) -> i32 {
let reference = &data[0];
some_async_op().await;
*reference // data 仍然有效
}
// ✅ 解决方案二:在 await 前拷贝
async fn also_correct(data: &Vec<i32>) -> i32 {
let value = data[0]; // 拷贝
some_async_op().await;
value
}
9.8 Stream — 异步迭代器
use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};
#[tokio::main]
async fn main() {
// 创建一个 Stream
let mut stream = tokio_stream::iter(vec![1, 2, 3, 4, 5]);
// 异步迭代
while let Some(value) = stream.next().await {
println!("收到: {}", value);
}
// 使用 map/filter
let doubled: Vec<_> = tokio_stream::iter(vec![1, 2, 3, 4, 5])
.map(|x| x * 2)
.filter(|x| *x > 4)
.collect()
.await;
println!("{:?}", doubled); // [6, 8, 10]
}
9.9 性能对比
异步运行时对比
| 运行时 | 线程模型 | I/O 驱动 | 特点 |
|---|---|---|---|
| Tokio | 多线程工作窃取 | epoll/kqueue/IOCP | 生态最丰富,生产首选 |
| async-std | 多线程 | 类似 Tokio | API 类似标准库 |
| smol | 多线程 | 极简 | 轻量级,适合嵌入 |
| glommio | 每核一个线程 | io_uring | 针对 io_uring 优化 |
Rust vs Go vs Node.js 性能
基准测试(简单的 HTTP echo 服务器,QPS):
| 语言/框架 | QPS | 内存 | 延迟 P99 |
|---|---|---|---|
| Rust (Tokio + Axum) | ~300,000 | ~5MB | ~1ms |
| Go (net/http) | ~250,000 | ~10MB | ~1.5ms |
| Node.js (Express) | ~50,000 | ~50MB | ~5ms |
| Python (FastAPI + uvloop) | ~30,000 | ~30MB | ~8ms |
注意:以上数据为典型场景估算,实际性能取决于具体工作负载。
9.10 业务场景:高性能 HTTP API
use axum::{routing::get, Json, Router};
use serde::Serialize;
#[derive(Serialize)]
struct HealthResponse {
status: String,
uptime: u64,
}
async fn health() -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok".to_string(),
uptime: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
})
}
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/health", get(health))
.route("/api/users", get(list_users))
.route("/api/users/:id", get(get_user));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080")
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
}
9.11 本章小结
| 要点 | 说明 |
|---|---|
| Future trait | poll 方法 + Poll 枚举,惰性执行 |
| async/await | 编译器转换为状态机 |
| Pin | 防止自引用结构被移动 |
| Tokio | 最流行的异步运行时,工作窃取调度 |
| join!/select! | 并发组合和竞速选择 |
| Stream | 异步迭代器 |
| 零成本抽象 | 编译期优化,运行时无开销 |
下一章预告:Java 21 的虚拟线程如何用全新的方式解决异步编程的复杂性?
扩展阅读
- Rust Async Book — 官方异步编程指南
- Tokio 文档 — 最流行的 Rust 异步运行时
- Pin 详解
- Zero-cost Futures in Rust — Without Boats
- Rust and Tokio 性能调优