强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第5章:async/await —— 异步的终极语法糖

第5章:async/await —— 异步的终极语法糖

5.1 什么是 async/await?

async/await 是建立在 Promise 之上的语法糖(Syntactic Sugar),它让异步代码看起来像同步代码,极大地提高了可读性和可维护性。

核心价值:async/await 没有引入新的异步机制,它只是让 Promise 的使用变得更加自然。

基本语法

JavaScript:

// 一个 async 函数总是返回 Promise
async function getUserProfile(userId) {
    const user = await getUser(userId);         // 暂停,等待结果
    const posts = await getPosts(user.id);      // 暂停,等待结果
    const friends = await getFriends(user.id);  // 暂停,等待结果
    return { user, posts, friends };            // 自动包装为 Promise.resolve()
}

// 调用 async 函数
getUserProfile(123).then(profile => console.log(profile));
// 或者在另一个 async 函数中
const profile = await getUserProfile(123);

Python:

import asyncio

async def get_user_profile(user_id: int) -> dict:
    user = await get_user(user_id)          # 暂停,等待结果
    posts = await get_posts(user['id'])     # 暂停,等待结果
    friends = await get_friends(user['id']) # 暂停,等待结果
    return {'user': user, 'posts': posts, 'friends': friends}

profile = asyncio.run(get_user_profile(123))

C# (.NET):

async Task<UserProfile> GetUserProfileAsync(int userId)
{
    var user = await GetUserAsync(userId);
    var posts = await GetPostsAsync(user.Id);
    var friends = await GetFriendsAsync(user.Id);
    return new UserProfile { User = user, Posts = posts, Friends = friends };
}

5.2 async/await 的本质

理解 async/await 的本质有助于写出更好的代码。

await 是暂停,不是阻塞

时间线:

线程:  ├─── getUser ───►│ 等待... │◄── getPosts ──►│ 等待... │◄── getFriends ───┤
事件循环:              │ 处理其他请求 │                │ 处理其他请求 │                 │
操作行为线程状态
await somePromise暂停当前函数执行,将控制权交还给事件循环不阻塞,线程去做其他事
Promise 完成恢复函数执行,从暂停处继续重新获得控制权

编译器视角

async/await 在底层被编译器/解释器转换为状态机:

// 你写的代码
async function fetchData() {
    const a = await step1();
    const b = await step2(a);
    const c = await step3(b);
    return c;
}

// 编译器转换后的伪代码(概念性)
function fetchData() {
    return new Promise((resolve, reject) => {
        function step1_handler() {
            step1().then(a => step2_handler(a), reject);
        }
        function step2_handler(a) {
            step2(a).then(b => step3_handler(b), reject);
        }
        function step3_handler(b) {
            step3(b).then(c => resolve(c), reject);
        }
        step1_handler();
    });
}

5.3 错误处理

try/catch — 最直观的方式

async function fetchUserData(userId) {
    try {
        const user = await getUser(userId);
        const posts = await getPosts(user.id);
        return { user, posts };
    } catch (err) {
        // 捕获 await 过程中抛出的任何错误
        console.error('获取用户数据失败:', err.message);
        throw err;  // 重新抛出,让调用方处理
    }
}

统一错误处理中间件

// Express 风格的异步错误处理
const asyncHandler = (fn) => (req, res, next) => {
    Promise.resolve(fn(req, res, next)).catch(next);
};

// 使用
app.get('/users/:id', asyncHandler(async (req, res) => {
    const user = await User.findById(req.params.id);
    if (!user) {
        return res.status(404).json({ error: '用户不存在' });
    }
    res.json(user);
}));

并发操作的错误处理

// ❌ 一个失败,全部失败
async function fetchAll(urls) {
    // 如果任何一个请求失败,整个 Promise.all 会 reject
    const responses = await Promise.all(urls.map(url => fetch(url)));
    return responses;
}

// ✅ 捕获单个错误,不影响其他
async function fetchAllSafe(urls) {
    const results = await Promise.allSettled(
        urls.map(url => fetch(url))
    );
    return results.map(r => r.status === 'fulfilled' ? r.value : null);
}

// ✅ 更优雅的方式:为每个 Promise 添加 catch
async function fetchAllWithCatch(urls) {
    const safeFetch = url => fetch(url).catch(err => ({ error: err.message }));
    const responses = await Promise.all(urls.map(safeFetch));
    return responses;
}

