强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

POSIX 标准详解教程 / 第七章:进程间通信(IPC)

第七章:进程间通信(IPC)

掌握 POSIX 进程间通信机制:管道、FIFO、消息队列、共享内存、信号量。


7.1 IPC 概述

7.1.1 为什么需要 IPC

进程拥有独立的地址空间,无法直接访问其他进程的内存。POSIX 提供了多种 IPC 机制:

机制 方向 数据形式 持久性 适用场景
匿名管道 (pipe) 单向 字节流 进程生命期 父子进程通信
命名管道 (FIFO) 单向 字节流 文件系统 无关进程通信
消息队列 双向 消息(带类型) 内核 结构化消息传递
共享内存 原始内存 内核 高速数据共享
信号量 计数器 内核 同步/互斥

7.2 匿名管道 (pipe)

7.2.1 管道特性

  • 单向:一端读,一端写
  • 半双工:数据只能单向流动
  • 字节流:无消息边界
  • 容量有限:通常 64KB(Linux 默认 PIPE_BUF = 4096 字节)
  • 只能用于有亲缘关系的进程(父子进程间)

7.2.2 基本管道用法

/*
 * pipe_basic.c - 父子进程通过管道通信
 * 编译: gcc -Wall -o pipe_basic pipe_basic.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
    int pipefd[2];  /* pipefd[0]=读端, pipefd[1]=写端 */

    if (pipe(pipefd) == -1) {
        perror("pipe");
        return EXIT_FAILURE;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        return EXIT_FAILURE;
    }

    if (pid == 0) {
        /* 子进程:写入数据 */
        close(pipefd[0]);  /* 关闭读端 */

        const char *messages[] = {
            "Hello from child!",
            "POSIX pipes are simple.",
            "Goodbye!"
        };
        for (int i = 0; i < 3; i++) {
            write(pipefd[1], messages[i], strlen(messages[i]));
            write(pipefd[1], "\n", 1);  /* 消息分隔符 */
            sleep(1);
        }

        close(pipefd[1]);
        _exit(EXIT_SUCCESS);
    } else {
        /* 父进程:读取数据 */
        close(pipefd[1]);  /* 关闭写端 */

        char buf[256];
        ssize_t n;
        while ((n = read(pipefd[0], buf, sizeof(buf) - 1)) > 0) {
            buf[n] = '\0';
            printf("[父进程收到] %s", buf);
        }

        close(pipefd[0]);
        waitpid(pid, NULL, 0);
    }

    return EXIT_SUCCESS;
}
$ ./pipe_basic
[父进程收到] Hello from child!
[父进程收到] POSIX pipes are simple.
[父进程收到] Goodbye!

7.2.3 使用管道连接进程(模拟 Shell 管道)

/*
 * pipe_chain.c - 模拟 "ls | grep .c | wc -l"
 * 编译: gcc -Wall -o pipe_chain pipe_chain.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

static pid_t run_with_pipes(int in_fd, int out_fd,
                             const char *cmd, char *const argv[])
{
    pid_t pid = fork();
    if (pid == -1) { perror("fork"); return -1; }
    if (pid == 0) {
        if (in_fd != STDIN_FILENO) {
            dup2(in_fd, STDIN_FILENO);
            close(in_fd);
        }
        if (out_fd != STDOUT_FILENO) {
            dup2(out_fd, STDOUT_FILENO);
            close(out_fd);
        }
        execvp(cmd, argv);
        perror(cmd);
        _exit(127);
    }
    return pid;
}

int main(void)
{
    int pipe1[2], pipe2[2];
    pipe(pipe1);
    pipe(pipe2);

    /* ls | grep .c | wc -l */
    char *ls_argv[]   = {"ls", NULL};
    char *grep_argv[] = {"grep", ".c", NULL};
    char *wc_argv[]   = {"wc", "-l", NULL};

    pid_t p1 = run_with_pipes(STDIN_FILENO, pipe1[1], "ls", ls_argv);
    close(pipe1[1]);

    pid_t p2 = run_with_pipes(pipe1[0], pipe2[1], "grep", grep_argv);
    close(pipe1[0]);
    close(pipe2[1]);

    pid_t p3 = run_with_pipes(pipe2[0], STDOUT_FILENO, "wc", wc_argv);
    close(pipe2[0]);

    /* 等待所有子进程 */
    waitpid(p1, NULL, 0);
    waitpid(p2, NULL, 0);
    waitpid(p3, NULL, 0);

    return 0;
}

