强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

POSIX 标准详解教程 / 第六章:I/O 模型

第六章:I/O 模型

掌握 POSIX I/O 模型:阻塞/非阻塞 I/O、read/write、select/poll/epoll、多路复用。


6.1 POSIX I/O 概述

6.1.1 一切皆文件描述符

POSIX 将所有 I/O 统一为文件描述符(File Descriptor, fd) 的读写操作。无论底层是磁盘文件、终端、管道还是网络套接字,都使用同一套 API:

open() → read() / write() / lseek() → close()
socket() → recv() / send() / close()

6.1.2 标准文件描述符

fd名称默认关联
0标准输入STDIN_FILENO键盘
1标准输出STDOUT_FILENO终端
2标准错误STDERR_FILENO终端

6.1.3 I/O 模型分类

POSIX I/O 模型
├── 阻塞 I/O (Blocking I/O)
│   └── read()/write() 在数据未就绪时挂起进程
├── 非阻塞 I/O (Non-blocking I/O)
│   └── read()/write() 立即返回 EAGAIN/EWOULDBLOCK
├── I/O 多路复用 (I/O Multiplexing)
│   ├── select()   ← POSIX 标准
│   ├── poll()     ← POSIX 标准
│   └── epoll      ← Linux 扩展
└── 异步 I/O (Asynchronous I/O)
    └── aio_read()/aio_write() ← POSIX AIO

6.2 基本 I/O 操作

6.2.1 read() 与 write()

/*
 * basic_io.c - 基本 read/write 操作
 * 编译: gcc -Wall -o basic_io basic_io.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

/* 安全读取:处理 EINTR 和部分读取 */
static ssize_t safe_read(int fd, void *buf, size_t count)
{
    ssize_t total = 0;
    while ((size_t)total < count) {
        ssize_t n = read(fd, (char *)buf + total, count - total);
        if (n == -1) {
            if (errno == EINTR) continue;  /* 被信号中断,重试 */
            return -1;                     /* 其他错误 */
        }
        if (n == 0) break;  /* EOF */
        total += n;
    }
    return total;
}

/* 安全写入:处理 EINTR 和部分写入 */
static ssize_t safe_write(int fd, const void *buf, size_t count)
{
    ssize_t total = 0;
    while ((size_t)total < count) {
        ssize_t n = write(fd, (const char *)buf + total, count - total);
        if (n == -1) {
            if (errno == EINTR) continue;
            return -1;
        }
        total += n;
    }
    return total;
}

int main(void)
{
    const char *path = "/tmp/posix_io_test.txt";

    /* 创建文件 */
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1) { perror("open"); return 1; }

    const char *data = "Hello, POSIX I/O!\nThis is line 2.\n";
    ssize_t written = safe_write(fd, data, strlen(data));
    printf("写入 %zd 字节\n", written);
    close(fd);

    /* 读取文件 */
    fd = open(path, O_RDONLY);
    if (fd == -1) { perror("open"); return 1; }

    char buf[256];
    ssize_t nread = safe_read(fd, buf, sizeof(buf) - 1);
    if (nread > 0) {
        buf[nread] = '\0';
        printf("读取 %zd 字节:\n%s", nread, buf);
    }
    close(fd);

    unlink(path);
    return 0;
}

6.2.2 read() 返回值含义

