nanomsg / NNG 消息库完全教程 / 第 10 章:最佳实践与总结
10.1 协议选型决策指南
10.1.1 选型决策树
你的通信需求是什么?
│
├── 简单的 1:1 双向通信?
│ └── PAIR(最简单,最低开销)
│
├── 请求-应答(RPC)?
│ ├── 单服务端 ── REQ/REP
│ └── 多服务端负载均衡 ── REQ/REP(自动轮询)
│
├── 广播/发布-订阅?
│ ├── 允许丢消息 ── PUB/SUB
│ ├── 需要可靠投递 ── 推荐替代方案(ZeroMQ XPUB/XSUB + 代理)
│ └── 需要收集响应 ── SURVEYOR/RESPONDENT
│
├── 任务队列(生产者-消费者)?
│ └── PUSH/PULL(背压保护,负载均衡)
│
├── 全连接(集群通信)?
│ └── BUS(N:N 广播)
│
└── 服务发现/健康检查?
└── SURVEYOR/RESPONDENT(带超时的调查模式)
10.1.2 场景对照表
| 业务场景 | 推荐协议 | 传输方式 | 关键配置 |
|---|---|---|---|
| Web API 网关 → 后端服务 | REQ/REP | TCP | SNDTIMEO, RCVTIMEO |
| 实时事件推送 | PUB/SUB | TCP/WS | SUB_SUBSCRIBE |
| 日志收集器 | PUSH/PULL | TCP | RECVBUF |
| 微服务间 RPC | REQ/REP + TLS | TLS+TCP | mTLS |
| 进程内任务分发 | PUSH/PULL | inproc | — |
| IoT 设备数据上报 | PUSH/PULL | TCP | RECONNECT_IVL |
| 浏览器实时数据 | PUB/SUB | WSS | TLS 证书 |
| 集群选主 | SURVEY | TCP | SURVEYTIMEO |
| 服务健康检查 | SURVEY | TCP | 短超时 |
10.1.3 避免的错误选择
| 错误 | 后果 | 正确做法 |
|---|---|---|
| 用 PUB/SUB 做 RPC | 无法返回响应 | 用 REQ/REP |
| 用 REQ/REP 做广播 | 只有一个服务端能收到 | 用 PUB/SUB |
| 用 PAIR 做 1:N 通信 | 只有一个对端能连接 | 用 PUSH/PULL |
| 用 PUSH/PULL 做双向通信 | PULL 只能收,PUSH 只能发 | 用 REQ/REP 或 PAIR |
10.2 错误处理最佳实践
10.2.1 错误处理原则
- 始终检查返回值:NNG 的每个函数都可能失败
- 区分可恢复和不可恢复错误:超时可重试,内存不足需退出
- 优雅降级:连接断开时自动重连,而不是崩溃
- 记录所有错误:便于排查问题
10.2.2 统一错误处理宏
#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>
// 致命错误:打印并退出
#define FATAL(fmt, ...) do { \
fprintf(stderr, "[FATAL] " fmt "\n", \
##__VA_ARGS__); \
exit(1); \
} while (0)
// 非致命错误:打印并继续
#define WARN(fmt, ...) do { \
fprintf(stderr, "[WARN] " fmt "\n", \
##__VA_ARGS__); \
} while (0)
// 检查 NNG 返回值
#define NNG_CHECK(rv, msg) do { \
if ((rv) != 0) { \
FATAL("%s: %s (code=%d)", \
msg, nng_strerror(rv), rv); \
} \
} while (0)
// 使用示例
int main() {
nng_socket sock;
int rv;
rv = nng_req0_open(&sock);
NNG_CHECK(rv, "nng_req0_open");
rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0);
NNG_CHECK(rv, "nng_dial");
nng_close(sock);
return 0;
}
10.2.3 可恢复错误的重试模式
#include <nng/nng.h>
#include <stdio.h>
#include <unistd.h>
#define MAX_RETRIES 5
#define RETRY_DELAY_MS 1000
int send_with_retry(nng_socket sock, const void *data, size_t len) {
int rv;
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
rv = nng_send(sock, (void *)data, len, 0);
if (rv == 0) {
return 0; // 成功
}
WARN("send attempt %d/%d failed: %s",
attempt, MAX_RETRIES, nng_strerror(rv));
if (rv == NNG_ETIMEDOUT || rv == NNG_EAGAIN) {
// 可重试的错误
usleep(RETRY_DELAY_MS * 1000 * attempt); // 指数退避
} else {
// 不可重试的错误
break;
}
}
return rv;
}
int recv_with_retry(nng_socket sock, void **buf, size_t *sz) {
int rv;
for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
rv = nng_recv(sock, buf, sz, NNG_FLAG_ALLOC);
if (rv == 0) {
return 0;
}
if (rv == NNG_ETIMEDOUT) {
WARN("recv timeout, attempt %d/%d", attempt, MAX_RETRIES);
continue;
}
// 其他错误不重试
WARN("recv failed: %s", nng_strerror(rv));
break;
}
return rv;
}
10.2.4 优雅关闭
#include <nng/nng.h>
#include <signal.h>
#include <stdio.h>
#include <stdbool.h>
static volatile bool g_running = true;
void signal_handler(int sig) {
(void)sig;
g_running = false;
}
int main() {
nng_socket sock;
int rv;
// 注册信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 设置 linger 时间(等待未发送消息的时间)
int linger_ms = 5000;
nng_setopt(sock, NNG_OPT_LINGER, &linger_ms, sizeof(linger_ms));
// 设置接收超时(让循环能检查 g_running)
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);
// ... 初始化 socket ...
while (g_running) {
// 收发消息(超时后会返回,检查 g_running)
char *buf = NULL;
size_t sz;
rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == NNG_ETIMEDOUT) {
continue; // 检查 g_running
}
if (rv != 0) {
WARN("recv error: %s", nng_strerror(rv));
break;
}
// 处理消息...
nng_free(buf, sz);
}
// 优雅关闭
printf("Shutting down...\n");
nng_close(sock); // 等待 linger 时间
printf("Done.\n");
return 0;
}
10.3 性能调优策略
10.3.1 调优清单
| 层面 | 调优项 | 推荐值 | 说明 |
|---|---|---|---|
| 传输层 | 优先使用 inproc | — | 同进程通信首选 |
| 优先使用 IPC | — | 同机通信首选 | |
| TCP_NODELAY | 1 | 消除 Nagle 延迟 | |
| TCP_KEEPALIVE | 1 | 检测死连接 | |
| 缓冲区 | RECVBUF | 256-1024 | 增大接收队列 |
| SENDBUF | 256-1024 | 增大发送队列 | |
| RECVMAXSZ | 按需 | 限制最大消息大小 | |
| 超时 | RECVTIMEO | 按需 | 避免永久阻塞 |
| SENDTIMEO | 按需 | 避免发送挂起 | |
| 重连 | RECONNMINT | 100ms | 初始重连间隔 |
| RECONNMAXT | 30s | 最大重连间隔 | |
| OS | ulimit -n | 65536 | 文件描述符限制 |
| TCP 缓冲区 | 16MB | 网络吞吐量 |
10.3.2 消息设计
// ❌ 错误:频繁发送小消息
for (int i = 0; i < 1000; i++) {
nng_send(sock, &data[i], 1, 0); // 1 字节 × 1000 次
}
// ✅ 正确:批量发送
nng_send(sock, data, 1000, 0); // 1000 字节 × 1 次
// ❌ 错误:每次发送都分配/释放内存
while (running) {
char *msg = malloc(1024);
fill_data(msg, 1024);
nng_send(sock, msg, 1024, 0);
free(msg); // 发送后立即释放(但 send 可能还在进行)
}
// ✅ 正确:使用零拷贝或预分配
char msg_buf[1024]; // 预分配
while (running) {
fill_data(msg_buf, 1024);
nng_send(sock, msg_buf, 1024, 0); // NNG 内部会拷贝
}
10.3.3 连接管理
// ✅ 正确:服务端使用 listen,客户端使用 dial
// 服务端
nng_listen(sock, "tcp://0.0.0.0:5555", NULL, 0);
// 客户端(支持自动重连)
nng_dialer dialer;
nng_dialer_create(&dialer, sock);
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMINT, 100);
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMAXT, 30000);
nng_dialer_start(dialer, 0);
10.4 常见陷阱
10.4.1 协议状态错误
// ❌ 错误:REQ 连续发送两次
nng_send(sock, "msg1", 4, 0);
nng_send(sock, "msg2", 4, 0); // NNG_ESTATE! REQ 必须先收后发
// ✅ 正确:REQ 严格交替 send-recv
nng_send(sock, "msg1", 4, 0);
nng_recv(sock, buf, &sz, 0);
nng_send(sock, "msg2", 4, 0);
nng_recv(sock, buf, &sz, 0);
10.4.2 SUB 不接收消息
// ❌ 错误:SUB 没有设置订阅
nng_sub0_open(&sock);
nng_dial(sock, "tcp://localhost:5556", NULL, 0);
nng_recv(sock, buf, &sz, 0); // 永远收不到消息!
// ✅ 正确:设置订阅过滤
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "", 0); // 接收所有
10.4.3 内存泄漏
// ❌ 错误:零拷贝接收后忘记释放
void *buf = NULL;
nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
// ... 使用 buf ...
// 忘记 nng_free(buf, sz); ← 内存泄漏!
// ✅ 正确:始终释放
nng_free(buf, sz);
10.4.4 消息所有权
// ❌ 错误:sendmsg 后再次使用消息
nng_msg *msg;
nng_msg_alloc(&msg, 0);
nng_msg_append(msg, "data", 4);
nng_sendmsg(sock, msg, 0);
nng_msg_free(msg); // 错误!sendmsg 已接管所有权
// ✅ 正确:sendmsg 后不要释放
nng_sendmsg(sock, msg, 0);
// msg 已被 NNG 接管,不要操作
10.4.5 inproc 连接顺序
// ❌ 错误:inproc 先 connect 后 listen
nng_dial(sock, "inproc://test", NULL, 0); // 可能失败
nng_listen(other, "inproc://test", NULL, 0);
// ✅ 正确:先 listen 后 connect
nng_listen(other, "inproc://test", NULL, 0);
nng_dial(sock, "inproc://test", NULL, 0);
10.5 生产环境检查清单
10.5.1 上线前检查
| 检查项 | 状态 | 说明 |
|---|---|---|
| 使用 NNG 而非 nanomsg | □ | 新项目必须用 NNG |
| 启用 TLS/mTLS | □ | 跨网络通信必须加密 |
| 设置所有超时 | □ | RECVTIMEO, SENDTIMEO |
| 设置 RECVMAXSZ | □ | 防止内存耗尽 |
| 错误处理覆盖 | □ | 所有 NNG 调用检查返回值 |
| 优雅关闭 | □ | 信号处理 + linger |
| 日志记录 | □ | 关键操作有日志 |
| 健康检查 | □ | 提供健康检查端点 |
| 自动重连 | □ | Dialer 设置重连间隔 |
| 资源限制 | □ | ulimit, Docker limits |
| 监控指标 | □ | 消息计数、延迟、错误率 |
10.5.2 监控指标
// 建议收集的指标
typedef struct {
uint64_t messages_sent;
uint64_t messages_received;
uint64_t send_errors;
uint64_t recv_errors;
uint64_t reconnect_count;
double avg_latency_us;
double p99_latency_us;
} nng_metrics;
// 使用原子操作更新
#include <stdatomic.h>
static atomic_uint_fast64_t g_sent = 0;
static atomic_uint_fast64_t g_recv = 0;
// 在发送时
atomic_fetch_add(&g_sent, 1);
// 在接收时
atomic_fetch_add(&g_recv, 1);
10.6 替代方案对比
10.6.1 消息库横向对比
| 特性 | nanomsg | NNG | ZeroMQ | MQTT (Mosquitto) | RabbitMQ | Kafka |
|---|---|---|---|---|---|---|
| 类型 | 库 | 库 | 库 | 协议+Broker | Broker | Broker |
| 语言 | C | C | C++ | C | Erlang | Java |
| 许可证 | MIT | MIT | LGPL | EPL | MPL | Apache |
| TLS | ❌ | ✅ | ⚠️ | ✅ | ✅ | ✅ |
| 持久化 | ❌ | ❌ | ❌ | ⚠️ | ✅ | ✅ |
| 内存占用 | 极小 | 小 | 小 | 中 | 大 | 大 |
| 延迟 | 极低 | 极低 | 低 | 中 | 中 | 高 |
| 吞吐 | 高 | 高 | 高 | 中 | 中 | 极高 |
| 学习曲线 | 低 | 低 | 中 | 低 | 中 | 高 |
| 生态 | 弱 | 中 | 强 | 强 | 强 | 强 |
10.6.2 何时选择 NNG
选择 NNG:
- 需要嵌入到应用中的轻量消息库
- 对延迟和内存有严格要求
- 进程内/进程间通信为主
- 不需要消息持久化
- 许可证要求宽松(MIT)
选择 ZeroMQ:
- 需要更丰富的语言绑定和社区支持
- 团队已有 ZeroMQ 经验
- 需要更多协议扩展(如 PARALLEL)
选择 MQTT:
- IoT 设备通信
- 需要标准协议(互操作性)
- 带宽受限的网络
- 需要 Broker 中转
选择 RabbitMQ:
- 需要消息持久化
- 需要复杂路由规则
- 需要消息确认和重试
- 企业级特性(管理界面、集群)
选择 Kafka:
- 大数据量日志/事件流
- 需要消息回放
- 需要严格顺序保证
- 流处理场景
10.6.3 混合架构
实际项目中可以混合使用多种消息技术:
┌──────────┐ NNG REQ/REP ┌──────────┐
│ Web 前端 │───────────────│ API 网关 │
└──────────┘ └─────┬────┘
│
NNG PUSH/PULL│
▼
┌──────────┐
│ 业务服务 │
└─────┬────┘
│
Kafka / NATS │
▼
┌──────────┐
│ 数据管道 │
└──────────┘
10.7 迁移指南
10.7.1 从 nanomsg 迁移到 NNG
| nanomsg | NNG | 备注 |
|---|---|---|
nn_socket(AF_SP, NN_PAIR) | nng_pair0_open(&sock) | 函数名变化 |
nn_bind(sock, url) | nng_listen(sock, url, NULL, 0) | 返回值处理 |
nn_connect(sock, url) | nng_dial(sock, url, NULL, 0) | 返回值处理 |
nn_send(sock, buf, len, 0) | nng_send(sock, buf, len, 0) | 兼容 |
nn_recv(sock, buf, len, 0) | nng_recv(sock, buf, &sz, 0) | 参数变化 |
nn_close(sock) | nng_close(sock) | 兼容 |
nn_errno() | 函数返回值 | 错误处理方式变化 |
nn_setsockopt(...) | nng_setopt(...) | 简化 |
10.7.2 迁移步骤
- 替换头文件:
nanomsg/*.h→nng/nng.h+ 协议头文件 - 替换函数调用
- 改变错误处理模式(从
nn_errno()到返回值检查) - 编译时链接
-lnng替代-lnanomsg - 利用 NNG 新特性(异步 I/O、TLS、Context)
10.8 学习路径建议
10.8.1 初学者
第 1 章 (概述) → 第 2 章 (安装) → 第 3 章 (协议) → 第 4/5 章 (API)
→ 动手写一个简单的 REQ/REP 程序
→ 尝试 PUSH/PULL 任务队列
→ 尝试 PUB/SUB 事件广播
10.8.2 进阶开发者
第 6 章 (性能) → 第 7 章 (TLS) → 第 8 章 (IPC)
→ 为现有项目添加 TLS 支持
→ 进行基准测试和性能调优
→ 实现异步消息处理(AIO)
10.8.3 架构师
第 9 章 (Docker) → 第 10 章 (最佳实践)
→ 设计微服务通信架构
→ 评估 NNG vs 其他方案
→ 制定团队技术规范
10.9 总结
10.9.1 nanomsg / NNG 的核心价值
| 价值 | 说明 |
|---|---|
| 简洁 | 几十个 API 函数,一天学会 |
| 轻量 | 内存占用小,启动快 |
| 高效 | 延迟低,吞吐高 |
| 可嵌入 | 纯 C,无外部依赖 |
| 灵活 | 多种协议和传输可组合 |
| 安全 | NNG 原生 TLS 支持 |
| 现代 | 异步 I/O,Context 并发 |
10.9.2 关键要点
- 新项目用 NNG,不要用 nanomsg
- 协议选型是关键,错误的选择会导致架构问题
- 始终处理错误,NNG 的每个调用都可能失败
- 设置超时,避免程序永久阻塞
- 跨网络通信必须用 TLS
- 优先使用 inproc/IPC,性能远优于 TCP
- 利用自动重连,NNG Dialer 支持自动重连
10.10 扩展阅读
10.10.1 官方资源
10.10.2 相关书籍和文章
- ZeroMQ: Messaging for Many Applications — Pieter Hintjens
- Building Microservices — Sam Newman
10.10.3 社区
上一章:第 9 章:Docker 与微服务 | 返回:目录