强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

POSIX 标准详解教程 / 第七章:进程间通信(IPC)

第七章:进程间通信(IPC)

掌握 POSIX 进程间通信机制:管道、FIFO、消息队列、共享内存、信号量。


7.1 IPC 概述

7.1.1 为什么需要 IPC

进程拥有独立的地址空间,无法直接访问其他进程的内存。POSIX 提供了多种 IPC 机制:

机制方向数据形式持久性适用场景
匿名管道 (pipe)单向字节流进程生命期父子进程通信
命名管道 (FIFO)单向字节流文件系统无关进程通信
消息队列双向消息(带类型)内核结构化消息传递
共享内存原始内存内核高速数据共享
信号量计数器内核同步/互斥

7.2 匿名管道 (pipe)

7.2.1 管道特性

  • 单向:一端读,一端写
  • 半双工:数据只能单向流动
  • 字节流:无消息边界
  • 容量有限:通常 64KB(Linux 默认 PIPE_BUF = 4096 字节)
  • 只能用于有亲缘关系的进程(父子进程间)

7.2.2 基本管道用法

/*
 * pipe_basic.c - 父子进程通过管道通信
 * 编译: gcc -Wall -o pipe_basic pipe_basic.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>

int main(void)
{
    int pipefd[2];  /* pipefd[0]=读端, pipefd[1]=写端 */

    if (pipe(pipefd) == -1) {
        perror("pipe");
        return EXIT_FAILURE;
    }

    pid_t pid = fork();
    if (pid == -1) {
        perror("fork");
        return EXIT_FAILURE;
    }

    if (pid == 0) {
        /* 子进程:写入数据 */
        close(pipefd[0]);  /* 关闭读端 */

        const char *messages[] = {
            "Hello from child!",
            "POSIX pipes are simple.",
            "Goodbye!"
        };
        for (int i = 0; i < 3; i++) {
            write(pipefd[1], messages[i], strlen(messages[i]));
            write(pipefd[1], "\n", 1);  /* 消息分隔符 */
            sleep(1);
        }

        close(pipefd[1]);
        _exit(EXIT_SUCCESS);
    } else {
        /* 父进程:读取数据 */
        close(pipefd[1]);  /* 关闭写端 */

        char buf[256];
        ssize_t n;
        while ((n = read(pipefd[0], buf, sizeof(buf) - 1)) > 0) {
            buf[n] = '\0';
            printf("[父进程收到] %s", buf);
        }

        close(pipefd[0]);
        waitpid(pid, NULL, 0);
    }

    return EXIT_SUCCESS;
}
$ ./pipe_basic
[父进程收到] Hello from child!
[父进程收到] POSIX pipes are simple.
[父进程收到] Goodbye!

7.2.3 使用管道连接进程(模拟 Shell 管道)

/*
 * pipe_chain.c - 模拟 "ls | grep .c | wc -l"
 * 编译: gcc -Wall -o pipe_chain pipe_chain.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

static pid_t run_with_pipes(int in_fd, int out_fd,
                             const char *cmd, char *const argv[])
{
    pid_t pid = fork();
    if (pid == -1) { perror("fork"); return -1; }
    if (pid == 0) {
        if (in_fd != STDIN_FILENO) {
            dup2(in_fd, STDIN_FILENO);
            close(in_fd);
        }
        if (out_fd != STDOUT_FILENO) {
            dup2(out_fd, STDOUT_FILENO);
            close(out_fd);
        }
        execvp(cmd, argv);
        perror(cmd);
        _exit(127);
    }
    return pid;
}

int main(void)
{
    int pipe1[2], pipe2[2];
    pipe(pipe1);
    pipe(pipe2);

    /* ls | grep .c | wc -l */
    char *ls_argv[]   = {"ls", NULL};
    char *grep_argv[] = {"grep", ".c", NULL};
    char *wc_argv[]   = {"wc", "-l", NULL};

    pid_t p1 = run_with_pipes(STDIN_FILENO, pipe1[1], "ls", ls_argv);
    close(pipe1[1]);

    pid_t p2 = run_with_pipes(pipe1[0], pipe2[1], "grep", grep_argv);
    close(pipe1[0]);
    close(pipe2[1]);

    pid_t p3 = run_with_pipes(pipe2[0], STDOUT_FILENO, "wc", wc_argv);
    close(pipe2[0]);

    /* 等待所有子进程 */
    waitpid(p1, NULL, 0);
    waitpid(p2, NULL, 0);
    waitpid(p3, NULL, 0);

    return 0;
}

