强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

POSIX 标准详解教程 / 第四章:线程

第四章:线程

掌握 POSIX 线程(pthread)API,理解线程同步机制:互斥锁、条件变量、读写锁、屏障。


4.1 POSIX 线程概述

4.1.1 线程 vs 进程

对比维度进程 (Process)线程 (Thread)
地址空间独立共享
创建开销大(复制页表)小(仅创建栈)
通信方式IPC(管道、共享内存等)直接读写共享变量
切换开销大(TLB 刷新)
隔离性强(一个崩溃不影响另一个)弱(一个崩溃整个进程终止)
创建函数fork()pthread_create()

4.1.2 POSIX 线程标准

POSIX 线程定义于 POSIX.1c(IEEE Std 1003.1c-1995),后被合并到 POSIX.1-2001 中。Linux 上的实现为 NPTL(Native POSIX Thread Library)

4.1.3 线程资源模型

进程地址空间
┌─────────────────────────────────────────┐
│              代码段 (.text)              │  ← 共享
├─────────────────────────────────────────┤
│              数据段 (.data/.bss)         │  ← 共享(全局变量)
├─────────────────────────────────────────┤
│                   堆                    │  ← 共享(malloc 分配)
├─────────────────────────────────────────┤
│            线程 1 栈 (8MB)               │  ← 线程 1 私有
├─────────────────────────────────────────┤
│            线程 2 栈 (8MB)               │  ← 线程 2 私有
├─────────────────────────────────────────┤
│            线程 3 栈 (8MB)               │  ← 线程 3 私有
└─────────────────────────────────────────┘

编译时需要链接 pthread 库:gcc -Wall -o program program.c -lpthread


4.2 线程创建与终止

4.2.1 pthread_create() 与 pthread_join()

/*
 * thread_basic.c - 基本线程创建和等待
 * 编译: gcc -Wall -o thread_basic thread_basic.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

/* 线程函数参数结构体 */
typedef struct {
    int id;
    char name[32];
} thread_arg_t;

/* 线程入口函数 */
static void *thread_func(void *arg)
{
    thread_arg_t *targ = (thread_arg_t *)arg;

    printf("线程 %d (%s) 开始运行, TID 区域=%p\n",
           targ->id, targ->name, (void *)&targ->id);

    /* 模拟工作 */
    sleep(1);

    printf("线程 %d (%s) 完成\n", targ->id, targ->name);

    /* 返回结果(通过 void* 传递) */
    int *result = malloc(sizeof(int));
    if (result) *result = targ->id * 100;
    return result;
}

int main(void)
{
    #define NUM_THREADS 3
    pthread_t threads[NUM_THREADS];
    thread_arg_t args[NUM_THREADS];
    const char *names[] = {"Worker-A", "Worker-B", "Worker-C"};

    /* 创建线程 */
    for (int i = 0; i < NUM_THREADS; i++) {
        args[i].id = i;
        snprintf(args[i].name, sizeof(args[i].name), "%s", names[i]);

        int ret = pthread_create(&threads[i], NULL, thread_func, &args[i]);
        if (ret != 0) {
            fprintf(stderr, "pthread_create 失败: %s\n", strerror(ret));
            return EXIT_FAILURE;
        }
    }

    /* 等待所有线程完成 */
    printf("主线程等待所有工作线程...\n");
    for (int i = 0; i < NUM_THREADS; i++) {
        void *retval;
        int ret = pthread_join(threads[i], &retval);
        if (ret != 0) {
            fprintf(stderr, "pthread_join 失败: %s\n", strerror(ret));
            continue;
        }
        if (retval) {
            int *result = (int *)retval;
            printf("线程 %d 返回值: %d\n", i, *result);
            free(result);
        }
    }

    printf("所有线程已完成\n");
    return EXIT_SUCCESS;
}
$ ./thread_basic
主线程等待所有工作线程...
线程 0 (Worker-A) 开始运行, TID 区域=0x7f8a0c000b70
线程 1 (Worker-B) 开始运行, TID 区域=0x7f8a0b800b70
线程 2 (Worker-C) 开始运行, TID 区域=0x7f8a0b000b70
线程 0 (Worker-A) 完成
线程 1 (Worker-B) 完成
线程 2 (Worker-C) 完成
线程 0 返回值: 0
线程 1 返回值: 100
线程 2 返回值: 200
所有线程已完成

