强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

异步与协程精讲 / 第2章:事件循环 —— 异步的心脏

第2章:事件循环 —— 异步的心脏

2.1 什么是事件循环?

事件循环(Event Loop)是异步编程的核心调度机制。它是一个无限循环,不断地:

  1. 监听事件:哪些 I/O 操作已完成?哪些定时器已到期?
  2. 分发事件:将就绪的事件交给对应的回调函数处理
  3. 执行回调:运行注册好的回调代码
┌───────────────────────────────────┐
│         Event Loop                │
│                                   │
│   ┌───────────┐                   │
│   │  poll I/O  │ ◄──── 有就绪事件?
│   └─────┬─────┘        否 → 阻塞等待
│         │ 是                        │
│         ▼                           │
│   ┌───────────┐                   │
│   │ 执行回调   │                   │
│   └─────┬─────┘                   │
│         │                           │
│         ▼                           │
│   ┌───────────┐                   │
│   │ 检查定时器 │                   │
│   └─────┬─────┘                   │
│         │                           │
│         ▼                           │
│   ┌───────────┐                   │
│   │  回到 poll  │ ─────────────►  │
│   └───────────┘                   │
└───────────────────────────────────┘

为什么事件循环能用单线程处理高并发?

关键在于:大多数 Web 请求的瓶颈是 I/O 等待,而非 CPU 计算。

假设一个请求的生命周期中:

  • 95% 的时间在等待数据库查询、网络调用等 I/O
  • 5% 的时间在执行业务逻辑

单线程 + 事件循环可以在等待 I/O 的 95% 时间里去处理其他请求,从而实现高并发。


2.2 Reactor 模式

Reactor 模式是事件驱动架构的经典设计,由 Doug Schmidt 在 1995 年提出。

核心组件

组件职责类比
Handle(句柄)OS 资源标识(socket fd)餐桌号码
Event Demultiplexer(事件多路复用器)监听多个 Handle 的事件(epoll/kqueue)门口的叫号机
Reactor事件循环的核心,分发事件餐厅经理
EventHandler(事件处理器)处理特定事件的回调厨师/服务员

Reactor 的变体

变体描述适用场景
单线程 Reactor所有 I/O 和业务逻辑在同一线程简单的代理服务器
多线程 Reactor一个 Reactor 线程 + 工作线程池Nginx
主从 Reactor主 Reactor 接受连接,子 Reactor 处理 I/ONetty、Redis 6.0+

伪代码实现

class Reactor:
    def __init__(self):
        self.handlers = {}      # fd -> handler 的映射
        self.running = False

    def register(self, fd, event, handler):
        """注册事件监听"""
        self.handlers[(fd, event)] = handler

    def run(self):
        """事件循环主循环"""
        self.running = True
        while self.running:
            # 阻塞等待就绪事件
            events = epoll_wait(self.handlers.keys(), timeout=-1)
            for fd, event in events:
                handler = self.handlers.get((fd, event))
                if handler:
                    handler.handle_event(fd, event)

使用示例:

reactor = Reactor()

def on_readable(fd):
    data = fd.read()
    process_request(data)

reactor.register(socket_fd, EVENT_READ, on_readable)
reactor.run()

2.3 Proactor 模式

Proactor 模式是 Reactor 的"升级版",它把 I/O 操作本身也交给操作系统完成。

Reactor vs Proactor

特性ReactorProactor
I/O 操作应用程序执行 read/write操作系统执行 read/write
通知时机I/O 就绪时通知I/O 完成时通知
编程模型“准备好了,你来读”“读完了,数据在这”
代表实现epoll、kqueueIOCP (Windows)、io_uring (Linux)
编程复杂度较高(需处理部分读写)较低(一次操作一次回调)

Proactor 工作流

应用程序                     操作系统
   │                           │
   │   ① 提交异步读请求        │
   │ ─────────────────────►   │
   │                           │   ② 内核执行实际 I/O
   │   ③ 应用去做别的事        │
   │                           │
   │                           │   ④ I/O 完成
   │   ⑤ 通知应用程序          │
   │ ◄─────────────────────   │
   │                           │
   │   ⑥ 处理已完成的数据      │
   │                           │

2.4 Linux epoll 深入

