强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Node.js 开发指南 / 第 6 章 · 异步编程基础

第 6 章 · 异步编程基础

6.1 为什么需要异步

Node.js 采用单线程模型,如果所有操作都是同步的,一个耗时操作就会阻塞整个程序。异步编程让程序可以在等待 I/O 时继续执行其他代码。

// 同步(阻塞)— 不推荐在服务器中使用
const data = fs.readFileSync('/large-file.txt'); // 阻塞!
console.log('文件读取完成');
processOtherRequests(); // 必须等文件读完

// 异步(非阻塞)— 推荐
fs.readFile('/large-file.txt', (err, data) => {
  console.log('文件读取完成');
});
processOtherRequests(); // 立即执行,不等待

6.2 回调函数(Callback)

回调的基本模式

const fs = require('fs');

// 错误优先回调(Error-First Callback)— Node.js 约定
fs.readFile('data.txt', 'utf8', (err, data) => {
  if (err) {
    console.error('读取失败:', err.message);
    return;
  }
  console.log('文件内容:', data);
});

回调地狱(Callback Hell)

// 嵌套回调 — 可读性差,难以维护
fs.readFile('config.json', 'utf8', (err, configData) => {
  if (err) return handleError(err);
  const config = JSON.parse(configData);
  
  fs.readFile(config.dataFile, 'utf8', (err, userData) => {
    if (err) return handleError(err);
    const users = JSON.parse(userData);
    
    fs.writeFile('output.json', JSON.stringify(users), (err) => {
      if (err) return handleError(err);
      console.log('写入完成');
    });
  });
});

封装回调为 Promise

const { promisify } = require('node:util');
const fs = require('fs');

// 方式 1:使用 promisify
const readFile = promisify(fs.readFile);
const writeFile = promisify(fs.writeFile);

// 方式 2:使用 fs/promises(Node.js 10+)
const fsPromises = require('fs/promises');

// 方式 3:手动封装
function delay(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

6.3 Promise 详解

创建 Promise

// 基本创建
const promise = new Promise((resolve, reject) => {
  // 异步操作
  setTimeout(() => {
    const success = Math.random() > 0.5;
    if (success) {
      resolve('操作成功');    // 状态:fulfilled
    } else {
      reject(new Error('操作失败')); // 状态:rejected
    }
  }, 1000);
});

// Promise 的三种状态:
// pending  → fulfilled(已成功)→ resolve(value)
// pending  → rejected(已拒绝)→ reject(error)
// 一旦状态改变,不可再变

Promise 链式调用

readFile('config.json', 'utf8')
  .then((data) => JSON.parse(data))
  .then((config) => readFile(config.dataFile, 'utf8'))
  .then((userData) => {
    const users = JSON.parse(userData);
    return writeFile('output.json', JSON.stringify(users, null, 2));
  })
  .then(() => console.log('写入完成'))
  .catch((err) => console.error('错误:', err.message))
  .finally(() => console.log('无论成功失败都会执行'));

Promise 静态方法

// Promise.all — 全部成功才成功,有一个失败就失败
const results = await Promise.all([
  fetch('/api/users'),
  fetch('/api/posts'),
  fetch('/api/comments'),
]);
// results = [users, posts, comments]

// Promise.allSettled — 等待所有完成(无论成功失败)
const outcomes = await Promise.allSettled([
  fetch('/api/might-fail-1'),
  fetch('/api/might-fail-2'),
]);
// outcomes = [
//   { status: 'fulfilled', value: ... },
//   { status: 'rejected', reason: ... }
// ]

// Promise.race — 返回最先完成的结果
const fastest = await Promise.race([
  fetch('/api/server1'),
  fetch('/api/server2'),
]);

// Promise.any — 返回最先成功的
const winner = await Promise.any([
  fetch('/api/server1'),
  fetch('/api/server2'),
  fetch('/api/server3'),
]);

// Promise.resolve / Promise.reject
const resolved = Promise.resolve(42);
const rejected = Promise.reject(new Error('失败'));

// Promise.try(Node.js 22+)
const result = await Promise.try(() => {
  return mightThrowSync();
});

实用工具函数

// 延迟函数
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// 超时控制
function withTimeout(promise, ms) {
  const timeout = new Promise((_, reject) => {
    setTimeout(() => reject(new Error('操作超时')), ms);
  });
  return Promise.race([promise, timeout]);
}

// 重试函数
async function retry(fn, maxRetries = 3, delayMs = 1000) {
  for (let i = 0; i <= maxRetries; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i === maxRetries) throw err;
      console.log(`重试第 ${i + 1} 次...`);
      await delay(delayMs * (i + 1)); // 指数退避
    }
  }
}

// 使用示例
const data = await retry(
  () => fetch('https://api.example.com/data'),
  3,
  1000
);

6.4 async/await

基本语法

// async 函数总是返回 Promise
async function fetchData() {
  return 'data'; // 自动包装为 Promise.resolve('data')
}

// await 暂停执行,等待 Promise 完成
async function processData() {
  try {
    const configData = await readFile('config.json', 'utf8');
    const config = JSON.parse(configData);
    
    const userData = await readFile(config.dataFile, 'utf8');
    const users = JSON.parse(userData);
    
    await writeFile('output.json', JSON.stringify(users, null, 2));
    console.log('处理完成');
  } catch (err) {
    console.error('错误:', err.message);
  }
}

并发控制

// ❌ 顺序执行(慢)
async function sequential() {
  const user = await fetch('/api/user/1');
  const posts = await fetch('/api/posts');
  const comments = await fetch('/api/comments');
  // 总耗时 = 请求1 + 请求2 + 请求3
}

