强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 6 章:可扩展性与性能

6.1 性能特征概述

nanomsg / NNG 的设计目标是低延迟、低内存、高吞吐。相比 ZeroMQ,它们在轻量级场景下有明显优势。

6.1.1 设计对性能的影响

设计决策性能影响
纯 C 实现无 GC 开销,启动快
无外部依赖无额外内存/线程开销
事件驱动 I/O高并发下低 CPU 占用
零拷贝消息减少内存拷贝,适合大消息
紧凑协议头低带宽占用

6.2 延迟特征

6.2.1 延迟组成

应用层延迟 ──┬── 序列化/反序列化
             ├── 协议层处理
             ├── 操作系统内核 ── TCP 栈 / IPC
             └── 网络传输 (TCP 场景)

6.2.2 典型延迟数据

以下数据基于 Linux 5.x、千兆网卡、x86_64 平台:

场景延迟 (P50)延迟 (P99)说明
inproc (线程间)~1 μs~5 μs共享内存,零拷贝
IPC (Unix Socket)~10 μs~50 μs本地 Socket
TCP (localhost)~30 μs~100 μs回环网络
TCP (局域网)~100 μs~500 μs千兆以太网
TCP (跨数据中心)~5 ms~20 ms取决于物理距离

以上数据仅供参考,实际性能取决于硬件、OS 配置、消息大小等因素。

6.2.3 降低延迟的技巧

技巧效果适用场景
使用 inproc 传输延迟降至 ~1 μs同进程线程间通信
启用 TCP_NODELAY消除 Nagle 延迟低延迟要求的 RPC
使用零拷贝减少内存拷贝大消息 (>1KB)
减小消息大小降低序列化开销所有场景
预分配缓冲区避免运行时分配高频消息
// 启用 TCP_NODELAY
int nodelay = 1;
nng_setopt(sock, NNG_OPT_TCP_NODELAY, &nodelay, sizeof(nodelay));

// 启用 TCP Keepalive
int keepalive = 1;
nng_setopt(sock, NNG_OPT_TCP_KEEPALIVE, &keepalive, sizeof(keepalive));

6.3 吞吐量特征

6.3.1 吞吐量衡量指标

指标说明单位
Messages/sec (msg/s)每秒处理消息数msg/s
Throughput (MB/s)每秒数据量MB/s
Connection scalability最大并发连接数connections

6.3.2 典型吞吐量

传输方式小消息 (64B)中消息 (1KB)大消息 (64KB)
inproc~5M msg/s~3M msg/s~500K msg/s
IPC~500K msg/s~400K msg/s~200K msg/s
TCP (localhost)~300K msg/s~250K msg/s~150K msg/s
TCP (局域网)~200K msg/s~180K msg/s~100K msg/s

6.3.3 消息大小与吞吐量的关系

吞吐量 (MB/s)
│
│        ╭───────────────────
│       ╱
│      ╱
│     ╱
│    ╱
│───╯
│
└──────────────────────────── 消息大小 (bytes)
   64   256  1K   4K   16K  64K

小消息: 协议开销占比高,吞吐量受 msg/s 限制
大消息: 数据占比高,吞吐量受带宽限制

6.4 连接数扩展

6.4.1 连接数上限

nanomsg / NNG 的连接数上限主要受操作系统限制:

资源nanomsgNNG瓶颈
文件描述符受系统 ulimit受系统 ulimitulimit -n
内存~4KB/连接~2KB/连接系统内存
线程2 个内部线程线程池 (可配置)CPU 核心数

6.4.2 调整系统限制

# 查看当前限制
ulimit -n

# 临时增加(当前 shell)
ulimit -n 65536

# 永久增加(/etc/security/limits.conf)
# *  soft  nofile  65536
# *  hard  nofile  65536

# sysctl 调优
sysctl -w net.core.somaxconn=65535
sysctl -w net.ipv4.tcp_max_syn_backlog=65535

6.4.3 NNG 线程池配置

// NNG 默认使用 4 个 I/O 线程
// 可在程序启动前设置环境变量
// NNG_NUM_TASKQ_THREADS=8

// 或在代码中设置
// (NNG 内部使用 nng_taskq_set_threads 但非公开 API)

环境变量方式:

export NNG_NUM_TASKQ_THREADS=8
./myapp

6.5 内存使用

6.5.1 内存组成

总内存 = Socket 基础内存
       + 连接数 × 每连接内存
       + 发送缓冲区 × 缓冲区大小
       + 接收缓冲区 × 缓冲区大小
       + 消息队列 × 消息大小

6.5.2 内存占用估算

