强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 6 章:可扩展性与性能

6.1 性能特征概述

nanomsg / NNG 的设计目标是低延迟、低内存、高吞吐。相比 ZeroMQ,它们在轻量级场景下有明显优势。

6.1.1 设计对性能的影响

设计决策 性能影响
纯 C 实现 无 GC 开销,启动快
无外部依赖 无额外内存/线程开销
事件驱动 I/O 高并发下低 CPU 占用
零拷贝消息 减少内存拷贝,适合大消息
紧凑协议头 低带宽占用

6.2 延迟特征

6.2.1 延迟组成

应用层延迟 ──┬── 序列化/反序列化
             ├── 协议层处理
             ├── 操作系统内核 ── TCP 栈 / IPC
             └── 网络传输 (TCP 场景)

6.2.2 典型延迟数据

以下数据基于 Linux 5.x、千兆网卡、x86_64 平台:

场景 延迟 (P50) 延迟 (P99) 说明
inproc (线程间) ~1 μs ~5 μs 共享内存,零拷贝
IPC (Unix Socket) ~10 μs ~50 μs 本地 Socket
TCP (localhost) ~30 μs ~100 μs 回环网络
TCP (局域网) ~100 μs ~500 μs 千兆以太网
TCP (跨数据中心) ~5 ms ~20 ms 取决于物理距离

以上数据仅供参考,实际性能取决于硬件、OS 配置、消息大小等因素。

6.2.3 降低延迟的技巧

技巧 效果 适用场景
使用 inproc 传输 延迟降至 ~1 μs 同进程线程间通信
启用 TCP_NODELAY 消除 Nagle 延迟 低延迟要求的 RPC
使用零拷贝 减少内存拷贝 大消息 (>1KB)
减小消息大小 降低序列化开销 所有场景
预分配缓冲区 避免运行时分配 高频消息
// 启用 TCP_NODELAY
int nodelay = 1;
nng_setopt(sock, NNG_OPT_TCP_NODELAY, &nodelay, sizeof(nodelay));

// 启用 TCP Keepalive
int keepalive = 1;
nng_setopt(sock, NNG_OPT_TCP_KEEPALIVE, &keepalive, sizeof(keepalive));

6.3 吞吐量特征

6.3.1 吞吐量衡量指标

指标 说明 单位
Messages/sec (msg/s) 每秒处理消息数 msg/s
Throughput (MB/s) 每秒数据量 MB/s
Connection scalability 最大并发连接数 connections

6.3.2 典型吞吐量

传输方式 小消息 (64B) 中消息 (1KB) 大消息 (64KB)
inproc ~5M msg/s ~3M msg/s ~500K msg/s
IPC ~500K msg/s ~400K msg/s ~200K msg/s
TCP (localhost) ~300K msg/s ~250K msg/s ~150K msg/s
TCP (局域网) ~200K msg/s ~180K msg/s ~100K msg/s

6.3.3 消息大小与吞吐量的关系

吞吐量 (MB/s)
│
│        ╭───────────────────
│       ╱
│      ╱
│     ╱
│    ╱
│───╯
│
└──────────────────────────── 消息大小 (bytes)
   64   256  1K   4K   16K  64K

小消息: 协议开销占比高,吞吐量受 msg/s 限制
大消息: 数据占比高,吞吐量受带宽限制

6.4 连接数扩展

6.4.1 连接数上限

nanomsg / NNG 的连接数上限主要受操作系统限制:

资源 nanomsg NNG 瓶颈
文件描述符 受系统 ulimit 受系统 ulimit ulimit -n
内存 ~4KB/连接 ~2KB/连接 系统内存
线程 2 个内部线程 线程池 (可配置) CPU 核心数

6.4.2 调整系统限制

# 查看当前限制
ulimit -n

# 临时增加(当前 shell)
ulimit -n 65536

# 永久增加(/etc/security/limits.conf)
# *  soft  nofile  65536
# *  hard  nofile  65536

# sysctl 调优
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535

6.4.3 NNG 线程池配置

// NNG 默认使用 4 个 I/O 线程
// 可在程序启动前设置环境变量
// NNG_NUM_TASKQ_THREADS=8

// 或在代码中设置
// (NNG 内部使用 nng_taskq_set_threads 但非公开 API)

环境变量方式:

export NNG_NUM_TASKQ_THREADS=8
./myapp

6.5 内存使用

6.5.1 内存组成

总内存 = Socket 基础内存
       + 连接数 × 每连接内存
       + 发送缓冲区 × 缓冲区大小
       + 接收缓冲区 × 缓冲区大小
       + 消息队列 × 消息大小

6.5.2 内存占用估算