返回值含义
> 0成功读取的字节数
= 0EOF(文件结束或连接关闭)
-1错误(检查 errno

注意read() 可能返回少于请求的字节数(部分读取),不一定意味着 EOF。需要循环读取直到获得所需字节数或遇到 EOF。

6.2.3 分散/聚集 I/O (Scatter/Gather)

/*
 * scatter_gather.c - 使用 readv/writev 进行分散/聚集 I/O
 * 编译: gcc -Wall -o scatter_gather scatter_gather.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <unistd.h>
#include <string.h>

int main(void)
{
    const char *path = "/tmp/posix_iovec_test";

    /* 准备多个缓冲区 */
    const char *header = "HEADER:";
    const char *body   = "Hello, scatter/gather I/O!";
    const char *footer = ":FOOTER\n";

    struct iovec iov_w[3] = {
        { .iov_base = (void *)header, .iov_len = strlen(header) },
        { .iov_base = (void *)body,   .iov_len = strlen(body) },
        { .iov_base = (void *)footer, .iov_len = strlen(footer) },
    };

    /* 写入:一次系统调用写入多个缓冲区 */
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    ssize_t nw = writev(fd, iov_w, 3);
    printf("writev 写入 %zd 字节\n", nw);
    close(fd);

    /* 读取:一次系统调用读入多个缓冲区 */
    char hbuf[16], bbuf[128], fbuf[16];
    struct iovec iov_r[3] = {
        { .iov_base = hbuf, .iov_len = sizeof(hbuf) - 1 },
        { .iov_base = bbuf, .iov_len = sizeof(bbuf) - 1 },
        { .iov_base = fbuf, .iov_len = sizeof(fbuf) - 1 },
    };

    fd = open(path, O_RDONLY);
    ssize_t nr = readv(fd, iov_r, 3);
    hbuf[iov_r[0].iov_len < sizeof(hbuf) - 1 ? iov_r[0].iov_len : sizeof(hbuf) - 1] = '\0';
    bbuf[iov_r[1].iov_len < sizeof(bbuf) - 1 ? iov_r[1].iov_len : sizeof(bbuf) - 1] = '\0';
    fbuf[iov_r[2].iov_len < sizeof(fbuf) - 1 ? iov_r[2].iov_len : sizeof(fbuf) - 1] = '\0';
    printf("readv 读取 %zd 字节\n", nr);
    printf("header: '%s'\n", hbuf);
    printf("body:   '%s'\n", bbuf);
    printf("footer: '%s'\n", fbuf);
    close(fd);

    unlink(path);
    return 0;
}

6.3 阻塞 vs 非阻塞 I/O

6.3.1 模型对比

特性阻塞 I/O非阻塞 I/O
无数据时进程挂起等待立即返回 EAGAIN
编程复杂度简单复杂(需轮询)
CPU 利用率低(等待时 CPU 空闲)高(忙等待)
设置方式默认O_NONBLOCK 标志

6.3.2 非阻塞 I/O 示例

/*
 * nonblocking_io.c - 非阻塞 I/O 演示
 * 编译: gcc -Wall -o nonblocking_io nonblocking_io.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

int main(void)
{
    int pipefd[2];
    pipe(pipefd);

    /* 设置读端为非阻塞 */
    int flags = fcntl(pipefd[0], F_GETFL);
    fcntl(pipefd[0], F_SETFL, flags | O_NONBLOCK);

    /* 尝试从空管道读取 */
    char buf[64];
    ssize_t n = read(pipefd[0], buf, sizeof(buf));
    if (n == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            printf("管道无数据,read() 立即返回 EAGAIN\n");
        } else {
            perror("read");
        }
    }

    /* 写入数据 */
    write(pipefd[1], "hello", 5);

    /* 再次读取 */
    n = read(pipefd[0], buf, sizeof(buf));
    if (n > 0) {
        buf[n] = '\0';
        printf("读取到数据: '%s' (%zd 字节)\n", buf, n);
    }

    close(pipefd[0]);
    close(pipefd[1]);
    return 0;
}

6.4 select():POSIX 标准多路复用

6.4.1 select() 接口

int select(int nfds,
           fd_set *readfds,      /* 可读 fd 集合 */
           fd_set *writefds,     /* 可写 fd 集合 */
           fd_set *exceptfds,    /* 异常 fd 集合 */
           struct timeval *timeout);
参数说明
nfds最大 fd 值 + 1
readfds监控可读性(NULL 不监控)
writefds监控可写性(NULL 不监控)
exceptfds监控异常(NULL 不监控)
timeout超时(NULL = 永久阻塞,0 = 立即返回)

6.4.2 select() 示例