4.2.2 pthread_detach():分离线程

如果不需要等待线程结束,可以将其设为分离状态

/* 分离线程:线程终止时自动释放资源 */
pthread_t tid;
pthread_create(&tid, NULL, thread_func, NULL);
pthread_detach(tid);
/* 此后不能再对该线程调用 pthread_join() */

4.2.3 线程属性

/*
 * thread_attr.c - 使用线程属性设置栈大小和分离状态
 * 编译: gcc -Wall -o thread_attr thread_attr.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

static void *thread_func(void *arg)
{
    (void)arg;
    printf("  线程: 栈大小已自定义\n");
    return NULL;
}

int main(void)
{
    pthread_t tid;
    pthread_attr_t attr;

    pthread_attr_init(&attr);

    /* 设置分离状态 */
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    /* 设置栈大小为 4MB(默认通常为 8MB) */
    pthread_attr_setstacksize(&attr, 4 * 1024 * 1024);

    /* 查询栈大小 */
    size_t stacksize;
    pthread_attr_getstacksize(&attr, &stacksize);
    printf("设置的栈大小: %zu 字节 (%.1f MB)\n",
           stacksize, stacksize / (1024.0 * 1024.0));

    pthread_create(&tid, &attr, thread_func, NULL);
    pthread_join(tid, NULL);

    pthread_attr_destroy(&attr);
    return 0;
}

4.3 互斥锁 (Mutex)

4.3.1 互斥锁的作用

互斥锁保证同一时刻只有一个线程能访问共享资源,防止竞态条件(Race Condition)

线程 A: lock(mtx) → 读写共享数据 → unlock(mtx)
线程 B: lock(mtx) → [等待] ..............→ 读写共享数据 → unlock(mtx)

4.3.2 无锁 vs 有锁对比

/*
 * mutex_race.c - 演示竞态条件和互斥锁保护
 * 编译: gcc -Wall -o mutex_race mutex_race.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define ITERATIONS 1000000

static long counter_no_lock = 0;
static long counter_with_lock = 0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

static void *increment_no_lock(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERATIONS; i++)
        counter_no_lock++;  /* 数据竞争! */
    return NULL;
}

static void *increment_with_lock(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERATIONS; i++) {
        pthread_mutex_lock(&mtx);
        counter_with_lock++;
        pthread_mutex_unlock(&mtx);
    }
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;

    /* 测试无锁 */
    pthread_create(&t1, NULL, increment_no_lock, NULL);
    pthread_create(&t2, NULL, increment_no_lock, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("无锁结果:   期望=%d, 实际=%ld (错误: %ld)\n",
           ITERATIONS * 2, counter_no_lock,
           ITERATIONS * 2 - counter_no_lock);

    /* 测试有锁 */
    pthread_create(&t1, NULL, increment_with_lock, NULL);
    pthread_create(&t2, NULL, increment_with_lock, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    printf("有锁结果:   期望=%d, 实际=%ld (错误: %ld)\n",
           ITERATIONS * 2, counter_with_lock,
           ITERATIONS * 2 - counter_with_lock);

    pthread_mutex_destroy(&mtx);
    return 0;
}
$ ./mutex_race
无锁结果:   期望=2000000, 实际=1843562 (错误: 156438)
有锁结果:   期望=2000000, 实际=2000000 (错误: 0)

4.3.3 互斥锁类型

类型说明
普通锁PTHREAD_MUTEX_NORMAL不检测死锁,默认类型
检错锁PTHREAD_MUTEX_ERRORCHECK重复加锁返回错误
递归锁PTHREAD_MUTEX_RECURSIVE同一线程可多次加锁
默认锁PTHREAD_MUTEX_DEFAULT平台相关(Linux 为普通锁)
/* 初始化递归锁 */
pthread_mutex_t rmtx;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&rmtx, &attr);
pthread_mutexattr_destroy(&attr);