7.3 命名管道 (FIFO)

7.3.1 FIFO 特性

特性 匿名管道 (pipe) 命名管道 (FIFO)
文件系统存在 ✅ 有路径名
无关进程通信
创建方式 pipe() mkfifo()
持久性 进程生命期 文件系统(需手动删除)

7.3.2 FIFO 写入端

/*
 * fifo_writer.c - FIFO 写入端
 * 编译: gcc -Wall -o fifo_writer fifo_writer.c
 * 运行: 先运行此程序,再运行 fifo_reader
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>

int main(void)
{
    const char *fifo_path = "/tmp/posix_demo_fifo";

    /* 创建 FIFO(如果不存在) */
    if (mkfifo(fifo_path, 0666) == -1) {
        perror("mkfifo(可能已存在)");
    }

    printf("打开 FIFO 写入端(等待读取端打开)...\n");
    int fd = open(fifo_path, O_WRONLY);
    if (fd == -1) { perror("open"); return 1; }

    const char *messages[] = {
        "Message 1: Hello FIFO!",
        "Message 2: POSIX IPC",
        "Message 3: Goodbye!",
    };

    for (int i = 0; i < 3; i++) {
        write(fd, messages[i], strlen(messages[i]));
        write(fd, "\n", 1);
        printf("发送: %s\n", messages[i]);
        sleep(1);
    }

    close(fd);
    printf("写入端关闭\n");
    return 0;
}

7.3.3 FIFO 读取端

/*
 * fifo_reader.c - FIFO 读取端
 * 编译: gcc -Wall -o fifo_reader fifo_reader.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    const char *fifo_path = "/tmp/posix_demo_fifo";

    printf("打开 FIFO 读取端(等待写入端打开)...\n");
    int fd = open(fifo_path, O_RDONLY);
    if (fd == -1) { perror("open"); return 1; }

    char buf[256];
    ssize_t n;
    while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
        buf[n] = '\0';
        printf("收到: %s", buf);
    }

    close(fd);
    unlink(fifo_path);  /* 清理 FIFO 文件 */
    printf("读取端关闭,FIFO 已删除\n");
    return 0;
}

7.4 POSIX 消息队列 (Message Queue)

7.4.1 特性