组件 nanomsg NNG
Socket 基础 ~2 KB ~4 KB
每连接开销 ~2 KB ~2 KB
发送缓冲区 (默认) 128 KB 128 条消息
接收缓冲区 (默认) 128 KB 128 条消息

估算示例: 100 个连接,每个 Socket 128 条消息缓冲

场景 估算内存
nanomsg 2KB + 100×2KB + 256KB ≈ 458 KB
NNG 4KB + 100×2KB + 256×1KB ≈ 460 KB

6.5.3 减少内存使用

// 减小缓冲区队列
int bufsize = 16;  // 默认 128
nng_setopt(sock, NNG_OPT_RECVBUF, &bufsize, sizeof(bufsize));
nng_setopt(sock, NNG_OPT_SENDBUF, &bufsize, sizeof(bufsize));

// 限制最大接收消息大小
size_t maxsz = 4096;  // 默认 1MB
nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));

6.5.4 消息内存管理

// nanomsg 零拷贝
void *msg = nn_allocmsg(1024, 0);
memcpy(msg, data, 1024);
nn_send(sock, &msg, NN_MSG, 0);
// 发送后不要释放 msg

// NNG 零拷贝
nng_msg *msg;
nng_msg_alloc(&msg, 1024);
memcpy(nng_msg_body(msg), data, 1024);
nng_sendmsg(sock, msg, 0);
// 发送后不要释放 msg

6.6 消息大小

6.6.1 默认限制

默认最大消息 可配置上限
nanomsg 1 MB 无硬限制
NNG 1 MB NNG_OPT_RECVMAXSZ

6.6.2 大消息处理策略

消息大小 推荐策略 说明
< 1 KB 直接发送 性能最优
1 KB - 64 KB 直接发送 + 零拷贝 减少拷贝开销
64 KB - 1 MB 调整缓冲区 + 零拷贝 需增大缓冲区
> 1 MB 应用层分片 避免单消息过大

6.6.3 应用层分片

#define CHUNK_SIZE 65536  // 64KB

int send_large_message(nng_socket sock, const void *data, size_t len) {
    size_t offset = 0;
    while (offset < len) {
        size_t chunk = (len - offset > CHUNK_SIZE) ? CHUNK_SIZE : (len - offset);

        // 构造消息: [4字节总长][4字节偏移][数据]
        nng_msg *msg;
        nng_msg_alloc(&msg, 8 + chunk);

        void *body = nng_msg_body(msg);
        uint32_t total = (uint32_t)len;
        uint32_t off = (uint32_t)offset;
        memcpy(body, &total, 4);
        memcpy(body + 4, &off, 4);
        memcpy(body + 8, data + offset, chunk);

        int rv = nng_sendmsg(sock, msg, 0);
        if (rv != 0) return rv;

        offset += chunk;
    }
    return 0;
}

6.7 基准测试

6.7.1 使用 nngcat 测试

NNG 自带的 nngcat 工具可用于快速基准测试:

# 终端 1:启动服务端
nngcat --rep --listen tcp://*:5555 --count 10000

# 终端 2:运行客户端(发送 10000 条消息)
time nngcat --req --dial tcp://localhost:5555 \
    --data "Hello" --count 10000 --interval 0

6.7.2 自定义延迟基准测试

// bench_latency.c —— REQ/REP 往返延迟测试
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

#define MSG_SIZE    64
#define ITERATIONS  10000

static double now_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
}

void *server_thread(void *arg) {
    nng_socket sock;
    nng_rep0_open(&sock);
    nng_listen(sock, "inproc://bench", NULL, 0);

    for (int i = 0; i < ITERATIONS; i++) {
        char buf[MSG_SIZE];
        size_t sz = sizeof(buf);
        nng_recv(sock, buf, &sz, 0);
        nng_send(sock, buf, sz, 0);
    }

    nng_close(sock);
    return NULL;
}

int main() {
    pthread_t srv;
    pthread_create(&srv, NULL, server_thread, NULL);

    // 等待服务端就绪
    usleep(100000);

    nng_socket sock;
    nng_req0_open(&sock);
    nng_dial(sock, "inproc://bench", NULL, 0);

    char data[MSG_SIZE];
    memset(data, 'A', MSG_SIZE);

    double start = now_ms();

    for (int i = 0; i < ITERATIONS; i++) {
        nng_send(sock, data, MSG_SIZE, 0);
        char buf[MSG_SIZE];
        size_t sz = sizeof(buf);
        nng_recv(sock, buf, &sz, 0);
    }

    double elapsed = now_ms() - start;

    printf("Iterations:  %d\n", ITERATIONS);
    printf("Message size: %d bytes\n", MSG_SIZE);
    printf("Total time:  %.2f ms\n", elapsed);
    printf("RTT avg:     %.3f μs\n", elapsed * 1000.0 / ITERATIONS);
    printf("Throughput:  %.0f msg/s\n", ITERATIONS / (elapsed / 1000.0));

    nng_close(sock);
    pthread_join(srv, NULL);
    return 0;
}
cc bench_latency.c -lnng -lpthread -o bench_latency
./bench_latency