/* 递归锁允许同一线程多次加锁 */
pthread_mutex_lock(&rmtx);
pthread_mutex_lock(&rmtx);   /* 成功(普通锁这里会死锁) */
pthread_mutex_unlock(&rmtx);
pthread_mutex_unlock(&rmtx);

4.4 条件变量 (Condition Variable)

4.4.1 概念

条件变量用于等待某个条件为真,典型配合互斥锁使用。它是线程间通知机制的核心:

等待方:                          通知方:
lock(mtx)                        lock(mtx)
while (!条件)                     设置条件为真
    cond_wait(cv, mtx)            cond_signal(cv) / cond_broadcast(cv)
使用共享资源                      unlock(mtx)
unlock(mtx)

关键:必须使用 while 循环而非 if 来检查条件(防止虚假唤醒 spurious wakeup)。

4.4.2 生产者-消费者模型

/*
 * producer_consumer.c - 使用条件变量实现生产者-消费者
 * 编译: gcc -Wall -o producer_consumer producer_consumer.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#define QUEUE_SIZE 10
#define ITEMS_TO_PRODUCE 20

static int queue[QUEUE_SIZE];
static int head = 0, tail = 0, count = 0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
static int done = 0;

static void enqueue(int item)
{
    queue[tail] = item;
    tail = (tail + 1) % QUEUE_SIZE;
    count++;
}

static int dequeue(void)
{
    int item = queue[head];
    head = (head + 1) % QUEUE_SIZE;
    count--;
    return item;
}

static void *producer(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS_TO_PRODUCE; i++) {
        pthread_mutex_lock(&mtx);

        /* 队列满时等待 */
        while (count == QUEUE_SIZE)
            pthread_cond_wait(&not_full, &mtx);

        enqueue(i);
        printf("[生产者] 生产 %d (队列长度: %d)\n", i, count);

        pthread_cond_signal(&not_empty);  /* 通知消费者 */
        pthread_mutex_unlock(&mtx);

        usleep(50000);  /* 模拟生产耗时 */
    }

    /* 通知消费者生产完毕 */
    pthread_mutex_lock(&mtx);
    done = 1;
    pthread_cond_broadcast(&not_empty);
    pthread_mutex_unlock(&mtx);

    return NULL;
}

static void *consumer(void *arg)
{
    int id = *(int *)arg;
    while (1) {
        pthread_mutex_lock(&mtx);

        /* 队列空时等待 */
        while (count == 0 && !done)
            pthread_cond_wait(&not_empty, &mtx);

        if (count == 0 && done) {
            pthread_mutex_unlock(&mtx);
            break;
        }

        int item = dequeue();
        printf("  [消费者 %d] 消费 %d (队列长度: %d)\n", id, item, count);

        pthread_cond_signal(&not_full);  /* 通知生产者 */
        pthread_mutex_unlock(&mtx);

        usleep(80000);  /* 模拟消费耗时 */
    }
    return NULL;
}

int main(void)
{
    pthread_t prod;
    pthread_t cons[2];
    int cons_ids[] = {0, 1};

    pthread_create(&prod, NULL, producer, NULL);
    for (int i = 0; i < 2; i++)
        pthread_create(&cons[i], NULL, consumer, &cons_ids[i]);

    pthread_join(prod, NULL);
    for (int i = 0; i < 2; i++)
        pthread_join(cons[i], NULL);

    printf("所有生产/消费完毕\n");
    pthread_mutex_destroy(&mtx);
    pthread_cond_destroy(&not_full);
    pthread_cond_destroy(&not_empty);
    return 0;
}

