强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 10 章:最佳实践与总结

10.1 协议选型决策指南

10.1.1 选型决策树

你的通信需求是什么?
│
├── 简单的 1:1 双向通信?
│   └── PAIR(最简单,最低开销)
│
├── 请求-应答(RPC)?
│   ├── 单服务端 ── REQ/REP
│   └── 多服务端负载均衡 ── REQ/REP(自动轮询)
│
├── 广播/发布-订阅?
│   ├── 允许丢消息 ── PUB/SUB
│   ├── 需要可靠投递 ── 推荐替代方案(ZeroMQ XPUB/XSUB + 代理)
│   └── 需要收集响应 ── SURVEYOR/RESPONDENT
│
├── 任务队列(生产者-消费者)?
│   └── PUSH/PULL(背压保护,负载均衡)
│
├── 全连接(集群通信)?
│   └── BUS(N:N 广播)
│
└── 服务发现/健康检查?
    └── SURVEYOR/RESPONDENT(带超时的调查模式)

10.1.2 场景对照表

业务场景推荐协议传输方式关键配置
Web API 网关 → 后端服务REQ/REPTCPSNDTIMEO, RCVTIMEO
实时事件推送PUB/SUBTCP/WSSUB_SUBSCRIBE
日志收集器PUSH/PULLTCPRECVBUF
微服务间 RPCREQ/REP + TLSTLS+TCPmTLS
进程内任务分发PUSH/PULLinproc
IoT 设备数据上报PUSH/PULLTCPRECONNECT_IVL
浏览器实时数据PUB/SUBWSSTLS 证书
集群选主SURVEYTCPSURVEYTIMEO
服务健康检查SURVEYTCP短超时

10.1.3 避免的错误选择

错误后果正确做法
用 PUB/SUB 做 RPC无法返回响应用 REQ/REP
用 REQ/REP 做广播只有一个服务端能收到用 PUB/SUB
用 PAIR 做 1:N 通信只有一个对端能连接用 PUSH/PULL
用 PUSH/PULL 做双向通信PULL 只能收,PUSH 只能发用 REQ/REP 或 PAIR

10.2 错误处理最佳实践

10.2.1 错误处理原则

  1. 始终检查返回值:NNG 的每个函数都可能失败
  2. 区分可恢复和不可恢复错误:超时可重试,内存不足需退出
  3. 优雅降级:连接断开时自动重连,而不是崩溃
  4. 记录所有错误:便于排查问题

10.2.2 统一错误处理宏

#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>

// 致命错误:打印并退出
#define FATAL(fmt, ...) do {                    \
    fprintf(stderr, "[FATAL] " fmt "\n",        \
            ##__VA_ARGS__);                      \
    exit(1);                                    \
} while (0)

// 非致命错误:打印并继续
#define WARN(fmt, ...) do {                     \
    fprintf(stderr, "[WARN] " fmt "\n",         \
            ##__VA_ARGS__);                      \
} while (0)

// 检查 NNG 返回值
#define NNG_CHECK(rv, msg) do {                 \
    if ((rv) != 0) {                            \
        FATAL("%s: %s (code=%d)",               \
              msg, nng_strerror(rv), rv);        \
    }                                           \
} while (0)

// 使用示例
int main() {
    nng_socket sock;
    int rv;

    rv = nng_req0_open(&sock);
    NNG_CHECK(rv, "nng_req0_open");

    rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0);
    NNG_CHECK(rv, "nng_dial");

    nng_close(sock);
    return 0;
}

10.2.3 可恢复错误的重试模式

#include <nng/nng.h>
#include <stdio.h>
#include <unistd.h>

#define MAX_RETRIES 5
#define RETRY_DELAY_MS 1000