组件nanomsgNNG
Socket 基础~2 KB~4 KB
每连接开销~2 KB~2 KB
发送缓冲区 (默认)128 KB128 条消息
接收缓冲区 (默认)128 KB128 条消息

估算示例: 100 个连接,每个 Socket 128 条消息缓冲

场景估算内存
nanomsg2KB + 100×2KB + 256KB ≈ 458 KB
NNG4KB + 100×2KB + 256×1KB ≈ 460 KB

6.5.3 减少内存使用

// 减小缓冲区队列
int bufsize = 16;  // 默认 128
nng_setopt(sock, NNG_OPT_RECVBUF, &bufsize, sizeof(bufsize));
nng_setopt(sock, NNG_OPT_SENDBUF, &bufsize, sizeof(bufsize));

// 限制最大接收消息大小
size_t maxsz = 4096;  // 默认 1MB
nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));

6.5.4 消息内存管理

// nanomsg 零拷贝
void *msg = nn_allocmsg(1024, 0);
memcpy(msg, data, 1024);
nn_send(sock, &msg, NN_MSG, 0);
// 发送后不要释放 msg

// NNG 零拷贝
nng_msg *msg;
nng_msg_alloc(&msg, 1024);
memcpy(nng_msg_body(msg), data, 1024);
nng_sendmsg(sock, msg, 0);
// 发送后不要释放 msg

6.6 消息大小

6.6.1 默认限制

默认最大消息可配置上限
nanomsg1 MB无硬限制
NNG1 MBNNG_OPT_RECVMAXSZ

6.6.2 大消息处理策略

消息大小推荐策略说明
< 1 KB直接发送性能最优
1 KB - 64 KB直接发送 + 零拷贝减少拷贝开销
64 KB - 1 MB调整缓冲区 + 零拷贝需增大缓冲区
> 1 MB应用层分片避免单消息过大

6.6.3 应用层分片

#define CHUNK_SIZE 65536  // 64KB

int send_large_message(nng_socket sock, const void *data, size_t len) {
    size_t offset = 0;
    while (offset < len) {
        size_t chunk = (len - offset > CHUNK_SIZE) ? CHUNK_SIZE : (len - offset);

        // 构造消息: [4字节总长][4字节偏移][数据]
        nng_msg *msg;
        nng_msg_alloc(&msg, 8 + chunk);

        void *body = nng_msg_body(msg);
        uint32_t total = (uint32_t)len;
        uint32_t off = (uint32_t)offset;
        memcpy(body, &total, 4);
        memcpy(body + 4, &off, 4);
        memcpy(body + 8, data + offset, chunk);

        int rv = nng_sendmsg(sock, msg, 0);
        if (rv != 0) return rv;

        offset += chunk;
    }
    return 0;
}

6.7 基准测试

6.7.1 使用 nngcat 测试

NNG 自带的 nngcat 工具可用于快速基准测试:

# 终端 1:启动服务端
nngcat --rep --listen tcp://*:5555 --count 10000

# 终端 2:运行客户端(发送 10000 条消息)
time nngcat --req --dial tcp://localhost:5555 \
    --data "Hello" --count 10000 --interval 0

6.7.2 自定义延迟基准测试

// bench_latency.c —— REQ/REP 往返延迟测试
#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
#include <pthread.h>

#define MSG_SIZE    64
#define ITERATIONS  10000

static double now_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
}

void *server_thread(void *arg) {
    nng_socket sock;
    nng_rep0_open(&sock);
    nng_listen(sock, "inproc://bench", NULL, 0);

    for (int i = 0; i < ITERATIONS; i++) {
        char buf[MSG_SIZE];
        size_t sz = sizeof(buf);
        nng_recv(sock, buf, &sz, 0);
        nng_send(sock, buf, sz, 0);
    }

    nng_close(sock);
    return NULL;
}

int main() {
    pthread_t srv;
    pthread_create(&srv, NULL, server_thread, NULL);

    // 等待服务端就绪
    usleep(100000);

    nng_socket sock;
    nng_req0_open(&sock);
    nng_dial(sock, "inproc://bench", NULL, 0);

    char data[MSG_SIZE];
    memset(data, 'A', MSG_SIZE);

    double start = now_ms();

    for (int i = 0; i < ITERATIONS; i++) {
        nng_send(sock, data, MSG_SIZE, 0);
        char buf[MSG_SIZE];
        size_t sz = sizeof(buf);
        nng_recv(sock, buf, &sz, 0);
    }

    double elapsed = now_ms() - start;

    printf("Iterations:  %d\n", ITERATIONS);
    printf("Message size: %d bytes\n", MSG_SIZE);
    printf("Total time:  %.2f ms\n", elapsed);
    printf("RTT avg:     %.3f μs\n", elapsed * 1000.0 / ITERATIONS);
    printf("Throughput:  %.0f msg/s\n", ITERATIONS / (elapsed / 1000.0));

    nng_close(sock);
    pthread_join(srv, NULL);
    return 0;
}
cc bench_latency.c -lnng -lpthread -o bench_latency
./bench_latency