/*
 * select_demo.c - 使用 select() 监控多个文件描述符
 * 编译: gcc -Wall -o select_demo select_demo.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/select.h>
#include <string.h>

int main(void)
{
    int pipe1[2], pipe2[2];
    pipe(pipe1);
    pipe(pipe2);

    /* 子进程 1:向 pipe1 写入 */
    if (fork() == 0) {
        close(pipe1[0]);
        sleep(1);
        write(pipe1[1], "Data from child 1\n", 18);
        close(pipe1[1]);
        _exit(0);
    }

    /* 子进程 2:向 pipe2 写入 */
    if (fork() == 0) {
        close(pipe2[0]);
        sleep(2);
        write(pipe2[1], "Data from child 2\n", 18);
        close(pipe2[1]);
        _exit(0);
    }

    close(pipe1[1]);
    close(pipe2[1]);

    /* 父进程:使用 select() 同时监控两个管道 */
    fd_set rfds;
    int maxfd = (pipe1[0] > pipe2[0] ? pipe1[0] : pipe2[0]) + 1;
    int pipes_alive = 2;

    while (pipes_alive > 0) {
        FD_ZERO(&rfds);
        FD_SET(pipe1[0], &rfds);
        FD_SET(pipe2[0], &rfds);

        struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
        int ready = select(maxfd, &rfds, NULL, NULL, &tv);

        if (ready == -1) {
            perror("select");
            break;
        }
        if (ready == 0) {
            printf("超时\n");
            break;
        }

        char buf[128];
        if (FD_ISSET(pipe1[0], &rfds)) {
            ssize_t n = read(pipe1[0], buf, sizeof(buf) - 1);
            if (n > 0) { buf[n] = '\0'; printf("[pipe1] %s", buf); }
            else { close(pipe1[0]); pipes_alive--; }
        }
        if (FD_ISSET(pipe2[0], &rfds)) {
            ssize_t n = read(pipe2[0], buf, sizeof(buf) - 1);
            if (n > 0) { buf[n] = '\0'; printf("[pipe2] %s", buf); }
            else { close(pipe2[0]); pipes_alive--; }
        }
    }

    close(pipe1[0]);
    close(pipe2[0]);
    return 0;
}

6.4.3 select() 的局限性

限制说明
fd 数量上限FD_SETSIZE(通常为 1024)
每次调用需重置fd_set 会被内核修改,需要重新设置
O(n) 扫描需要遍历整个 fd_set 检查就绪 fd
拷贝开销每次调用需将 fd_set 拷贝到内核空间

6.5 poll():改进的多路复用

6.5.1 poll() 接口

int poll(struct pollfd fds[], nfds_t nfds, int timeout);
struct pollfd {
    int   fd;        /* 文件描述符 */
    short events;    /* 关注的事件 */
    short revents;   /* 返回的事件 */
};

6.5.2 poll 事件标志

事件说明
POLLIN有数据可读(含普通数据和优先数据)
POLLOUT可以写入
POLLERR错误(仅 revents)
POLLHUP挂断(仅 revents)
POLLNVAL无效 fd(仅 revents)
POLLPRI优先数据可读(如 TCP 带外数据)

6.5.3 poll() 示例

/*
 * poll_demo.c - 使用 poll() 监控多个管道
 * 编译: gcc -Wall -o poll_demo poll_demo.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <poll.h>
#include <string.h>

#define NUM_PIPES 3

int main(void)
{
    int pipes[NUM_PIPES][2];
    struct pollfd pfds[NUM_PIPES];

    /* 创建管道并设置 poll 监控 */
    for (int i = 0; i < NUM_PIPES; i++) {
        pipe(pipes[i]);
        pfds[i].fd = pipes[i][0];      /* 监控读端 */
        pfds[i].events = POLLIN;
        pfds[i].revents = 0;
    }

    /* 子进程:分别向管道写入数据 */
    for (int i = 0; i < NUM_PIPES; i++) {
        if (fork() == 0) {
            close(pipes[i][0]);  /* 关闭读端 */
            sleep(i + 1);
            char msg[64];
            int len = snprintf(msg, sizeof(msg), "Data from pipe %d\n", i);
            write(pipes[i][1], msg, len);
            close(pipes[i][1]);
            _exit(0);
        }
        close(pipes[i][1]);  /* 父进程关闭写端 */
    }

    /* 父进程:使用 poll() 等待数据 */
    int active = NUM_PIPES;
    while (active > 0) {
        int ready = poll(pfds, NUM_PIPES, 5000);
        if (ready == -1) { perror("poll"); break; }
        if (ready == 0) { printf("超时\n"); break; }

        for (int i = 0; i < NUM_PIPES; i++) {
            if (pfds[i].revents & POLLIN) {
                char buf[128];
                ssize_t n = read(pfds[i].fd, buf, sizeof(buf) - 1);
                if (n > 0) {
                    buf[n] = '\0';
                    printf("[pipe %d] %s", i, buf);
                }
                if (n <= 0) {
                    close(pfds[i].fd);
                    pfds[i].fd = -1;
                    active--;
                }
            }
        }
    }

    printf("所有管道数据读取完毕\n");
    return 0;
}