int send_with_retry(nng_socket sock, const void *data, size_t len) {
    int rv;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        rv = nng_send(sock, (void *)data, len, 0);
        if (rv == 0) {
            return 0;  // 成功
        }

        WARN("send attempt %d/%d failed: %s",
             attempt, MAX_RETRIES, nng_strerror(rv));

        if (rv == NNG_ETIMEDOUT || rv == NNG_EAGAIN) {
            // 可重试的错误
            usleep(RETRY_DELAY_MS * 1000 * attempt);  // 指数退避
        } else {
            // 不可重试的错误
            break;
        }
    }
    return rv;
}

int recv_with_retry(nng_socket sock, void **buf, size_t *sz) {
    int rv;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        rv = nng_recv(sock, buf, sz, NNG_FLAG_ALLOC);
        if (rv == 0) {
            return 0;
        }

        if (rv == NNG_ETIMEDOUT) {
            WARN("recv timeout, attempt %d/%d", attempt, MAX_RETRIES);
            continue;
        }

        // 其他错误不重试
        WARN("recv failed: %s", nng_strerror(rv));
        break;
    }
    return rv;
}

10.2.4 优雅关闭

#include <nng/nng.h>
#include <signal.h>
#include <stdio.h>
#include <stdbool.h>

static volatile bool g_running = true;

void signal_handler(int sig) {
    (void)sig;
    g_running = false;
}

int main() {
    nng_socket sock;
    int rv;

    // 注册信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    // 设置 linger 时间(等待未发送消息的时间)
    int linger_ms = 5000;
    nng_setopt(sock, NNG_OPT_LINGER, &linger_ms, sizeof(linger_ms));

    // 设置接收超时(让循环能检查 g_running)
    nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);

    // ... 初始化 socket ...

    while (g_running) {
        // 收发消息(超时后会返回,检查 g_running)
        char *buf = NULL;
        size_t sz;
        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
        if (rv == NNG_ETIMEDOUT) {
            continue;  // 检查 g_running
        }
        if (rv != 0) {
            WARN("recv error: %s", nng_strerror(rv));
            break;
        }

        // 处理消息...
        nng_free(buf, sz);
    }

    // 优雅关闭
    printf("Shutting down...\n");
    nng_close(sock);  // 等待 linger 时间
    printf("Done.\n");
    return 0;
}

10.3 性能调优策略

10.3.1 调优清单

层面调优项推荐值说明
传输层优先使用 inproc同进程通信首选
优先使用 IPC同机通信首选
TCP_NODELAY1消除 Nagle 延迟
TCP_KEEPALIVE1检测死连接
缓冲区RECVBUF256-1024增大接收队列
SENDBUF256-1024增大发送队列
RECVMAXSZ按需限制最大消息大小
超时RECVTIMEO按需避免永久阻塞
SENDTIMEO按需避免发送挂起
重连RECONNMINT100ms初始重连间隔
RECONNMAXT30s最大重连间隔
OSulimit -n65536文件描述符限制
TCP 缓冲区16MB网络吞吐量

10.3.2 消息设计

// ❌ 错误:频繁发送小消息
for (int i = 0; i < 1000; i++) {
    nng_send(sock, &data[i], 1, 0);  // 1 字节 × 1000 次
}

// ✅ 正确:批量发送
nng_send(sock, data, 1000, 0);  // 1000 字节 × 1 次
// ❌ 错误:每次发送都分配/释放内存
while (running) {
    char *msg = malloc(1024);
    fill_data(msg, 1024);
    nng_send(sock, msg, 1024, 0);
    free(msg);  // 发送后立即释放(但 send 可能还在进行)
}

// ✅ 正确:使用零拷贝或预分配
char msg_buf[1024];  // 预分配
while (running) {
    fill_data(msg_buf, 1024);
    nng_send(sock, msg_buf, 1024, 0);  // NNG 内部会拷贝
}

10.3.3 连接管理

// ✅ 正确:服务端使用 listen,客户端使用 dial
// 服务端
nng_listen(sock, "tcp://0.0.0.0:5555", NULL, 0);

// 客户端(支持自动重连)
nng_dialer dialer;
nng_dialer_create(&dialer, sock);
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMINT, 100);
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMAXT, 30000);
nng_dialer_start(dialer, 0);