7.3 命名管道 (FIFO)

7.3.1 FIFO 特性

特性匿名管道 (pipe)命名管道 (FIFO)
文件系统存在✅ 有路径名
无关进程通信
创建方式pipe()mkfifo()
持久性进程生命期文件系统(需手动删除)

7.3.2 FIFO 写入端

/*
 * fifo_writer.c - FIFO 写入端
 * 编译: gcc -Wall -o fifo_writer fifo_writer.c
 * 运行: 先运行此程序,再运行 fifo_reader
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>

int main(void)
{
    const char *fifo_path = "/tmp/posix_demo_fifo";

    /* 创建 FIFO(如果不存在) */
    if (mkfifo(fifo_path, 0666) == -1) {
        perror("mkfifo(可能已存在)");
    }

    printf("打开 FIFO 写入端(等待读取端打开)...\n");
    int fd = open(fifo_path, O_WRONLY);
    if (fd == -1) { perror("open"); return 1; }

    const char *messages[] = {
        "Message 1: Hello FIFO!",
        "Message 2: POSIX IPC",
        "Message 3: Goodbye!",
    };

    for (int i = 0; i < 3; i++) {
        write(fd, messages[i], strlen(messages[i]));
        write(fd, "\n", 1);
        printf("发送: %s\n", messages[i]);
        sleep(1);
    }

    close(fd);
    printf("写入端关闭\n");
    return 0;
}

7.3.3 FIFO 读取端

/*
 * fifo_reader.c - FIFO 读取端
 * 编译: gcc -Wall -o fifo_reader fifo_reader.c
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    const char *fifo_path = "/tmp/posix_demo_fifo";

    printf("打开 FIFO 读取端(等待写入端打开)...\n");
    int fd = open(fifo_path, O_RDONLY);
    if (fd == -1) { perror("open"); return 1; }

    char buf[256];
    ssize_t n;
    while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
        buf[n] = '\0';
        printf("收到: %s", buf);
    }

    close(fd);
    unlink(fifo_path);  /* 清理 FIFO 文件 */
    printf("读取端关闭,FIFO 已删除\n");
    return 0;
}

7.4 POSIX 消息队列 (Message Queue)

7.4.1 特性

