POSIX 标准详解教程 / 第七章:进程间通信(IPC)
第七章:进程间通信(IPC)
掌握 POSIX 进程间通信机制:管道、FIFO、消息队列、共享内存、信号量。
7.1 IPC 概述
7.1.1 为什么需要 IPC
进程拥有独立的地址空间,无法直接访问其他进程的内存。POSIX 提供了多种 IPC 机制:
| 机制 | 方向 | 数据形式 | 持久性 | 适用场景 |
|---|---|---|---|---|
| 匿名管道 (pipe) | 单向 | 字节流 | 进程生命期 | 父子进程通信 |
| 命名管道 (FIFO) | 单向 | 字节流 | 文件系统 | 无关进程通信 |
| 消息队列 | 双向 | 消息(带类型) | 内核 | 结构化消息传递 |
| 共享内存 | — | 原始内存 | 内核 | 高速数据共享 |
| 信号量 | — | 计数器 | 内核 | 同步/互斥 |
7.2 匿名管道 (pipe)
7.2.1 管道特性
- 单向:一端读,一端写
- 半双工:数据只能单向流动
- 字节流:无消息边界
- 容量有限:通常 64KB(Linux 默认 PIPE_BUF = 4096 字节)
- 只能用于有亲缘关系的进程(父子进程间)
7.2.2 基本管道用法
/*
* pipe_basic.c - 父子进程通过管道通信
* 编译: gcc -Wall -o pipe_basic pipe_basic.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/wait.h>
int main(void)
{
int pipefd[2]; /* pipefd[0]=读端, pipefd[1]=写端 */
if (pipe(pipefd) == -1) {
perror("pipe");
return EXIT_FAILURE;
}
pid_t pid = fork();
if (pid == -1) {
perror("fork");
return EXIT_FAILURE;
}
if (pid == 0) {
/* 子进程:写入数据 */
close(pipefd[0]); /* 关闭读端 */
const char *messages[] = {
"Hello from child!",
"POSIX pipes are simple.",
"Goodbye!"
};
for (int i = 0; i < 3; i++) {
write(pipefd[1], messages[i], strlen(messages[i]));
write(pipefd[1], "\n", 1); /* 消息分隔符 */
sleep(1);
}
close(pipefd[1]);
_exit(EXIT_SUCCESS);
} else {
/* 父进程:读取数据 */
close(pipefd[1]); /* 关闭写端 */
char buf[256];
ssize_t n;
while ((n = read(pipefd[0], buf, sizeof(buf) - 1)) > 0) {
buf[n] = '\0';
printf("[父进程收到] %s", buf);
}
close(pipefd[0]);
waitpid(pid, NULL, 0);
}
return EXIT_SUCCESS;
}
$ ./pipe_basic
[父进程收到] Hello from child!
[父进程收到] POSIX pipes are simple.
[父进程收到] Goodbye!
7.2.3 使用管道连接进程(模拟 Shell 管道)
/*
* pipe_chain.c - 模拟 "ls | grep .c | wc -l"
* 编译: gcc -Wall -o pipe_chain pipe_chain.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
static pid_t run_with_pipes(int in_fd, int out_fd,
const char *cmd, char *const argv[])
{
pid_t pid = fork();
if (pid == -1) { perror("fork"); return -1; }
if (pid == 0) {
if (in_fd != STDIN_FILENO) {
dup2(in_fd, STDIN_FILENO);
close(in_fd);
}
if (out_fd != STDOUT_FILENO) {
dup2(out_fd, STDOUT_FILENO);
close(out_fd);
}
execvp(cmd, argv);
perror(cmd);
_exit(127);
}
return pid;
}
int main(void)
{
int pipe1[2], pipe2[2];
pipe(pipe1);
pipe(pipe2);
/* ls | grep .c | wc -l */
char *ls_argv[] = {"ls", NULL};
char *grep_argv[] = {"grep", ".c", NULL};
char *wc_argv[] = {"wc", "-l", NULL};
pid_t p1 = run_with_pipes(STDIN_FILENO, pipe1[1], "ls", ls_argv);
close(pipe1[1]);
pid_t p2 = run_with_pipes(pipe1[0], pipe2[1], "grep", grep_argv);
close(pipe1[0]);
close(pipe2[1]);
pid_t p3 = run_with_pipes(pipe2[0], STDOUT_FILENO, "wc", wc_argv);
close(pipe2[0]);
/* 等待所有子进程 */
waitpid(p1, NULL, 0);
waitpid(p2, NULL, 0);
waitpid(p3, NULL, 0);
return 0;
}
7.3 命名管道 (FIFO)
7.3.1 FIFO 特性
| 特性 | 匿名管道 (pipe) | 命名管道 (FIFO) |
|---|---|---|
| 文件系统存在 | ❌ | ✅ 有路径名 |
| 无关进程通信 | ❌ | ✅ |
| 创建方式 | pipe() | mkfifo() |
| 持久性 | 进程生命期 | 文件系统(需手动删除) |
7.3.2 FIFO 写入端
/*
* fifo_writer.c - FIFO 写入端
* 编译: gcc -Wall -o fifo_writer fifo_writer.c
* 运行: 先运行此程序,再运行 fifo_reader
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string.h>
int main(void)
{
const char *fifo_path = "/tmp/posix_demo_fifo";
/* 创建 FIFO(如果不存在) */
if (mkfifo(fifo_path, 0666) == -1) {
perror("mkfifo(可能已存在)");
}
printf("打开 FIFO 写入端(等待读取端打开)...\n");
int fd = open(fifo_path, O_WRONLY);
if (fd == -1) { perror("open"); return 1; }
const char *messages[] = {
"Message 1: Hello FIFO!",
"Message 2: POSIX IPC",
"Message 3: Goodbye!",
};
for (int i = 0; i < 3; i++) {
write(fd, messages[i], strlen(messages[i]));
write(fd, "\n", 1);
printf("发送: %s\n", messages[i]);
sleep(1);
}
close(fd);
printf("写入端关闭\n");
return 0;
}
7.3.3 FIFO 读取端
/*
* fifo_reader.c - FIFO 读取端
* 编译: gcc -Wall -o fifo_reader fifo_reader.c
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
int main(void)
{
const char *fifo_path = "/tmp/posix_demo_fifo";
printf("打开 FIFO 读取端(等待写入端打开)...\n");
int fd = open(fifo_path, O_RDONLY);
if (fd == -1) { perror("open"); return 1; }
char buf[256];
ssize_t n;
while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
buf[n] = '\0';
printf("收到: %s", buf);
}
close(fd);
unlink(fifo_path); /* 清理 FIFO 文件 */
printf("读取端关闭,FIFO 已删除\n");
return 0;
}
7.4 POSIX 消息队列 (Message Queue)
7.4.1 特性
| 特性 | 说明 |
|---|---|
| 消息边界 | 保留消息边界(不像管道的字节流) |
| 优先级 | 每条消息有优先级,高优先级先读取 |
| 异步通知 | 支持信号通知(mq_notify) |
| 持久性 | 内核维护,进程关闭后消息仍在 |
7.4.2 消息队列使用
/*
* mq_send.c - POSIX 消息队列发送端
* 编译: gcc -Wall -o mq_send mq_send -lrt
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <string.h>
int main(void)
{
const char *mq_name = "/posix_demo_mq";
/* 打开(创建)消息队列 */
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 10, /* 最大消息数 */
.mq_msgsize = 256, /* 单条消息最大字节数 */
.mq_curmsgs = 0,
};
mqd_t mq = mq_open(mq_name, O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
return EXIT_FAILURE;
}
/* 发送带优先级的消息 */
struct {
const char *text;
unsigned int priority;
} messages[] = {
{"Low priority message", 1},
{"Normal priority message", 5},
{"High priority message", 10},
{"Urgent: system alert!", 15},
};
for (int i = 0; i < 4; i++) {
if (mq_send(mq, messages[i].text,
strlen(messages[i].text),
messages[i].priority) == -1) {
perror("mq_send");
} else {
printf("发送 [优先级 %2d]: %s\n",
messages[i].priority, messages[i].text);
}
}
mq_close(mq);
printf("发送端完成\n");
return EXIT_SUCCESS;
}
/*
* mq_receive.c - POSIX 消息队列接收端
* 编译: gcc -Wall -o mq_receive mq_receive -lrt
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
int main(void)
{
const char *mq_name = "/posix_demo_mq";
mqd_t mq = mq_open(mq_name, O_RDONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
return EXIT_FAILURE;
}
/* 获取队列属性 */
struct mq_attr attr;
mq_getattr(mq, &attr);
printf("队列: 最大消息=%ld, 消息大小=%ld, 当前消息=%ld\n",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
/* 接收消息(按优先级从高到低) */
char buf[257];
unsigned int priority;
ssize_t n;
while ((n = mq_receive(mq, buf, sizeof(buf) - 1, &priority)) > 0) {
buf[n] = '\0';
printf("收到 [优先级 %2d]: %s\n", priority, buf);
}
mq_close(mq);
mq_unlink(mq_name); /* 删除消息队列 */
printf("接收端完成,队列已删除\n");
return EXIT_SUCCESS;
}
# 终端 1:发送
$ ./mq_send
发送 [优先级 1]: Low priority message
发送 [优先级 5]: Normal priority message
发送 [优先级 10]: High priority message
发送 [优先级 15]: Urgent: system alert!
发送端完成
# 终端 2:接收(注意顺序:按优先级从高到低)
$ ./mq_receive
队列: 最大消息=10, 消息大小=256, 当前消息=4
收到 [优先级 15]: Urgent: system alert!
收到 [优先级 10]: High priority message
收到 [优先级 5]: Normal priority message
收到 [优先级 1]: Low priority message
接收端完成,队列已删除
注意:消息队列需要
-lrt链接选项。Linux 内核需配置CONFIG_POSIX_MQUEUE。消息队列挂载在/dev/mqueue。
7.5 POSIX 共享内存 (Shared Memory)
7.5.1 共享内存是最快的 IPC
进程 A 的地址空间 进程 B 的地址空间
┌──────────────┐ ┌──────────────┐
│ │ │ │
│ 私有数据 │ │ 私有数据 │
│ │ │ │
├──────────────┤ ├──────────────┤
│ 共享内存段 │◄── 同一块物理内存 ──►│ 共享内存段 │
│ │ │ │
└──────────────┘ └──────────────┘
数据无需拷贝,直接映射到多个进程的地址空间——这是最快的 IPC 方式。
7.5.2 共享内存操作流程
进程 A (写端): 进程 B (读端):
shm_open() → fd shm_open() → fd
ftruncate() 设置大小
mmap() → 指针 mmap() → 指针
写入数据 ──────────────────────→ 读取数据
munmap() munmap()
shm_unlink() 删除
7.5.3 共享内存写端
/*
* shm_write.c - POSIX 共享内存写端
* 编译: gcc -Wall -o shm_write shm_write -lrt
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096
typedef struct {
int sequence;
char data[256];
} shared_data_t;
int main(void)
{
/* 创建共享内存对象 */
int fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0644);
if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }
/* 设置大小 */
if (ftruncate(fd, SHM_SIZE) == -1) {
perror("ftruncate");
return EXIT_FAILURE;
}
/* 映射到地址空间 */
shared_data_t *shared = mmap(NULL, SHM_SIZE,
PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (shared == MAP_FAILED) {
perror("mmap");
return EXIT_FAILURE;
}
/* 写入数据 */
for (int i = 0; i < 5; i++) {
shared->sequence = i;
snprintf(shared->data, sizeof(shared->data),
"Shared message #%d from PID %d", i, getpid());
printf("写入: seq=%d, data='%s'\n",
shared->sequence, shared->data);
sleep(1);
}
/* 清理 */
munmap(shared, SHM_SIZE);
close(fd);
shm_unlink(SHM_NAME);
printf("共享内存已删除\n");
return EXIT_SUCCESS;
}
7.5.4 共享内存读端
/*
* shm_read.c - POSIX 共享内存读端
* 编译: gcc -Wall -o shm_read shm_read -lrt
* 运行: 先运行 shm_write,再运行 shm_read
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#define SHM_NAME "/posix_demo_shm"
#define SHM_SIZE 4096
typedef struct {
int sequence;
char data[256];
} shared_data_t;
int main(void)
{
int fd = shm_open(SHM_NAME, O_RDONLY, 0);
if (fd == -1) { perror("shm_open"); return EXIT_FAILURE; }
shared_data_t *shared = mmap(NULL, SHM_SIZE,
PROT_READ, MAP_SHARED, fd, 0);
if (shared == MAP_FAILED) {
perror("mmap");
return EXIT_FAILURE;
}
/* 轮询读取(实际应用应使用信号量同步) */
int last_seq = -1;
for (int i = 0; i < 10; i++) {
if (shared->sequence != last_seq) {
printf("读取: seq=%d, data='%s'\n",
shared->sequence, shared->data);
last_seq = shared->sequence;
}
usleep(500000);
}
munmap(shared, SHM_SIZE);
close(fd);
return EXIT_SUCCESS;
}
7.6 POSIX 信号量 (Semaphore)
7.6.1 信号量概念
信号量是一个计数器,用于:
- 互斥:初始值 1,类似互斥锁
- 同步:协调多个进程的执行顺序
- 资源计数:初始值 N,限制同时访问资源的进程数
7.6.2 命名信号量
/*
* sem_named.c - POSIX 命名信号量用法
* 编译: gcc -Wall -o sem_named sem_named -lrt
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#define SEM_NAME "/posix_demo_sem"
int main(void)
{
/* 创建命名信号量,初始值为 1(互斥) */
sem_t *sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
if (sem == SEM_FAILED) {
/* 可能已存在,删除后重建 */
sem_unlink(SEM_NAME);
sem = sem_open(SEM_NAME, O_CREAT | O_EXCL, 0644, 1);
if (sem == SEM_FAILED) {
perror("sem_open");
return EXIT_FAILURE;
}
}
/* 创建 3 个子进程竞争资源 */
for (int i = 0; i < 3; i++) {
if (fork() == 0) {
printf(" 进程 %d: 等待信号量...\n", i);
sem_wait(sem); /* P 操作(等待/减 1) */
/* 临界区 */
printf(" 进程 %d: 进入临界区\n", i);
sleep(1);
printf(" 进程 %d: 离开临界区\n", i);
sem_post(sem); /* V 操作(释放/加 1) */
_exit(0);
}
}
/* 等待所有子进程 */
for (int i = 0; i < 3; i++)
wait(NULL);
/* 清理 */
sem_close(sem);
sem_unlink(SEM_NAME);
printf("信号量已清理\n");
return EXIT_SUCCESS;
}
7.6.3 生产者-消费者(使用信号量)
/*
* sem_prod_cons.c - 使用信号量实现生产者-消费者
* 编译: gcc -Wall -o sem_prod_cons sem_prod_cons -lrt
*/
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>
#define BUFFER_SIZE 5
#define ITEMS 10
typedef struct {
int buffer[BUFFER_SIZE];
int in; /* 写入位置 */
int out; /* 读取位置 */
} ring_buffer_t;
int main(void)
{
/* 使用 mmap 创建共享内存(匿名映射简化示例) */
ring_buffer_t *rb = mmap(NULL, sizeof(ring_buffer_t),
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_ANONYMOUS, -1, 0);
rb->in = 0;
rb->out = 0;
/* 信号量:空位数、满位数、互斥 */
sem_t *empty = sem_open("/demo_empty", O_CREAT | O_EXCL, 0644, BUFFER_SIZE);
sem_t *full = sem_open("/demo_full", O_CREAT | O_EXCL, 0644, 0);
sem_t *mutex = sem_open("/demo_mutex", O_CREAT | O_EXCL, 0644, 1);
if (fork() == 0) {
/* 生产者 */
for (int i = 0; i < ITEMS; i++) {
sem_wait(empty); /* 等待空位 */
sem_wait(mutex); /* 进入临界区 */
rb->buffer[rb->in] = i;
printf("生产: item=%d, pos=%d\n", i, rb->in);
rb->in = (rb->in + 1) % BUFFER_SIZE;
sem_post(mutex); /* 离开临界区 */
sem_post(full); /* 增加满位 */
usleep(100000);
}
_exit(0);
}
if (fork() == 0) {
/* 消费者 */
for (int i = 0; i < ITEMS; i++) {
sem_wait(full); /* 等待数据 */
sem_wait(mutex); /* 进入临界区 */
int item = rb->buffer[rb->out];
printf(" 消费: item=%d, pos=%d\n", item, rb->out);
rb->out = (rb->out + 1) % BUFFER_SIZE;
sem_post(mutex); /* 离开临界区 */
sem_post(empty); /* 增加空位 */
usleep(150000);
}
_exit(0);
}
/* 父进程等待 */
wait(NULL);
wait(NULL);
/* 清理 */
sem_close(empty); sem_unlink("/demo_empty");
sem_close(full); sem_unlink("/demo_full");
sem_close(mutex); sem_unlink("/demo_mutex");
munmap(rb, sizeof(ring_buffer_t));
printf("生产者-消费者完成\n");
return 0;
}
7.7 IPC 机制对比
| 特性 | 管道 | FIFO | 消息队列 | 共享内存 | 信号量 |
|---|---|---|---|---|---|
| 速度 | 中 | 中 | 中 | 快 | — |
| 数据形式 | 字节流 | 字节流 | 消息 | 原始内存 | 计数器 |
| 亲缘要求 | 父子 | 无 | 无 | 无 | 无 |
| 持久性 | 进程内 | 文件系统 | 内核 | 内核 | 内核 |
| 同步机制 | 阻塞 | 阻塞 | 阻塞 | 需额外同步 | 内置 |
| 典型用法 | Shell 管道 | 客户端-服务器 | 任务队列 | 大数据共享 | 互斥/同步 |
7.8 注意事项
⚠️ 管道缓冲区满:
write()在管道满时阻塞。使用F_SETPIPE_SZ(Linux 扩展)调整大小。
⚠️ 共享内存无同步:共享内存本身没有同步机制,必须配合信号量或互斥锁使用,否则会产生数据竞争。
⚠️ 信号量与信号的区别:信号量(
sem_t)是同步机制,不是信号(signal)。信号量通过sem_wait/sem_post操作,是线程/进程安全的。
⚠️ 消息队列持久性:POSIX 消息队列在内核中持久存在,直到显式删除 (
mq_unlink)。程序崩溃后消息不会丢失。
⚠️ 资源泄漏:IPC 对象(管道、共享内存、信号量)使用后必须关闭和删除,否则会残留(查看
/dev/mqueue/、/dev/shm/)。
7.9 扩展阅读
man 7 pipe、man 3 mq_overview、man 7 shm_overview、man 7 sem_overview- APUE 第 15 章:Interprocess Communication
- TLPI 第 44-55 章:IPC 系列章节
- System V IPC:
shmget()、msgget()、semget()——POSIX 推荐替代方案 - D-Bus:基于消息的高级 IPC 框架(Linux 桌面)
7.10 本章小结
| 要点 | 说明 |
|---|---|
| 管道 (pipe) | 最简单的 IPC,父子进程间单向通信 |
| FIFO | 命名管道,无关进程可通过文件系统路径通信 |
| 消息队列 | 带优先级的消息传递,保留消息边界 |
| 共享内存 | 最快的 IPC,需自行同步 |
| 信号量 | 用于进程/线程间同步和互斥 |
| dup2() | 重定向文件描述符,常用于管道连接 |