10.4 常见陷阱

10.4.1 协议状态错误

// ❌ 错误:REQ 连续发送两次
nng_send(sock, "msg1", 4, 0);
nng_send(sock, "msg2", 4, 0);  // NNG_ESTATE! REQ 必须先收后发

// ✅ 正确:REQ 严格交替 send-recv
nng_send(sock, "msg1", 4, 0);
nng_recv(sock, buf, &sz, 0);
nng_send(sock, "msg2", 4, 0);
nng_recv(sock, buf, &sz, 0);

10.4.2 SUB 不接收消息

// ❌ 错误:SUB 没有设置订阅
nng_sub0_open(&sock);
nng_dial(sock, "tcp://localhost:5556", NULL, 0);
nng_recv(sock, buf, &sz, 0);  // 永远收不到消息!

// ✅ 正确:设置订阅过滤
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0);  // 接收所有

10.4.3 内存泄漏

// ❌ 错误:零拷贝接收后忘记释放
void *buf = NULL;
nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
// ... 使用 buf ...
// 忘记 nng_free(buf, sz); ← 内存泄漏!

// ✅ 正确:始终释放
nng_free(buf, sz);

10.4.4 消息所有权

// ❌ 错误:sendmsg 后再次使用消息
nng_msg *msg;
nng_msg_alloc(&msg, 0);
nng_msg_append(msg, "data", 4);
nng_sendmsg(sock, msg, 0);
nng_msg_free(msg);  // 错误!sendmsg 已接管所有权

// ✅ 正确:sendmsg 后不要释放
nng_sendmsg(sock, msg, 0);
// msg 已被 NNG 接管,不要操作

10.4.5 inproc 连接顺序

// ❌ 错误:inproc 先 connect 后 listen
nng_dial(sock, "inproc://test", NULL, 0);  // 可能失败
nng_listen(other, "inproc://test", NULL, 0);

// ✅ 正确:先 listen 后 connect
nng_listen(other, "inproc://test", NULL, 0);
nng_dial(sock, "inproc://test", NULL, 0);

10.5 生产环境检查清单

10.5.1 上线前检查

检查项状态说明
使用 NNG 而非 nanomsg新项目必须用 NNG
启用 TLS/mTLS跨网络通信必须加密
设置所有超时RECVTIMEO, SENDTIMEO
设置 RECVMAXSZ防止内存耗尽
错误处理覆盖所有 NNG 调用检查返回值
优雅关闭信号处理 + linger
日志记录关键操作有日志
健康检查提供健康检查端点
自动重连Dialer 设置重连间隔
资源限制ulimit, Docker limits
监控指标消息计数、延迟、错误率

10.5.2 监控指标

// 建议收集的指标
typedef struct {
    uint64_t messages_sent;
    uint64_t messages_received;
    uint64_t send_errors;
    uint64_t recv_errors;
    uint64_t reconnect_count;
    double   avg_latency_us;
    double   p99_latency_us;
} nng_metrics;

// 使用原子操作更新
#include <stdatomic.h>

static atomic_uint_fast64_t g_sent = 0;
static atomic_uint_fast64_t g_recv = 0;

// 在发送时
atomic_fetch_add(&g_sent, 1);

// 在接收时
atomic_fetch_add(&g_recv, 1);

10.6 替代方案对比

10.6.1 消息库横向对比

特性nanomsgNNGZeroMQMQTT (Mosquitto)RabbitMQKafka
类型协议+BrokerBrokerBroker
语言CCC++CErlangJava
许可证MITMITLGPLEPLMPLApache
TLS⚠️
持久化⚠️
内存占用极小
延迟极低极低
吞吐极高
学习曲线
生态

10.6.2 何时选择 NNG

选择 NNG:

  • 需要嵌入到应用中的轻量消息库
  • 对延迟和内存有严格要求
  • 进程内/进程间通信为主
  • 不需要消息持久化
  • 许可证要求宽松(MIT)