6.7.3 自定义吞吐量基准测试

// bench_throughput.c —— PUB/SUB 吞吐量测试
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>

#define MSG_COUNT  100000
#define MSG_SIZE   256

static double now_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
}

void *publisher(void *arg) {
    nng_socket *sock = (nng_socket *)arg;

    // 等待订阅者连接
    usleep(500000);

    char data[MSG_SIZE];
    memset(data, 'B', MSG_SIZE);

    double start = now_ms();

    for (int i = 0; i < MSG_COUNT; i++) {
        nng_send(*sock, data, MSG_SIZE, 0);
    }

    double elapsed = now_ms() - start;
    printf("Published %d messages in %.2f ms\n", MSG_COUNT, elapsed);
    printf("Throughput: %.0f msg/s (%.2f MB/s)\n",
           MSG_COUNT / (elapsed / 1000.0),
           (double)MSG_COUNT * MSG_SIZE / elapsed / 1000.0);

    return NULL;
}

int main() {
    nng_socket pub, sub;
    pthread_t pub_thread;

    nng_pub0_open(&pub);
    nng_sub0_open(&sub);

    nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 10000);
    nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0);

    nng_listen(pub, "inproc://throughput", NULL, 0);
    nng_dial(sub, "inproc://throughput", NULL, 0);

    pthread_create(&pub_thread, NULL, publisher, &pub);

    int received = 0;
    char buf[MSG_SIZE];
    size_t sz;

    while (received < MSG_COUNT) {
        sz = sizeof(buf);
        if (nng_recv(sub, buf, &sz, 0) == 0) {
            received++;
        }
    }

    printf("Received %d messages\n", received);

    nng_close(pub);
    nng_close(sub);
    pthread_join(pub_thread, NULL);
    return 0;
}

6.8 性能调优清单

6.8.1 传输层选择

优先级 传输方式 适用场景
1 inproc 同进程线程间
2 IPC (Unix Socket) 同机进程间
3 TCP 跨机器通信

6.8.2 调优参数

参数 调优方向 说明
NNG_OPT_TCP_NODELAY 设为 1 消除 Nagle 算法延迟
NNG_OPT_RECVBUF 增大 提高接收吞吐
NNG_OPT_SENDBUF 增大 提高发送吞吐
NNG_OPT_RECVMAXSZ 按需调整 防止过大消息
NNG_OPT_RECONNMINT 按需调整 控制重连频率

6.8.3 操作系统调优

# 增大文件描述符限制
ulimit -n 65536

# 增大 TCP 缓冲区
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.ipv4.tcp_rmem="4096 131072 16777216"
sysctl -w net.ipv4.tcp_wmem="4096 131072 16777216"

# 启用 TCP Fast Open
sysctl -w net.ipv4.tcp_fastopen=3

# 调整 TCP 拥塞算法
sysctl -w net.ipv4.tcp_congestion_control=bbr

6.9 与 ZeroMQ 性能对比

6.9.1 典型对比数据

指标 nanomsg NNG ZeroMQ
inproc RTT ~1.5 μs ~1.2 μs ~1.0 μs
TCP RTT (localhost) ~35 μs ~30 μs ~25 μs
TCP 吞吐 (1KB) ~400K msg/s ~450K msg/s ~500K msg/s
内存/连接 ~4 KB ~4 KB ~8 KB
启动时间 < 1 ms < 1 ms ~5 ms

ZeroMQ 在纯性能上略优,但 NNG 在内存占用和启动速度上更优。

6.9.2 何时性能差异重要

  • 嵌入式设备:NNG 内存优势明显
  • 大规模连接(>10K):NNG 内存优势显著
  • 超低延迟(<10 μs):差异不大,取决于传输选择
  • 批量数据传输:性能差异可忽略

6.10 注意事项

基准测试环境:性能数据高度依赖环境。务必在目标硬件上进行基准测试,不要直接引用文档数据。

消息大小分布:实际应用中的消息大小通常不均匀,基准测试应模拟真实分布。

协议开销:不同协议的开销不同。REQ/REP 需要维护请求-响应状态,PUB/SUB 有主题过滤开销。


6.11 扩展阅读


上一章第 5 章:NNG 现代 API 详解 | 下一章第 7 章:TLS 与安全通信