特性 说明
消息边界 保留消息边界(不像管道的字节流)
优先级 每条消息有优先级,高优先级先读取
异步通知 支持信号通知(mq_notify
持久性 内核维护,进程关闭后消息仍在

7.4.2 消息队列使用

/*
 * mq_send.c - POSIX 消息队列发送端
 * 编译: gcc -Wall -o mq_send mq_send -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>

int main(void)
{
    const char *mq_name = "/posix_demo_mq";

    /* 打开(创建)消息队列 */
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,        /* 最大消息数 */
        .mq_msgsize = 256,      /* 单条消息最大字节数 */
        .mq_curmsgs = 0,
    };

    mqd_t mq = mq_open(mq_name, O_CREAT | O_WRONLY, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    /* 发送带优先级的消息 */
    struct {
        const char *text;
        unsigned int priority;
    } messages[] = {
        {"Low priority message",    1},
        {"Normal priority message", 5},
        {"High priority message",   10},
        {"Urgent: system alert!",   15},
    };

    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, messages[i].text,
                    strlen(messages[i].text),
                    messages[i].priority) == -1) {
            perror("mq_send");
        } else {
            printf("发送 [优先级 %2d]: %s\n",
                   messages[i].priority, messages[i].text);
        }
    }

    mq_close(mq);
    printf("发送端完成\n");
    return EXIT_SUCCESS;
}
/*
 * mq_receive.c - POSIX 消息队列接收端
 * 编译: gcc -Wall -o mq_receive mq_receive -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>

int main(void)
{
    const char *mq_name = "/posix_demo_mq";

    mqd_t mq = mq_open(mq_name, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    /* 获取队列属性 */
    struct mq_attr attr;
    mq_getattr(mq, &attr);
    printf("队列: 最大消息=%ld, 消息大小=%ld, 当前消息=%ld\n",
           attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /* 接收消息(按优先级从高到低) */
    char buf[257];
    unsigned int priority;
    ssize_t n;

    while ((n = mq_receive(mq, buf, sizeof(buf) - 1, &priority)) > 0) {
        buf[n] = '\0';
        printf("收到 [优先级 %2d]: %s\n", priority, buf);
    }

    mq_close(mq);
    mq_unlink(mq_name);  /* 删除消息队列 */
    printf("接收端完成,队列已删除\n");
    return EXIT_SUCCESS;
}
# 终端 1:发送
$ ./mq_send
发送 [优先级  1]: Low priority message
发送 [优先级  5]: Normal priority message
发送 [优先级 10]: High priority message
发送 [优先级 15]: Urgent: system alert!
发送端完成

# 终端 2:接收(注意顺序:按优先级从高到低)
$ ./mq_receive
队列: 最大消息=10, 消息大小=256, 当前消息=4
收到 [优先级 15]: Urgent: system alert!
收到 [优先级 10]: High priority message
收到 [优先级  5]: Normal priority message
收到 [优先级  1]: Low priority message
接收端完成,队列已删除

注意:消息队列需要 -lrt 链接选项。Linux 内核需配置 CONFIG_POSIX_MQUEUE。消息队列挂载在 /dev/mqueue


7.5 POSIX 共享内存 (Shared Memory)

7.5.1 共享内存是最快的 IPC

进程 A 的地址空间            进程 B 的地址空间
┌──────────────┐            ┌──────────────┐
│              │            │              │
│  私有数据    │            │  私有数据    │
│              │            │              │
├──────────────┤            ├──────────────┤
│  共享内存段  │◄── 同一块物理内存 ──►│  共享内存段  │
│              │            │              │
└──────────────┘            └──────────────┘

数据无需拷贝,直接映射到多个进程的地址空间——这是最快的 IPC 方式。

7.5.2 共享内存操作流程

进程 A (写端):                进程 B (读端):
shm_open() → fd               shm_open() → fd
ftruncate() 设置大小
mmap() → 指针                 mmap() → 指针
写入数据 ──────────────────────→ 读取数据
munmap()                      munmap()
shm_unlink() 删除

7.5.3 共享内存写端

/*
 * shm_write.c - POSIX 共享内存写端
 * 编译: gcc -Wall -o shm_write shm_write -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096

typedef struct {
    int sequence;
    char data[256];
} shared_data_t;

int main(void)
{
    /* 创建共享内存对象 */
    int fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0644);
    if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }

    /* 设置大小 */
    if (ftruncate(fd, SHM_SIZE) == -1) {
        perror("ftruncate");
        return EXIT_FAILURE;
    }

    /* 映射到地址空间 */
    shared_data_t *shared = mmap(NULL, SHM_SIZE,
                                  PROT_READ | PROT_WRITE,
                                  MAP_SHARED, fd, 0);
    if (shared == MAP_FAILED) {
        perror("mmap");
        return EXIT_FAILURE;
    }

    /* 写入数据 */
    for (int i = 0; i < 5; i++) {
        shared->sequence = i;
        snprintf(shared->data, sizeof(shared->data),
                 "Shared message #%d from PID %d", i, getpid());
        printf("写入: seq=%d, data='%s'\n",
               shared->sequence, shared->data);
        sleep(1);
    }

    /* 清理 */
    munmap(shared, SHM_SIZE);
    close(fd);
    shm_unlink(SHM_NAME);
    printf("共享内存已删除\n");

    return EXIT_SUCCESS;
}