6.5.4 select() vs poll() 对比

对比项select()poll()
fd 数量限制FD_SETSIZE (1024)无硬限制
事件模型可读/可写/异常分离集合统一 events/revents
精度微秒(timeval)毫秒
参数修改每次需重置 fd_setevents 不变,revents 由内核设置
效率O(n)O(n)
可移植性更广泛POSIX.1-2001

6.6 epoll:Linux 高性能 I/O 多路复用

6.6.1 epoll 的优势

注意epoll 是 Linux 专有扩展,不是 POSIX 标准。但在 Linux 高性能编程中至关重要。

特性select/pollepoll
复杂度O(n)(每次遍历所有 fd)O(1)(就绪事件通知)
fd 数量受限支持百万级
内核数据结构每次拷贝 fd_set红黑树 + 就绪链表
触发模式水平触发 (LT)LT 和边缘触发 (ET)

6.6.2 epoll 使用模式

/*
 * epoll_demo.c - 使用 epoll 监控管道读取
 * 编译: gcc -Wall -o epoll_demo epoll_demo.c
 * 注意: epoll 是 Linux 扩展,非 POSIX 标准
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <string.h>

#define MAX_EVENTS 10
#define NUM_PIPES  3

int main(void)
{
    int pipes[NUM_PIPES][2];

    /* 创建管道 */
    for (int i = 0; i < NUM_PIPES; i++)
        pipe(pipes[i]);

    /* 创建 epoll 实例 */
    int epfd = epoll_create1(0);
    if (epfd == -1) { perror("epoll_create1"); return 1; }

    /* 注册管道读端 */
    struct epoll_event ev;
    for (int i = 0; i < NUM_PIPES; i++) {
        ev.events = EPOLLIN;
        ev.data.fd = pipes[i][0];
        epoll_ctl(epfd, EPOLL_CTL_ADD, pipes[i][0], &ev);
    }

    /* 子进程写入数据 */
    for (int i = 0; i < NUM_PIPES; i++) {
        if (fork() == 0) {
            close(pipes[i][0]);
            sleep(i + 1);
            char msg[64];
            int len = snprintf(msg, sizeof(msg), "Pipe %d data\n", i);
            write(pipes[i][1], msg, len);
            close(pipes[i][1]);
            _exit(0);
        }
        close(pipes[i][1]);
    }

    /* 等待事件 */
    struct epoll_event events[MAX_EVENTS];
    int active = NUM_PIPES;

    while (active > 0) {
        int nfds = epoll_wait(epfd, events, MAX_EVENTS, 5000);
        if (nfds == -1) { perror("epoll_wait"); break; }
        if (nfds == 0) { printf("超时\n"); break; }

        for (int i = 0; i < nfds; i++) {
            char buf[128];
            ssize_t n = read(events[i].data.fd, buf, sizeof(buf) - 1);
            if (n > 0) {
                buf[n] = '\0';
                printf("[epoll] fd=%d: %s", events[i].data.fd, buf);
            }
            if (n <= 0) {
                close(events[i].data.fd);
                active--;
            }
        }
    }

    close(epfd);
    printf("所有数据读取完毕\n");
    return 0;
}

6.6.3 触发模式

模式说明特点
LT (Level Triggered)水平触发(默认)只要 fd 就绪就持续通知,编程简单
ET (Edge Triggered)边缘触发仅在状态变化时通知,需一次性读完所有数据
/* 边缘触发模式:必须使用非阻塞 fd */
ev.events = EPOLLIN | EPOLLET;  /* ET 模式 */
fcntl(fd, F_SETFL, O_NONBLOCK); /* 必须非阻塞 */

/* ET 模式下必须循环读取直到 EAGAIN */
while (1) {
    ssize_t n = read(fd, buf, sizeof(buf));
    if (n == -1) {
        if (errno == EAGAIN) break;  /* 数据读完 */
        perror("read");
        break;
    }
    if (n == 0) break;  /* EOF */
    process(buf, n);
}