// ✅ 并行执行(快)
async function parallel() {
  const [user, posts, comments] = await Promise.all([
    fetch('/api/user/1'),
    fetch('/api/posts'),
    fetch('/api/comments'),
  ]);
  // 总耗时 = max(请求1, 请求2, 请求3)
}

// ✅ 有依赖关系时分组并行
async function grouped() {
  const user = await fetch('/api/user/1');
  
  // 这两个请求可以并行
  const [posts, comments] = await Promise.all([
    fetch(`/api/posts?userId=${user.id}`),
    fetch(`/api/comments?userId=${user.id}`),
  ]);
}

顶层 await(Top-Level Await)

// ESM 模块中可以直接使用顶层 await
const response = await fetch('https://api.example.com/data');
const data = await response.json();
console.log(data);

// CJS 中不能使用顶层 await

for 循环中的异步

// ❌ forEach 不会等待 async 函数
async function wrong() {
  [1, 2, 3].forEach(async (num) => {
    await delay(1000);
    console.log(num);
  });
  console.log('先于 forEach 完成!'); // 会先执行
}

// ✅ 使用 for...of
async function sequentialLoop() {
  for (const num of [1, 2, 3]) {
    await delay(1000);
    console.log(num);
  }
  console.log('循环完成后执行');
}

// ✅ 使用 Promise.all 并行处理
async function parallelLoop() {
  await Promise.all(
    [1, 2, 3].map(async (num) => {
      await delay(1000);
      console.log(num);
    })
  );
  console.log('全部完成');
}

// ✅ 带并发限制的批处理
async function batchProcess(items, batchSize = 5) {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    await Promise.all(batch.map((item) => processItem(item)));
    console.log(`已处理 ${Math.min(i + batchSize, items.length)}/${items.length}`);
  }
}

6.5 错误处理最佳实践

// ✅ 使用 try/catch 处理 async 错误
async function safeOperation() {
  try {
    const data = await riskyOperation();
    return { success: true, data };
  } catch (err) {
    return { success: false, error: err.message };
  }
}

// ✅ 封装错误处理辅助函数
function to(promise) {
  return promise.then(
    (data) => [null, data],
    (err) => [err, null]
  );
}

// 使用方式(类似 Go 风格)
async function fetchData() {
  const [err, data] = await to(fetch('/api/data'));
  if (err) {
    console.error('请求失败:', err.message);
    return;
  }
  console.log('数据:', data);
}

// ✅ 全局未捕获 Promise 拒绝处理
process.on('unhandledRejection', (reason, promise) => {
  console.error('未处理的 Promise 拒绝:', reason);
  // 记录日志后优雅退出
  process.exit(1);
});

6.6 异步迭代器

// 异步生成器
async function* fetchPages(baseUrl, totalPages) {
  for (let page = 1; page <= totalPages; page++) {
    const response = await fetch(`${baseUrl}?page=${page}`);
    const data = await response.json();
    yield data;
  }
}

// 使用 for await...of 遍历
async function processPages() {
  for await (const pageData of fetchPages('/api/items', 10)) {
    console.log('处理第', pageData.page, '页');
    await saveToDatabase(pageData.items);
  }
}

// 可读流也是异步可迭代的
const fs = require('fs');
const readline = require('readline');

async function processLargeFile(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity,
  });

  let lineCount = 0;
  for await (const line of rl) {
    lineCount++;
    // 逐行处理,不会一次性加载到内存
    if (lineCount % 10000 === 0) {
      console.log(`已处理 ${lineCount} 行`);
    }
  }
  console.log(`共处理 ${lineCount} 行`);
}

6.7 事件发射器(EventEmitter)

const { EventEmitter } = require('events');

// 创建事件发射器
const emitter = new EventEmitter();

// 注册监听器
emitter.on('data', (chunk) => {
  console.log('收到数据:', chunk);
});

emitter.once('end', () => {
  console.log('数据接收完毕(只触发一次)');
});

// 触发事件
emitter.emit('data', 'Hello');
emitter.emit('data', 'World');
emitter.emit('end');

// 错误处理 — 始终监听 error 事件
emitter.on('error', (err) => {
  console.error('事件错误:', err.message);
});

// 实际应用:创建自定义类
class DataFetcher extends EventEmitter {
  async fetch(url) {
    this.emit('start', url);
    try {
      const response = await fetch(url);
      const data = await response.json();
      this.emit('data', data);
      this.emit('end');
      return data;
    } catch (err) {
      this.emit('error', err);
      throw err;
    }
  }
}

const fetcher = new DataFetcher();
fetcher.on('start', (url) => console.log('开始请求:', url));
fetcher.on('data', (data) => console.log('收到数据:', data));
fetcher.on('error', (err) => console.error('错误:', err));
fetcher.fetch('https://api.example.com/data');

注意事项

⚠️ 不要混用回调和 Promise:对同一个操作不要同时使用回调和 Promise,选择一种风格保持一致。

⚠️ 始终处理 Promise 的拒绝:未处理的 Promise.reject 会导致 UnhandledPromiseRejection,在 Node.js 15+ 中会使进程崩溃。

⚠️ 避免在循环中使用 await 而不必要:如果操作之间没有依赖关系,使用 Promise.all 并行处理。

⚠️ 注意 forEach 与 async 的陷阱forEach 不会等待回调中的 await,使用 for...ofPromise.all(arr.map(...)) 替代。

业务场景

  1. API 聚合:使用 Promise.all 并行请求多个微服务并聚合结果
  2. 文件批处理:使用异步迭代器逐行处理大文件,避免内存溢出
  3. 限流请求:使用 batchProcess 控制并发数量,避免打爆下游服务
  4. 超时控制:使用 Promise.race 为网络请求添加超时保护

扩展阅读


上一章第 5 章 · 模块系统 下一章第 7 章 · 事件循环 — 深入理解 Node.js 事件循环的机制和阶段。