4.5 读写锁 (Read-Write Lock)

4.5.1 适用场景

读写锁适用于读多写少的场景:多个读者可以同时读取,但写者需要独占访问。

操作读者写者
已有读者✅ 并发读❌ 等待
已有写者❌ 等待❌ 等待

4.5.2 读写锁示例

/*
 * rwlock.c - 读写锁保护共享配置
 * 编译: gcc -Wall -o rwlock rwlock.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

/* 共享配置 */
typedef struct {
    char key[64];
    char value[256];
    int version;
} config_entry_t;

static config_entry_t config = {"server_port", "8080", 1};
static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

/* 读线程:读取配置 */
static void *reader_thread(void *arg)
{
    int id = *(int *)arg;
    for (int i = 0; i < 3; i++) {
        pthread_rwlock_rdlock(&rwlock);
        printf("[读者 %d] key=%s, value=%s, version=%d\n",
               id, config.key, config.value, config.version);
        pthread_rwlock_unlock(&rwlock);
        usleep(100000);
    }
    return NULL;
}

/* 写线程:更新配置 */
static void *writer_thread(void *arg)
{
    (void)arg;
    const char *new_values[] = {"8443", "9090", "443"};
    for (int i = 0; i < 3; i++) {
        usleep(200000);  /* 等待读者先读 */
        pthread_rwlock_wrlock(&rwlock);
        strncpy(config.value, new_values[i], sizeof(config.value) - 1);
        config.version++;
        printf("[写者] 更新: value=%s, version=%d\n",
               config.value, config.version);
        pthread_rwlock_unlock(&rwlock);
    }
    return NULL;
}

int main(void)
{
    pthread_t readers[3], writer;
    int reader_ids[] = {0, 1, 2};

    for (int i = 0; i < 3; i++)
        pthread_create(&readers[i], NULL, reader_thread, &reader_ids[i]);
    pthread_create(&writer, NULL, writer_thread, NULL);

    for (int i = 0; i < 3; i++)
        pthread_join(readers[i], NULL);
    pthread_join(writer, NULL);

    pthread_rwlock_destroy(&rwlock);
    return 0;
}

4.6 屏障 (Barrier)

4.6.1 概念

屏障使一组线程在某一点同步等待,直到所有线程都到达该点才继续执行。

/*
 * barrier.c - 使用屏障同步多阶段计算
 * 编译: gcc -Wall -o barrier barrier.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_THREADS 4
#define NUM_PHASES  3

static pthread_barrier_t barrier;

static void *worker(void *arg)
{
    int id = *(int *)arg;

    for (int phase = 0; phase < NUM_PHASES; phase++) {
        /* 模拟计算 */
        printf("  线程 %d: 阶段 %d 计算中...\n", id, phase);
        usleep((100000 + id * 50000));

        /* 等待所有线程完成此阶段 */
        int ret = pthread_barrier_wait(&barrier);
        if (ret == PTHREAD_BARRIER_SERIAL_THREAD) {
            /* 只有一个线程会收到此返回值 */
            printf("=== 阶段 %d 全部完成 ===\n\n", phase);
        }
    }
    return NULL;
}

int main(void)
{
    pthread_t threads[NUM_THREADS];
    int ids[NUM_THREADS];

    pthread_barrier_init(&barrier, NULL, NUM_THREADS);

    for (int i = 0; i < NUM_THREADS; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }

    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(threads[i], NULL);

    pthread_barrier_destroy(&barrier);
    printf("所有阶段完成\n");
    return 0;
}

4.7 线程特有数据 (Thread-Specific Data)

4.7.1 POSIX Thread-Local Storage

/*
 * thread_local.c - 线程特有数据(TSD)
 * 编译: gcc -Wall -o thread_local thread_local.c -lpthread
 */
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

static pthread_key_t tsd_key;
static pthread_once_t once_control = PTHREAD_ONCE_INIT;