epoll 是 Linux 下最核心的 I/O 多路复用机制,是理解 Linux 异步编程的基石。

核心 API

// 创建 epoll 实例
int epoll_create1(int flags);

// 添加/修改/删除监听的文件描述符
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 等待事件就绪
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

水平触发(LT)vs 边缘触发(ET)

模式行为优点缺点
水平触发(LT)只要 fd 可读/可写就持续通知编程简单,不易丢事件可能重复通知,性能略低
边缘触发(ET)仅在状态变化时通知一次性能高,通知次数少必须一次性读完,否则可能丢事件

注意:大部分框架(libuv、Tokio)使用 LT 模式。ET 模式需要配合非阻塞 I/O,一次读到 EAGAIN 为止。

完整示例:简易 TCP Echo 服务器

#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

#define MAX_EVENTS 1024
#define BUF_SIZE   4096

static void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

int main() {
    int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
    set_nonblocking(listen_fd);

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(8080),
        .sin_addr.s_addr = INADDR_ANY
    };
    bind(listen_fd, (struct sockaddr*)&addr, sizeof(addr));
    listen(listen_fd, SOMAXCONN);

    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];
    char buf[BUF_SIZE];

    while (1) {
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < nfds; i++) {
            if (events[i].data.fd == listen_fd) {
                // 新连接到达
                int client_fd = accept(listen_fd, NULL, NULL);
                set_nonblocking(client_fd);
                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = client_fd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
            } else {
                // 数据可读
                int fd = events[i].data.fd;
                ssize_t n = read(fd, buf, BUF_SIZE);
                if (n <= 0) {
                    close(fd);
                    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
                } else {
                    write(fd, buf, n);  // echo
                }
            }
        }
    }
}

2.5 macOS kqueue 与 Windows IOCP

kqueue(macOS / FreeBSD)

int kq = kqueue();

struct kevent changes[1];
EV_SET(&changes[0], sockfd, EVFILT_READ, EV_ADD, 0, 0, NULL);

struct kevent events[10];
int nev = kevent(kq, changes, 1, events, 10, NULL);
for (int i = 0; i < nev; i++) {
    if (events[i].filter == EVFILT_READ) {
        handle_read(events[i].ident);
    }
}

IOCP(Windows)

IOCP 是 Proactor 模型的典型实现:

HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 将 socket 关联到 IOCP
CreateIoCompletionPort((HANDLE)socket, iocp, completion_key, 0);

// 提交异步 I/O 请求
WSABUF buf = { .len = 4096, .buf = buffer };
WSARecv(socket, &buf, 1, NULL, &flags, &overlapped, NULL);

// 等待 I/O 完成
DWORD bytes;
ULONG_PTR key;
OVERLAPPED *ov;
GetQueuedCompletionStatus(iocp, &bytes, &key, &ov, INFINITE);
// bytes 是实际读取的字节数,数据在 buffer 中

跨平台对比表

特性epoll (Linux)kqueue (macOS/BSD)IOCP (Windows)
模型ReactorReactorProactor
事件类型读/写/错误读/写/信号/定时器/文件读/写/接受连接
边缘触发支持(EPOLLET)支持(EV_CLEAR)N/A(完成通知)
线程安全需要锁需要锁内置线程池
代表框架Tokio、libuvlibuv、kqueueIOCP、Boost.Asio

2.6 libuv:Node.js 的引擎

libuv 是一个跨平台的异步 I/O 库,是 Node.js 的底层引擎。

libuv 架构

┌─────────────────────────────────────────────┐
│                JavaScript                    │
│         (你的 Node.js 应用代码)              │
├─────────────────────────────────────────────┤
│                Node.js Core                  │
│           (V8 + C++ 绑定层)                 │
├─────────────────────────────────────────────┤
│                 libuv                        │
│  ┌─────────┐  ┌──────────┐  ┌───────────┐  │
│  │Event Loop│  │Thread Pool│  │  Platform  │  │
│  │ (主线程) │  │(工作线程) │  │  (I/O后端) │  │
│  └─────────┘  └──────────┘  └───────────┘  │
├─────────────────────────────────────────────┤
│     epoll / kqueue / IOCP / poll / select   │
└─────────────────────────────────────────────┘

libuv 事件循环阶段