6.7 标准 I/O (stdio) vs 文件描述符 I/O

特性文件描述符 I/O标准 I/O (stdio)
缓冲无缓冲有缓冲(行缓冲/全缓冲)
函数read()/write()fread()/fwrite()
性能系统调用频繁减少系统调用次数
线程安全需自行同步内置锁(FILE*)
适用场景网络编程、管道、二进制 I/O文件处理、行文本处理

6.7.1 缓冲模式

模式设置函数触发条件
无缓冲setbuf(stream, NULL)每次 I/O 立即调用 read/write
行缓冲默认(终端)遇到 \n 时刷新
全缓冲默认(文件)缓冲区满时刷新
/* 手动刷新缓冲区 */
fflush(stdout);          /* 刷新 stdout 缓冲区 */
setvbuf(stdout, NULL, _IONBF, 0);  /* 设置无缓冲 */

6.8 业务场景:并发回声服务器

/*
 * echo_server.c - 使用 poll() 的并发回声服务器
 * 编译: gcc -Wall -o echo_server echo_server.c
 * 测试: echo "hello" | nc localhost 8080
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <poll.h>
#include <errno.h>

#define PORT 8080
#define MAX_CLIENTS 128

int main(void)
{
    /* 创建服务器 socket */
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY,
    };
    bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(server_fd, 128);

    /* poll 数组 */
    struct pollfd fds[MAX_CLIENTS];
    int nfds = 1;
    fds[0].fd = server_fd;
    fds[0].events = POLLIN;

    printf("回声服务器启动,端口 %d\n", PORT);

    while (1) {
        int ready = poll(fds, nfds, -1);
        if (ready == -1) { if (errno == EINTR) continue; perror("poll"); break; }

        /* 检查新连接 */
        if (fds[0].revents & POLLIN) {
            int client_fd = accept(server_fd, NULL, NULL);
            if (client_fd >= 0 && nfds < MAX_CLIENTS) {
                fds[nfds].fd = client_fd;
                fds[nfds].events = POLLIN;
                printf("新连接 fd=%d (总计 %d)\n", client_fd, nfds);
                nfds++;
            } else if (client_fd >= 0) {
                close(client_fd);
            }
        }

        /* 检查客户端数据 */
        for (int i = 1; i < nfds; i++) {
            if (fds[i].revents & POLLIN) {
                char buf[1024];
                ssize_t n = read(fds[i].fd, buf, sizeof(buf));
                if (n <= 0) {
                    printf("断开 fd=%d\n", fds[i].fd);
                    close(fds[i].fd);
                    fds[i] = fds[nfds - 1];
                    nfds--;
                    i--;
                } else {
                    /* 回声:将数据发回 */
                    write(fds[i].fd, buf, n);
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

6.9 注意事项

⚠️ 部分读写read()/write() 可能返回少于请求的字节数。循环调用直到读写完成或遇到错误。

⚠️ EINTR 处理:所有慢速系统调用都可能被信号中断并返回 EINTR。要么使用 SA_RESTART,要么手动重试。

⚠️ select() fd_set 限制FD_SETSIZE 通常为 1024。超过此限制需要使用 poll()epoll

⚠️ epoll 不是 POSIXepoll 是 Linux 专有扩展。跨平台程序应使用 poll() 或封装层(如 libevent、libuv)。

⚠️ 非阻塞 I/O + 边缘触发:ET 模式必须配合非阻塞 fd 使用,且必须循环读取直到 EAGAIN


6.10 扩展阅读

  1. man 2 readman 2 writeman 2 pollman 2 select
  2. man 7 epoll — Linux epoll 详解
  3. APUE 第 14 章:Advanced I/O
  4. TLPI 第 63 章:Alternative I/O Models
  5. 《Unix Network Programming》 — W. Richard Stevens 著,网络编程经典
  6. libevent / libuv:跨平台 I/O 多路复用库

6.11 本章小结

要点说明
文件描述符POSIX I/O 的统一抽象
read/write基本 I/O,可能部分读写,可能被信号中断
select()POSIX 标准,fd 上限 1024,O(n) 扫描
poll()POSIX 标准,无 fd 上限,仍为 O(n)
epollLinux 扩展,O(1) 通知,支持 LT/ET
缓冲 I/Ostdio 缓冲减少系统调用,但需注意 fflush