特性说明
消息边界保留消息边界(不像管道的字节流)
优先级每条消息有优先级,高优先级先读取
异步通知支持信号通知(mq_notify
持久性内核维护,进程关闭后消息仍在

7.4.2 消息队列使用

/*
 * mq_send.c - POSIX 消息队列发送端
 * 编译: gcc -Wall -o mq_send mq_send -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>

int main(void)
{
    const char *mq_name = "/posix_demo_mq";

    /* 打开(创建)消息队列 */
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 10,        /* 最大消息数 */
        .mq_msgsize = 256,      /* 单条消息最大字节数 */
        .mq_curmsgs = 0,
    };

    mqd_t mq = mq_open(mq_name, O_CREAT | O_WRONLY, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    /* 发送带优先级的消息 */
    struct {
        const char *text;
        unsigned int priority;
    } messages[] = {
        {"Low priority message",    1},
        {"Normal priority message", 5},
        {"High priority message",   10},
        {"Urgent: system alert!",   15},
    };

    for (int i = 0; i < 4; i++) {
        if (mq_send(mq, messages[i].text,
                    strlen(messages[i].text),
                    messages[i].priority) == -1) {
            perror("mq_send");
        } else {
            printf("发送 [优先级 %2d]: %s\n",
                   messages[i].priority, messages[i].text);
        }
    }

    mq_close(mq);
    printf("发送端完成\n");
    return EXIT_SUCCESS;
}
/*
 * mq_receive.c - POSIX 消息队列接收端
 * 编译: gcc -Wall -o mq_receive mq_receive -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>

int main(void)
{
    const char *mq_name = "/posix_demo_mq";

    mqd_t mq = mq_open(mq_name, O_RDONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return EXIT_FAILURE;
    }

    /* 获取队列属性 */
    struct mq_attr attr;
    mq_getattr(mq, &attr);
    printf("队列: 最大消息=%ld, 消息大小=%ld, 当前消息=%ld\n",
           attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /* 接收消息(按优先级从高到低) */
    char buf[257];
    unsigned int priority;
    ssize_t n;

    while ((n = mq_receive(mq, buf, sizeof(buf) - 1, &priority)) > 0) {
        buf[n] = '\0';
        printf("收到 [优先级 %2d]: %s\n", priority, buf);
    }

    mq_close(mq);
    mq_unlink(mq_name);  /* 删除消息队列 */
    printf("接收端完成,队列已删除\n");
    return EXIT_SUCCESS;
}
# 终端 1:发送
$ ./mq_send
发送 [优先级  1]: Low priority message
发送 [优先级  5]: Normal priority message
发送 [优先级 10]: High priority message
发送 [优先级 15]: Urgent: system alert!
发送端完成

# 终端 2:接收(注意顺序:按优先级从高到低)
$ ./mq_receive
队列: 最大消息=10, 消息大小=256, 当前消息=4
收到 [优先级 15]: Urgent: system alert!
收到 [优先级 10]: High priority message
收到 [优先级  5]: Normal priority message
收到 [优先级  1]: Low priority message
接收端完成,队列已删除

注意:消息队列需要 -lrt 链接选项。Linux 内核需配置 CONFIG_POSIX_MQUEUE。消息队列挂载在 /dev/mqueue


7.5 POSIX 共享内存 (Shared Memory)

7.5.1 共享内存是最快的 IPC

进程 A 的地址空间            进程 B 的地址空间
┌──────────────┐            ┌──────────────┐
│              │            │              │
│  私有数据    │            │  私有数据    │
│              │            │              │
├──────────────┤            ├──────────────┤
│  共享内存段  │◄── 同一块物理内存 ──►│  共享内存段  │
│              │            │              │
└──────────────┘            └──────────────┘

数据无需拷贝,直接映射到多个进程的地址空间——这是最快的 IPC 方式。

7.5.2 共享内存操作流程

进程 A (写端):                进程 B (读端):
shm_open() → fd               shm_open() → fd
ftruncate() 设置大小
mmap() → 指针                 mmap() → 指针
写入数据 ──────────────────────→ 读取数据
munmap()                      munmap()
shm_unlink() 删除

7.5.3 共享内存写端

/*
 * shm_write.c - POSIX 共享内存写端
 * 编译: gcc -Wall -o shm_write shm_write -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096

typedef struct {
    int sequence;
    char data[256];
} shared_data_t;

int main(void)
{
    /* 创建共享内存对象 */
    int fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0644);
    if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }

    /* 设置大小 */
    if (ftruncate(fd, SHM_SIZE) == -1) {
        perror("ftruncate");
        return EXIT_FAILURE;
    }

    /* 映射到地址空间 */
    shared_data_t *shared = mmap(NULL, SHM_SIZE,
                                  PROT_READ | PROT_WRITE,
                                  MAP_SHARED, fd, 0);
    if (shared == MAP_FAILED) {
        perror("mmap");
        return EXIT_FAILURE;
    }

    /* 写入数据 */
    for (int i = 0; i < 5; i++) {
        shared->sequence = i;
        snprintf(shared->data, sizeof(shared->data),
                 "Shared message #%d from PID %d", i, getpid());
        printf("写入: seq=%d, data='%s'\n",
               shared->sequence, shared->data);
        sleep(1);
    }

    /* 清理 */
    munmap(shared, SHM_SIZE);
    close(fd);
    shm_unlink(SHM_NAME);
    printf("共享内存已删除\n");

    return EXIT_SUCCESS;
}