static void tsd_destructor(void *value)
{
    printf("  TSD 析构: %s\n", (char *)value);
    free(value);
}

static void tsd_init(void)
{
    pthread_key_create(&tsd_key, tsd_destructor);
}

static void *worker(void *arg)
{
    int id = *(int *)arg;

    /* 确保 key 已创建 */
    pthread_once(&once_control, tsd_init);

    /* 设置本线程的特有数据 */
    char *msg = malloc(64);
    snprintf(msg, 64, "线程 %d 的私有数据", id);
    pthread_setspecific(tsd_key, msg);

    /* 读取本线程的特有数据 */
    char *data = (char *)pthread_getspecific(tsd_key);
    printf("[线程 %d] TSD = %s\n", id, data);

    return NULL;
}

int main(void)
{
    pthread_t threads[3];
    int ids[] = {0, 1, 2};

    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    printf("所有线程已完成\n");
    return 0;
}

4.8 自旋锁 (Spin Lock)

锁类型等待方式适用场景
互斥锁 (Mutex)阻塞(睡眠)一般场景,临界区较长
自旋锁 (Spin Lock)忙等待(循环)临界区极短,多处理器系统
读写锁 (RWLock)阻塞读多写少
/* 自旋锁使用(适合极短临界区) */
pthread_spinlock_t spinlock;
pthread_spin_init(&spinlock, PTHREAD_PROCESS_PRIVATE);

pthread_spin_lock(&spinlock);
/* 极短的原子操作 */
counter++;
pthread_spin_unlock(&spinlock);

pthread_spin_destroy(&spinlock);

4.9 线程安全函数

POSIX 定义了线程安全函数(Thread-Safe Functions)和非线程安全函数

非线程安全线程安全替代说明
strtok()strtok_r()字符串分割
ctime()ctime_r()时间转字符串
localtime()localtime_r()获取本地时间
gmtime()gmtime_r()获取 UTC 时间
rand()rand_r()随机数生成
readdir()readdir_r()目录遍历
gethostbyname()gethostbyname_r()DNS 查询
strerror()strerror_r()错误码转字符串

规则:在多线程程序中,必须使用 _r(reentrant)后缀的版本。


4.10 注意事项

⚠️ 死锁预防:始终以相同顺序获取多个锁。使用检错锁(PTHREAD_MUTEX_ERRORCHECK)便于调试。

⚠️ 条件变量虚假唤醒:条件变量可能在没有 signal/broadcast 的情况下返回。必须while 循环检查条件。

⚠️ fork() 与线程fork() 只复制调用线程,其他线程不复存在。持有锁的线程在子进程中消失会导致死锁。使用 pthread_atfork() 注册清理函数。

⚠️ 栈溢出:线程默认栈大小通常为 8MB。递归过深或大局部变量会导致栈溢出且没有明确的错误信号(段错误)。使用 pthread_attr_setstacksize() 或减少递归深度。

⚠️ false sharing:不同线程频繁修改同一缓存行上的不同变量,会导致缓存一致性开销。使用对齐(__attribute__((aligned(64))))将热点数据隔离到不同缓存行。


4.11 扩展阅读

  1. man 7 pthreads — POSIX 线程概述
  2. man 3 pthread_create — 线程创建
  3. APUE 第 11-12 章:Threads, Thread Control
  4. TLPI 第 29-33 章:POSIX Threads 系列
  5. 《Programming with POSIX Threads》 — David R. Butenhof 著
  6. glibc NPTL 实现源码https://sourceware.org/git/?p=glibc.git;a=tree;f=nptl

4.12 本章小结

要点说明
pthread_create/join创建和等待线程
互斥锁 (Mutex)保护共享资源,防止竞态条件
条件变量 (Cond)线程间通知机制,配合互斥锁使用
读写锁 (RWLock)读多写少场景的优化
屏障 (Barrier)多线程同步点
线程特有数据 (TSD)线程私有存储
线程安全函数使用 _r 后缀的可重入版本