6.7.3 自定义吞吐量基准测试

// bench_throughput.c —— PUB/SUB 吞吐量测试
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>

#define MSG_COUNT  100000
#define MSG_SIZE   256

static double now_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000.0 + tv.tv_usec / 1000.0;
}

void *publisher(void *arg) {
    nng_socket *sock = (nng_socket *)arg;

    // 等待订阅者连接
    usleep(500000);

    char data[MSG_SIZE];
    memset(data, 'B', MSG_SIZE);

    double start = now_ms();

    for (int i = 0; i < MSG_COUNT; i++) {
        nng_send(*sock, data, MSG_SIZE, 0);
    }

    double elapsed = now_ms() - start;
    printf("Published %d messages in %.2f ms\n", MSG_COUNT, elapsed);
    printf("Throughput: %.0f msg/s (%.2f MB/s)\n",
           MSG_COUNT / (elapsed / 1000.0),
           (double)MSG_COUNT * MSG_SIZE / elapsed / 1000.0);

    return NULL;
}

int main() {
    nng_socket pub, sub;
    pthread_t pub_thread;

    nng_pub0_open(&pub);
    nng_sub0_open(&sub);

    nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 10000);
    nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0);

    nng_listen(pub, "inproc://throughput", NULL, 0);
    nng_dial(sub, "inproc://throughput", NULL, 0);

    pthread_create(&pub_thread, NULL, publisher, &pub);

    int received = 0;
    char buf[MSG_SIZE];
    size_t sz;

    while (received < MSG_COUNT) {
        sz = sizeof(buf);
        if (nng_recv(sub, buf, &sz, 0) == 0) {
            received++;
        }
    }

    printf("Received %d messages\n", received);

    nng_close(pub);
    nng_close(sub);
    pthread_join(pub_thread, NULL);
    return 0;
}

6.8 性能调优清单

6.8.1 传输层选择

优先级传输方式适用场景
1inproc同进程线程间
2IPC (Unix Socket)同机进程间
3TCP跨机器通信

6.8.2 调优参数

参数调优方向说明
NNG_OPT_TCP_NODELAY设为 1消除 Nagle 算法延迟
NNG_OPT_RECVBUF增大提高接收吞吐
NNG_OPT_SENDBUF增大提高发送吞吐
NNG_OPT_RECVMAXSZ按需调整防止过大消息
NNG_OPT_RECONNMINT按需调整控制重连频率

6.8.3 操作系统调优

# 增大文件描述符限制
ulimit -n 65536

# 增大 TCP 缓冲区
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
sysctl -w net.ipv4.tcp_rmem="4096 131072 16777216"
sysctl -w net.ipv4.tcp_wmem="4096 131072 16777216"

# 启用 TCP Fast Open
sysctl -w net.ipv4.tcp_fastopen=3

# 调整 TCP 拥塞算法
sysctl -w net.ipv4.tcp_congestion_control=bbr

6.9 与 ZeroMQ 性能对比

6.9.1 典型对比数据

指标nanomsgNNGZeroMQ
inproc RTT~1.5 μs~1.2 μs~1.0 μs
TCP RTT (localhost)~35 μs~30 μs~25 μs
TCP 吞吐 (1KB)~400K msg/s~450K msg/s~500K msg/s
内存/连接~4 KB~4 KB~8 KB
启动时间< 1 ms< 1 ms~5 ms

ZeroMQ 在纯性能上略优,但 NNG 在内存占用和启动速度上更优。

6.9.2 何时性能差异重要

  • 嵌入式设备:NNG 内存优势明显
  • 大规模连接(>10K):NNG 内存优势显著
  • 超低延迟(<10 μs):差异不大,取决于传输选择
  • 批量数据传输:性能差异可忽略

6.10 注意事项

基准测试环境:性能数据高度依赖环境。务必在目标硬件上进行基准测试,不要直接引用文档数据。

消息大小分布:实际应用中的消息大小通常不均匀,基准测试应模拟真实分布。

协议开销:不同协议的开销不同。REQ/REP 需要维护请求-响应状态,PUB/SUB 有主题过滤开销。


6.11 扩展阅读


上一章第 5 章:NNG 现代 API 详解 | 下一章第 7 章:TLS 与安全通信