5.4 并发控制

串行 vs 并发

// ❌ 串行 — 一个接一个,总耗时 = 所有耗时之和
async function fetchSerial(urls) {
    const results = [];
    for (const url of urls) {
        const response = await fetch(url);   // 等一个完成再请求下一个
        results.push(await response.json());
    }
    return results;
}

// ✅ 并发 — 同时发起,总耗时 = 最慢的那个
async function fetchParallel(urls) {
    const promises = urls.map(async url => {
        const response = await fetch(url);
        return response.json();
    });
    return Promise.all(promises);
}

并发限制(Concurrency Limiting)

在实际生产中,不能无限制地并发(会压垮服务器或耗尽本地资源)。需要限制同时进行的异步操作数量。

// 使用 p-limit 库
import pLimit from 'p-limit';

const limit = pLimit(5); // 最多同时 5 个

const tasks = urls.map(url =>
    limit(() => fetch(url).then(r => r.json()))
);

const results = await Promise.all(tasks);

手写并发限制器:

class ConcurrencyLimiter {
    constructor(maxConcurrency) {
        this.max = maxConcurrency;
        this.running = 0;
        this.queue = [];
    }

    async add(fn) {
        if (this.running >= this.max) {
            await new Promise(resolve => this.queue.push(resolve));
        }
        this.running++;
        try {
            return await fn();
        } finally {
            this.running--;
            if (this.queue.length > 0) {
                this.queue.shift()();
            }
        }
    }
}

// 使用
const limiter = new ConcurrencyLimiter(3);
const results = await Promise.all(
    urls.map(url => limiter.add(() => fetch(url).then(r => r.json())))
);

Python 并发控制

import asyncio

async def fetch_all_with_limit(urls: list[str], max_concurrency: int = 5):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def fetch_one(url: str):
        async with semaphore:  # 获取信号量
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.json()

    tasks = [fetch_one(url) for url in urls]
    return await asyncio.gather(*tasks)

5.5 async/await 的限制与陷阱

陷阱一:在非 async 函数中使用 await

// ❌ 语法错误 — await 只能在 async 函数中使用
function notAsync() {
    const data = await fetch('/api');  // SyntaxError
}

// ✅ 解决方案
async function isAsync() {
    const data = await fetch('/api');
    return data;
}

// ✅ 顶层 await(ES2022,仅在模块中可用)
// module.mjs
const data = await fetch('/api');

陷阱二:在循环中错误地使用 await

// ❌ 性能问题:串行执行
async function processItems(items) {
    for (const item of items) {
        await process(item);  // 每次都等待,串行执行
    }
}

// ✅ 并发执行
async function processItems(items) {
    await Promise.all(items.map(item => process(item)));
}

// ✅ 需要串行时(如数据库事务),这是正确的
async function processItemsSequentially(items) {
    for (const item of items) {
        await process(item);  // 有意识地串行
    }
}

陷阱三:忘记 await

// ❌ 忘记 await — 函数在后台执行,无法获取结果
async function buggy() {
    const data = fetch('/api');  // data 是 Promise,不是数据!
    console.log(data);  // 输出: Promise { <pending> }
}

// ✅ 正确写法
async function correct() {
    const response = await fetch('/api');
    const data = await response.json();
    console.log(data);
}

陷阱四:async 函数中的 return vs return await

async function example() {
    try {
        return await fetch('/api');  // try/catch 能捕获 fetch 的错误
    } catch (err) {
        return null;
    }
}

async function example2() {
    try {
        return fetch('/api');  // ⚠️ return 的是 Promise,try/catch 无法捕获
    } catch (err) {
        return null;  // 不会执行
    }
}

规则:在 try 块中,总是使用 return await 而非直接 return

陷阱五:并发但无错误隔离

// ❌ 一个 Promise 失败,其他的也取消不了(Promise 不能取消)
async function dangerous() {
    const [a, b, c] = await Promise.all([
        fetchA(),  // 如果 B 失败,A 和 C 仍在执行
        fetchB(),
        fetchC(),
    ]);
}