7.5.4 共享内存读端

/*
 * shm_read.c - POSIX 共享内存读端
 * 编译: gcc -Wall -o shm_read shm_read -lrt
 * 运行: 先运行 shm_write,再运行 shm_read
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096

typedef struct {
    int sequence;
    char data[256];
} shared_data_t;

int main(void)
{
    int fd = shm_open(SHM_NAME, O_RDONLY, 0);
    if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }

    shared_data_t *shared = mmap(NULL, SHM_SIZE,
                                  PROT_READ, MAP_SHARED, fd, 0);
    if (shared == MAP_FAILED) {
        perror("mmap");
        return EXIT_FAILURE;
    }

    /* 轮询读取(实际应用应使用信号量同步) */
    int last_seq = -1;
    for (int i = 0; i < 10; i++) {
        if (shared->sequence != last_seq) {
            printf("读取: seq=%d, data='%s'\n",
                   shared->sequence, shared->data);
            last_seq = shared->sequence;
        }
        usleep(500000);
    }

    munmap(shared, SHM_SIZE);
    close(fd);
    return EXIT_SUCCESS;
}

7.6 POSIX 信号量 (Semaphore)

7.6.1 信号量概念

信号量是一个计数器,用于:

  • 互斥:初始值 1,类似互斥锁
  • 同步:协调多个进程的执行顺序
  • 资源计数:初始值 N,限制同时访问资源的进程数

7.6.2 命名信号量

/*
 * sem_named.c - POSIX 命名信号量用法
 * 编译: gcc -Wall -o sem_named sem_named -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

#define SEM_NAME "/posix_demo_sem"

int main(void)
{
    /* 创建命名信号量,初始值为 1(互斥) */
    sem_t *sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
    if (sem == SEM_FAILED) {
        /* 可能已存在,删除后重建 */
        sem_unlink(SEM_NAME);
        sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
        if (sem == SEM_FAILED) {
            perror("sem_open");
            return EXIT_FAILURE;
        }
    }

    /* 创建 3 个子进程竞争资源 */
    for (int i = 0; i < 3; i++) {
        if (fork() == 0) {
            printf("  进程 %d: 等待信号量...\n", i);
            sem_wait(sem);  /* P 操作(等待/减 1) */

            /* 临界区 */
            printf("  进程 %d: 进入临界区\n", i);
            sleep(1);
            printf("  进程 %d: 离开临界区\n", i);

            sem_post(sem);  /* V 操作(释放/加 1) */
            _exit(0);
        }
    }

    /* 等待所有子进程 */
    for (int i = 0; i < 3; i++)
        wait(NULL);

    /* 清理 */
    sem_close(sem);
    sem_unlink(SEM_NAME);
    printf("信号量已清理\n");

    return EXIT_SUCCESS;
}

7.6.3 生产者-消费者(使用信号量)

/*
 * sem_prod_cons.c - 使用信号量实现生产者-消费者
 * 编译: gcc -Wall -o sem_prod_cons sem_prod_cons -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>

#define BUFFER_SIZE 5
#define ITEMS 10

typedef struct {
    int buffer[BUFFER_SIZE];
    int in;   /* 写入位置 */
    int out;  /* 读取位置 */
} ring_buffer_t;