阶段职责示例
timers执行到期的定时器回调setTimeout()
pending callbacks执行延迟到下一轮循环的 I/O 回调TCP 错误
idle, prepare内部使用-
poll执行 I/O 回调(核心阶段)fs.readFile()
check执行 setImmediate() 回调setImmediate()
close callbacks执行关闭事件的回调socket.on('close')

Node.js 中的事件循环

const fs = require('fs');

// Phase 1: 进入事件循环
console.log('1. Start');

// Phase 2: I/O 回调(poll 阶段执行)
fs.readFile(__filename, () => {
    console.log('4. I/O callback');

    // 微任务(在当前阶段之后立即执行)
    Promise.resolve().then(() => console.log('5. Microtask'));

    // 宏任务(下一个阶段执行)
    setImmediate(() => console.log('6. setImmediate'));
    setTimeout(() => console.log('7. setTimeout'), 0);
});

// Phase 3: 微任务
Promise.resolve().then(() => console.log('3. Microtask'));

// Phase 4: 继续执行
console.log('2. End');

// 输出顺序:
// 1. Start
// 2. End
// 3. Microtask
// 4. I/O callback
// 5. Microtask
// 6. setImmediate
// 7. setTimeout

2.7 io_uring:Linux 异步 I/O 的未来

io_uring 是 Linux 5.1(2019)引入的新一代异步 I/O 接口,旨在解决传统 AIO 的局限性。

核心设计

┌─────────────┐          ┌─────────────┐
│ 应用程序     │          │  内核        │
│             │          │             │
│  ┌───────┐  │  共享内存  │  ┌───────┐  │
│  │ SQ    │──┼──────────┼──│ SQ    │  │
│  │(提交队列)│ │          │  │(内核读) │  │
│  └───────┘  │          │  └───────┘  │
│             │          │             │
│  ┌───────┐  │  共享内存  │  ┌───────┐  │
│  │ CQ    │◄─┼──────────┼──│ CQ    │  │
│  │(完成队列)│ │          │  │(内核写) │  │
│  └───────┘  │          │  └───────┘  │
└─────────────┘          └─────────────┘

- SQ(Submission Queue):应用提交 I/O 请求
- CQ(Completion Queue):内核写入完成结果
- 无需系统调用:通过 mmap 共享内存,减少上下文切换

性能对比

指标epoll + read/writeio_uring
系统调用次数每次 I/O 两次(epoll_wait + read)批量提交,可为零
内核上下文切换每次 I/O 至少一次大幅减少
吞吐量(QPS)基准提升 30%-100%
适用场景通用高吞吐网络/存储

2.8 业务场景:高并发 Web 服务器设计

场景描述

一个电商 API 网关,需要同时处理 50,000 个长连接(WebSocket),每个连接的请求延迟 50-200ms。

架构选型

方案线程数内存QPS复杂度
Thread-per-Connection50,000~50GB~10,000
线程池(200 线程)200~200MB~5,000
事件循环 + 协程~CPU 核数~500MB~100,000
多进程 + 事件循环CPU 核数 × 2~1GB~200,000

推荐方案:多进程(Worker)+ 每进程一个事件循环 + 协程

                    ┌──────────────────┐
                    │   Master 进程    │
                    │  (负载均衡)       │
                    └────────┬─────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
              ▼              ▼              ▼
        ┌─────────┐    ┌─────────┐    ┌─────────┐
        │Worker 0 │    │Worker 1 │    │Worker 2 │
        │ Event   │    │ Event   │    │ Event   │
        │ Loop    │    │ Loop    │    │ Loop    │
        │+ 协程   │    │+ 协程   │    │+ 协程   │
        └─────────┘    └─────────┘    └─────────┘

2.9 本章小结

要点说明
事件循环单线程处理高并发的核心机制
ReactorI/O 就绪时通知(epoll、kqueue)
ProactorI/O 完成时通知(IOCP、io_uring)
LT vs ET水平触发简单可靠,边缘触发性能高
libuv跨平台事件循环库,Node.js 引擎
io_uringLinux 异步 I/O 的未来,零系统调用

下一章预告:了解了事件循环之后,我们将学习最早的异步编程方式——回调函数(Callback),以及它带来的"回调地狱"问题。


扩展阅读