选择 ZeroMQ:

  • 需要更丰富的语言绑定和社区支持
  • 团队已有 ZeroMQ 经验
  • 需要更多协议扩展(如 PARALLEL)

选择 MQTT:

  • IoT 设备通信
  • 需要标准协议(互操作性)
  • 带宽受限的网络
  • 需要 Broker 中转

选择 RabbitMQ:

  • 需要消息持久化
  • 需要复杂路由规则
  • 需要消息确认和重试
  • 企业级特性(管理界面、集群)

选择 Kafka:

  • 大数据量日志/事件流
  • 需要消息回放
  • 需要严格顺序保证
  • 流处理场景

10.6.3 混合架构

实际项目中可以混合使用多种消息技术:

┌──────────┐  NNG REQ/REP   ┌──────────┐
│ Web 前端  │───────────────│ API 网关  │
└──────────┘               └─────┬────┘
                                 │
                    NNG PUSH/PULL│
                                 ▼
                           ┌──────────┐
                           │ 业务服务  │
                           └─────┬────┘
                                 │
                    Kafka / NATS │
                                 ▼
                           ┌──────────┐
                           │ 数据管道  │
                           └──────────┘

10.7 迁移指南

10.7.1 从 nanomsg 迁移到 NNG

nanomsgNNG备注
nn_socket(AF_SP, NN_PAIR)nng_pair0_open(&sock)函数名变化
nn_bind(sock, url)nng_listen(sock, url, NULL, 0)返回值处理
nn_connect(sock, url)nng_dial(sock, url, NULL, 0)返回值处理
nn_send(sock, buf, len, 0)nng_send(sock, buf, len, 0)兼容
nn_recv(sock, buf, len, 0)nng_recv(sock, buf, &sz, 0)参数变化
nn_close(sock)nng_close(sock)兼容
nn_errno()函数返回值错误处理方式变化
nn_setsockopt(...)nng_setopt(...)简化

10.7.2 迁移步骤

  1. 替换头文件:nanomsg/*.hnng/nng.h + 协议头文件
  2. 替换函数调用
  3. 改变错误处理模式(从 nn_errno() 到返回值检查)
  4. 编译时链接 -lnng 替代 -lnanomsg
  5. 利用 NNG 新特性(异步 I/O、TLS、Context)

10.8 学习路径建议

10.8.1 初学者

第 1 章 (概述) → 第 2 章 (安装) → 第 3 章 (协议) → 第 4/5 章 (API)
→ 动手写一个简单的 REQ/REP 程序
→ 尝试 PUSH/PULL 任务队列
→ 尝试 PUB/SUB 事件广播

10.8.2 进阶开发者

第 6 章 (性能) → 第 7 章 (TLS) → 第 8 章 (IPC)
→ 为现有项目添加 TLS 支持
→ 进行基准测试和性能调优
→ 实现异步消息处理(AIO)

10.8.3 架构师

第 9 章 (Docker) → 第 10 章 (最佳实践)
→ 设计微服务通信架构
→ 评估 NNG vs 其他方案
→ 制定团队技术规范

10.9 总结

10.9.1 nanomsg / NNG 的核心价值

价值说明
简洁几十个 API 函数,一天学会
轻量内存占用小,启动快
高效延迟低,吞吐高
可嵌入纯 C,无外部依赖
灵活多种协议和传输可组合
安全NNG 原生 TLS 支持
现代异步 I/O,Context 并发

10.9.2 关键要点

  1. 新项目用 NNG,不要用 nanomsg
  2. 协议选型是关键,错误的选择会导致架构问题
  3. 始终处理错误,NNG 的每个调用都可能失败
  4. 设置超时,避免程序永久阻塞
  5. 跨网络通信必须用 TLS
  6. 优先使用 inproc/IPC,性能远优于 TCP
  7. 利用自动重连,NNG Dialer 支持自动重连

10.10 扩展阅读

10.10.1 官方资源

10.10.2 相关书籍和文章

10.10.3 社区


上一章第 9 章:Docker 与微服务 | 返回目录