int main(void)
{
    /* 使用 mmap 创建共享内存(匿名映射简化示例) */
    ring_buffer_t *rb = mmap(NULL, sizeof(ring_buffer_t),
                              PROT_READ | PROT_WRITE,
                              MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    rb->in = 0;
    rb->out = 0;

    /* 信号量:空位数、满位数、互斥 */
    sem_t *empty = sem_open("/demo_empty", O_CREAT | O_EXCL, 0644, BUFFER_SIZE);
    sem_t *full  = sem_open("/demo_full",  O_CREAT | O_EXCL, 0644, 0);
    sem_t *mutex = sem_open("/demo_mutex", O_CREAT | O_EXCL, 0644, 1);

    if (fork() == 0) {
        /* 生产者 */
        for (int i = 0; i < ITEMS; i++) {
            sem_wait(empty);  /* 等待空位 */
            sem_wait(mutex);  /* 进入临界区 */

            rb->buffer[rb->in] = i;
            printf("生产: item=%d, pos=%d\n", i, rb->in);
            rb->in = (rb->in + 1) % BUFFER_SIZE;

            sem_post(mutex);  /* 离开临界区 */
            sem_post(full);   /* 增加满位 */
            usleep(100000);
        }
        _exit(0);
    }

    if (fork() == 0) {
        /* 消费者 */
        for (int i = 0; i < ITEMS; i++) {
            sem_wait(full);   /* 等待数据 */
            sem_wait(mutex);  /* 进入临界区 */

            int item = rb->buffer[rb->out];
            printf("  消费: item=%d, pos=%d\n", item, rb->out);
            rb->out = (rb->out + 1) % BUFFER_SIZE;

            sem_post(mutex);  /* 离开临界区 */
            sem_post(empty);  /* 增加空位 */
            usleep(150000);
        }
        _exit(0);
    }

    /* 父进程等待 */
    wait(NULL);
    wait(NULL);

    /* 清理 */
    sem_close(empty); sem_unlink("/demo_empty");
    sem_close(full);  sem_unlink("/demo_full");
    sem_close(mutex); sem_unlink("/demo_mutex");
    munmap(rb, sizeof(ring_buffer_t));

    printf("生产者-消费者完成\n");
    return 0;
}

7.7 IPC 机制对比

特性 管道 FIFO 消息队列 共享内存 信号量
速度
数据形式 字节流 字节流 消息 原始内存 计数器
亲缘要求 父子
持久性 进程内 文件系统 内核 内核 内核
同步机制 阻塞 阻塞 阻塞 需额外同步 内置
典型用法 Shell 管道 客户端-服务器 任务队列 大数据共享 互斥/同步

7.8 注意事项

⚠️ 管道缓冲区满write() 在管道满时阻塞。使用 F_SETPIPE_SZ(Linux 扩展)调整大小。

⚠️ 共享内存无同步:共享内存本身没有同步机制,必须配合信号量或互斥锁使用,否则会产生数据竞争。

⚠️ 信号量与信号的区别:信号量(sem_t)是同步机制,不是信号(signal)。信号量通过 sem_wait/sem_post 操作,是线程/进程安全的。

⚠️ 消息队列持久性:POSIX 消息队列在内核中持久存在,直到显式删除 (mq_unlink)。程序崩溃后消息不会丢失。

⚠️ 资源泄漏:IPC 对象(管道、共享内存、信号量)使用后必须关闭和删除,否则会残留(查看 /dev/mqueue//dev/shm/)。


7.9 扩展阅读

  1. man 7 pipeman 3 mq_overviewman 7 shm_overviewman 7 sem_overview
  2. APUE 第 15 章:Interprocess Communication
  3. TLPI 第 44-55 章:IPC 系列章节
  4. System V IPCshmget()msgget()semget()——POSIX 推荐替代方案
  5. D-Bus:基于消息的高级 IPC 框架(Linux 桌面)

7.10 本章小结

要点 说明
管道 (pipe) 最简单的 IPC,父子进程间单向通信
FIFO 命名管道,无关进程可通过文件系统路径通信
消息队列 带优先级的消息传递,保留消息边界
共享内存 最快的 IPC,需自行同步
信号量 用于进程/线程间同步和互斥
dup2() 重定向文件描述符,常用于管道连接