7.5.4 共享内存读端

/*
 * shm_read.c - POSIX 共享内存读端
 * 编译: gcc -Wall -o shm_read shm_read -lrt
 * 运行: 先运行 shm_write,再运行 shm_read
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096

typedef struct {
    int sequence;
    char data[256];
} shared_data_t;

int main(void)
{
    int fd = shm_open(SHM_NAME, O_RDONLY, 0);
    if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }

    shared_data_t *shared = mmap(NULL, SHM_SIZE,
                                  PROT_READ, MAP_SHARED, fd, 0);
    if (shared == MAP_FAILED) {
        perror("mmap");
        return EXIT_FAILURE;
    }

    /* 轮询读取(实际应用应使用信号量同步) */
    int last_seq = -1;
    for (int i = 0; i < 10; i++) {
        if (shared->sequence != last_seq) {
            printf("读取: seq=%d, data='%s'\n",
                   shared->sequence, shared->data);
            last_seq = shared->sequence;
        }
        usleep(500000);
    }

    munmap(shared, SHM_SIZE);
    close(fd);
    return EXIT_SUCCESS;
}

7.6 POSIX 信号量 (Semaphore)

7.6.1 信号量概念

信号量是一个计数器,用于:

  • 互斥:初始值 1,类似互斥锁
  • 同步:协调多个进程的执行顺序
  • 资源计数:初始值 N,限制同时访问资源的进程数

7.6.2 命名信号量

/*
 * sem_named.c - POSIX 命名信号量用法
 * 编译: gcc -Wall -o sem_named sem_named -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

#define SEM_NAME "/posix_demo_sem"

int main(void)
{
    /* 创建命名信号量,初始值为 1(互斥) */
    sem_t *sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
    if (sem == SEM_FAILED) {
        /* 可能已存在,删除后重建 */
        sem_unlink(SEM_NAME);
        sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
        if (sem == SEM_FAILED) {
            perror("sem_open");
            return EXIT_FAILURE;
        }
    }

    /* 创建 3 个子进程竞争资源 */
    for (int i = 0; i < 3; i++) {
        if (fork() == 0) {
            printf("  进程 %d: 等待信号量...\n", i);
            sem_wait(sem);  /* P 操作(等待/减 1) */

            /* 临界区 */
            printf("  进程 %d: 进入临界区\n", i);
            sleep(1);
            printf("  进程 %d: 离开临界区\n", i);

            sem_post(sem);  /* V 操作(释放/加 1) */
            _exit(0);
        }
    }

    /* 等待所有子进程 */
    for (int i = 0; i < 3; i++)
        wait(NULL);

    /* 清理 */
    sem_close(sem);
    sem_unlink(SEM_NAME);
    printf("信号量已清理\n");

    return EXIT_SUCCESS;
}

7.6.3 生产者-消费者(使用信号量)

/*
 * sem_prod_cons.c - 使用信号量实现生产者-消费者
 * 编译: gcc -Wall -o sem_prod_cons sem_prod_cons -lrt
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>

#define BUFFER_SIZE 5
#define ITEMS 10

typedef struct {
    int buffer[BUFFER_SIZE];
    int in;   /* 写入位置 */
    int out;  /* 读取位置 */
} ring_buffer_t;