// ✅ 使用 AbortController 取消请求
async function safe() {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 5000);

    try {
        const [a, b, c] = await Promise.all([
            fetch('/api/a', { signal: controller.signal }),
            fetch('/api/b', { signal: controller.signal }),
            fetch('/api/c', { signal: controller.signal }),
        ]);
    } finally {
        clearTimeout(timeout);
    }
}

5.6 高级模式

模式一:串行管道

const pipe = (...fns) => (input) =>
    fns.reduce((promise, fn) => promise.then(fn), Promise.resolve(input));

// 使用
const processOrder = pipe(
    validateOrder,
    checkInventory,
    calculatePrice,
    processPayment,
    sendConfirmation
);

const result = await processOrder(orderData);

模式二:可重试的 async 函数

async function retryable(fn, { retries = 3, delay = 1000, backoff = 2 } = {}) {
    for (let attempt = 1; attempt <= retries; attempt++) {
        try {
            return await fn();
        } catch (err) {
            if (attempt === retries) throw err;
            const waitTime = delay * Math.pow(backoff, attempt - 1);
            console.warn(`第 ${attempt} 次失败,${waitTime}ms 后重试:`, err.message);
            await new Promise(r => setTimeout(r, waitTime));
        }
    }
}

// 使用
const data = await retryable(
    () => fetch('/api/flaky').then(r => r.json()),
    { retries: 3, delay: 1000 }
);

模式三:可取消的 async 操作

class CancellationToken {
    constructor() {
        this.cancelled = false;
        this.listeners = [];
    }

    cancel() {
        this.cancelled = true;
        this.listeners.forEach(fn => fn());
    }

    onCancelled(fn) {
        if (this.cancelled) fn();
        else this.listeners.push(fn);
    }

    throwIfCancelled() {
        if (this.cancelled) throw new Error('操作已取消');
    }
}

async function longRunningTask(token) {
    for (let i = 0; i < 100; i++) {
        token.throwIfCancelled();
        await processBatch(i);
    }
}

const token = new CancellationToken();
setTimeout(() => token.cancel(), 5000); // 5 秒后取消

try {
    await longRunningTask(token);
} catch (err) {
    if (err.message === '操作已取消') {
        console.log('任务被取消');
    }
}

5.7 各语言 async/await 对比

特性JavaScriptPythonC#RustJava
关键字async/awaitasync/awaitasync/awaitasync/.await无(虚拟线程)
返回类型Promise<T>CoroutineTask<T>impl Future<Output=T>-
顶层 awaitES2022不支持不支持不支持-
取消支持AbortControllerCancelledErrorCancellationToken无内置Thread.interrupt()
背压无内置无内置IAsyncEnumerableStream traitFlow

5.8 业务场景:数据管道

场景

构建一个数据处理管道:从数据源读取 → 清洗 → 转换 → 写入目标。

async function dataPipeline(config) {
    const { source, transforms, destination, concurrency = 10 } = config;

    // 1. 读取数据(分页)
    const pages = readPages(source);

    // 2. 限制并发处理
    const limit = pLimit(concurrency);

    // 3. 处理每个分页
    const results = await Promise.all(
        pages.map(page => limit(async () => {
            try {
                const raw = await readPage(page);
                const cleaned = cleanData(raw);
                const transformed = transforms.reduce(
                    (data, fn) => fn(data), cleaned
                );
                await writePage(destination, transformed);
                return { page: page.id, status: 'success', count: transformed.length };
            } catch (err) {
                return { page: page.id, status: 'error', error: err.message };
            }
        }))
    );

    const succeeded = results.filter(r => r.status === 'success');
    const failed = results.filter(r => r.status === 'error');

    console.log(`处理完成: ${succeeded.length} 成功, ${failed.length} 失败`);
    return results;
}

5.9 本章小结

要点说明
本质async/await 是 Promise 的语法糖
await 是暂停不阻塞线程,控制权交还事件循环
错误处理使用 try/catch,注意 return vs return await
并发控制使用 p-limit 或 Semaphore 限制并发数
常见陷阱忘记 await、循环中串行、无法取消
最佳实践并发优先、错误隔离、超时控制

下一章预告:async/await 虽然好用,但它本质上还是基于 Promise 的。而协程(Coroutine)提供了一种更底层、更灵活的异步原语。下一章我们将探讨协程的本质。


扩展阅读