int main(void)
{
    /* 使用 mmap 创建共享内存(匿名映射简化示例) */
    ring_buffer_t *rb = mmap(NULL, sizeof(ring_buffer_t),
                              PROT_READ | PROT_WRITE,
                              MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    rb->in = 0;
    rb->out = 0;

    /* 信号量:空位数、满位数、互斥 */
    sem_t *empty = sem_open("/demo_empty", O_CREAT | O_EXCL, 0644, BUFFER_SIZE);
    sem_t *full  = sem_open("/demo_full",  O_CREAT | O_EXCL, 0644, 0);
    sem_t *mutex = sem_open("/demo_mutex", O_CREAT | O_EXCL, 0644, 1);

    if (fork() == 0) {
        /* 生产者 */
        for (int i = 0; i < ITEMS; i++) {
            sem_wait(empty);  /* 等待空位 */
            sem_wait(mutex);  /* 进入临界区 */

            rb->buffer[rb->in] = i;
            printf("生产: item=%d, pos=%d\n", i, rb->in);
            rb->in = (rb->in + 1) % BUFFER_SIZE;

            sem_post(mutex);  /* 离开临界区 */
            sem_post(full);   /* 增加满位 */
            usleep(100000);
        }
        _exit(0);
    }

    if (fork() == 0) {
        /* 消费者 */
        for (int i = 0; i < ITEMS; i++) {
            sem_wait(full);   /* 等待数据 */
            sem_wait(mutex);  /* 进入临界区 */

            int item = rb->buffer[rb->out];
            printf("  消费: item=%d, pos=%d\n", item, rb->out);
            rb->out = (rb->out + 1) % BUFFER_SIZE;

            sem_post(mutex);  /* 离开临界区 */
            sem_post(empty);  /* 增加空位 */
            usleep(150000);
        }
        _exit(0);
    }

    /* 父进程等待 */
    wait(NULL);
    wait(NULL);

    /* 清理 */
    sem_close(empty); sem_unlink("/demo_empty");
    sem_close(full);  sem_unlink("/demo_full");
    sem_close(mutex); sem_unlink("/demo_mutex");
    munmap(rb, sizeof(ring_buffer_t));

    printf("生产者-消费者完成\n");
    return 0;
}

7.7 IPC 机制对比

特性管道FIFO消息队列共享内存信号量
速度
数据形式字节流字节流消息原始内存计数器
亲缘要求父子
持久性进程内文件系统内核内核内核
同步机制阻塞阻塞阻塞需额外同步内置
典型用法Shell 管道客户端-服务器任务队列大数据共享互斥/同步

7.8 注意事项

⚠️ 管道缓冲区满write() 在管道满时阻塞。使用 F_SETPIPE_SZ(Linux 扩展)调整大小。

⚠️ 共享内存无同步:共享内存本身没有同步机制,必须配合信号量或互斥锁使用,否则会产生数据竞争。

⚠️ 信号量与信号的区别:信号量(sem_t)是同步机制,不是信号(signal)。信号量通过 sem_wait/sem_post 操作,是线程/进程安全的。

⚠️ 消息队列持久性:POSIX 消息队列在内核中持久存在,直到显式删除 (mq_unlink)。程序崩溃后消息不会丢失。

⚠️ 资源泄漏:IPC 对象(管道、共享内存、信号量)使用后必须关闭和删除,否则会残留(查看 /dev/mqueue//dev/shm/)。


7.9 扩展阅读

  1. man 7 pipeman 3 mq_overviewman 7 shm_overviewman 7 sem_overview
  2. APUE 第 15 章:Interprocess Communication
  3. TLPI 第 44-55 章:IPC 系列章节
  4. System V IPCshmget()msgget()semget()——POSIX 推荐替代方案
  5. D-Bus:基于消息的高级 IPC 框架(Linux 桌面)

7.10 本章小结

要点说明
管道 (pipe)最简单的 IPC,父子进程间单向通信
FIFO命名管道,无关进程可通过文件系统路径通信
消息队列带优先级的消息传递,保留消息边界
共享内存最快的 IPC,需自行同步
信号量用于进程/线程间同步和互斥
dup2()重定向文件描述符,常用于管道连接