nanomsg / NNG 消息库完全教程 / 第 5 章:NNG 现代 API 详解
5.1 NNG 与 nanomsg 的设计差异
NNG 不仅是 nanomsg 的升级,而是一次架构重写。核心改进包括:
| 特性 | nanomsg | NNG |
|---|---|---|
| I/O 模型 | 同步阻塞 | 异步事件驱动 |
| 并发模型 | 一 Socket 一线程 | 共享 Socket + Context |
| 内存管理 | 固定分配器 | 可插拔分配器 |
| TLS | 不支持 | 原生支持 |
| WebSocket | 不支持 | 原生支持 |
| 零拷贝 | 有限 | 完整 AIO 零拷贝 |
| API 风格 | POSIX-like | 现代 C(返回错误码) |
| 错误处理 | nn_errno() | 函数返回值 nng_err |
5.2 NNG 核心 API
5.2.1 API 函数分类
Socket 管理:
| 函数 | 用途 |
|---|---|
nng_pair0_open() | 创建 PAIR Socket |
nng_pub0_open() / nng_sub0_open() | 创建 PUB/SUB Socket |
nng_req0_open() / nng_rep0_open() | 创建 REQ/REP Socket |
nng_push0_open() / nng_pull0_open() | 创建 PUSH/PULL Socket |
nng_bus0_open() | 创建 BUS Socket |
nng_surveyor0_open() / nng_respondent0_open() | 创建 SURVEY Socket |
nng_close() | 关闭 Socket |
连接管理:
| 函数 | 用途 |
|---|---|
nng_listen() | 监听地址 |
nng_dial() | 连接地址 |
nng_listener_create() | 创建 Listener 对象 |
nng_dialer_create() | 创建 Dialer 对象 |
nng_listener_start() | 启动 Listener |
nng_dialer_start() | 启动 Dialer |
nng_listener_close() | 关闭 Listener |
nng_dialer_close() | 关闭 Dialer |
消息收发:
| 函数 | 用途 |
|---|---|
nng_send() | 发送消息 |
nng_recv() | 接收消息 |
nng_sendmsg() | 发送 nng_msg 对象 |
nng_recvmsg() | 接收 nng_msg 对象 |
nng_msg_alloc() | 分配消息 |
nng_msg_free() | 释放消息 |
nng_msg_body() | 获取消息体指针 |
nng_msg_len() | 获取消息长度 |
nng_msg_append() | 追加数据到消息 |
nng_msg_insert() | 在消息头部插入数据 |
nng_msg_trim() | 从消息头部裁剪数据 |
nng_msg_chop() | 从消息尾部裁剪数据 |
选项配置:
| 函数 | 用途 |
|---|---|
nng_setopt() | 设置 Socket 选项 |
nng_getopt() | 获取 Socket 选项 |
nng_setopt_ms() | 设置毫秒选项 |
nng_setopt_int() | 设置整数选项 |
nng_setopt_string() | 设置字符串选项 |
nng_setopt_size() | 设置 size_t 选项 |
AIO(异步 I/O):
| 函数 | 用途 |
|---|---|
nng_aio_alloc() | 分配 AIO 对象 |
nng_aio_free() | 释放 AIO 对象 |
nng_aio_begin() | 开始异步操作 |
nng_aio_finish() | 完成异步操作 |
nng_aio_wait() | 等待异步操作完成 |
nng_aio_result() | 获取异步操作结果 |
nng_aio_set_timeout() | 设置超时 |
nng_aio_set_cb() | 设置完成回调 |
nng_recv_aio() | 异步接收 |
nng_send_aio() | 异步发送 |
5.3 Socket 创建与连接
5.3.1 基本创建
#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
int main() {
nng_socket sock;
int rv;
// 创建 Socket
if ((rv = nng_pair0_open(&sock)) != 0) {
fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
return 1;
}
// 监听
if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
// 使用 Socket ...
nng_close(sock);
return 0;
}
5.3.2 nng_listen() vs nng_dial()
// listen: 监听传入连接(服务端模式)
nng_listener listener;
rv = nng_listen(sock, "tcp://*:5555", &listener, 0);
// dial: 主动连接(客户端模式)
nng_dialer dialer;
rv = nng_dial(sock, "tcp://server:5555", &dialer, 0);
listener 和 dialer 参数可以传 NULL(不需要引用),也可以传指针以获取对象引用用于后续管理。
5.4 Listener 与 Dialer 对象
5.4.1 Listener 精细控制
nng_listener listener;
nng_socket sock;
nng_rep0_open(&sock);
// 创建 Listener(不立即启动)
nng_listener_create(&listener, sock);
// 配置 Listener 选项
nng_listener_setopt(listener, NNG_OPT_TCP_NODELAY, &(int){1}, sizeof(int));
nng_listener_setopt_string(listener, NNG_OPT_LISTEN_URL, "tcp://0.0.0.0:5555");
// 启动
nng_listener_start(listener, 0);
// 后续可关闭 Listener(不影响已建立的连接)
nng_listener_close(listener);
5.4.2 Dialer 精细控制
nng_dialer dialer;
nng_socket sock;
nng_req0_open(&sock);
// 创建 Dialer(不立即连接)
nng_dialer_create(&dialer, sock);
// 配置重连策略
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMINT, 100); // 最小重连间隔
nng_dialer_setopt_ms(dialer, NNG_OPT_RECONNMAXT, 30000); // 最大重连间隔
// 启动连接
nng_dialer_start(dialer, 0);
// Dialer 会自动重连
5.4.3 Listener/Dialer 选项
| 选项 | 类型 | 说明 |
|---|---|---|
NNG_OPT_URL | string | 完整 URL |
NNG_OPT_MAXTTL | int | 最大跳数 |
NNG_OPT_RECVMAXSZ | size_t | 最大接收消息大小 |
NNG_OPT_TCP_NODELAY | bool | Nagle 算法开关 |
NNG_OPT_TCP_KEEPALIVE | bool | TCP Keepalive |
NNG_OPT_RECONNMINT | ms | 最小重连间隔 |
NNG_OPT_RECONNMAXT | ms | 最大重连间隔 |
5.5 消息操作
5.5.1 简单发送接收
// 发送
const char *msg = "Hello";
nng_send(sock, (void *)msg, strlen(msg), 0);
// 接收(需分配缓冲区)
char buf[256];
size_t sz = sizeof(buf);
nng_recv(sock, buf, &sz, 0);
printf("Received %zu bytes: %.*s\n", sz, (int)sz, buf);
5.5.2 零拷贝接收
void *buf = NULL;
size_t sz;
int rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == 0) {
printf("Received: %.*s\n", (int)sz, buf);
nng_free(buf, sz); // 必须释放
}
5.5.3 nng_msg 对象操作
nng_msg 是 NNG 的消息对象,支持头部追加(header)和消息体操作:
nng_msg *msg;
// 分配消息
nng_msg_alloc(&msg, 0);
// 追加消息体
nng_msg_append(msg, "Hello", 5);
// 追加头部(协议头)
nng_msg_header(msg); // 获取头部指针
// 发送消息对象
nng_sendmsg(sock, msg, 0);
// 注意:发送后 msg 被接管,不要再操作
// 接收消息对象
nng_msg *rmsg;
nng_recvmsg(sock, &rmsg, 0);
// 获取消息体
void *body = nng_msg_body(rmsg);
size_t len = nng_msg_len(rmsg);
printf("Body: %.*s\n", (int)len, (char *)body);
// 释放
nng_msg_free(rmsg);
5.5.4 消息管道操作(Pipeline)
nng_msg *msg;
nng_msg_alloc(&msg, 0);
// 在消息头部插入路由信息
nng_msg_insert(msg, "ROUTE", 5);
// 追加消息体
nng_msg_append(msg, "Payload", 7);
// 从头部裁剪(消费掉路由信息)
nng_msg_trim(msg, 5);
// 从尾部裁剪
nng_msg_chop(msg, 3);
5.6 异步 I/O(AIO)
NNG 的 AIO 是其最核心的现代特性,允许非阻塞地执行 I/O 操作。
5.6.1 AIO 基本使用
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
// AIO 完成回调
void recv_callback(void *arg) {
nng_aio *aio = (nng_aio *)arg;
int rv = nng_aio_result(aio);
if (rv != 0) {
fprintf(stderr, "Async recv failed: %s\n", nng_strerror(rv));
return;
}
// 获取接收到的消息
nng_msg *msg = nng_aio_get_msg(aio);
void *body = nng_msg_body(msg);
size_t len = nng_msg_len(msg);
printf("Async received: %.*s\n", (int)len, (char *)body);
nng_msg_free(msg);
}
int main() {
nng_socket sock;
nng_aio *aio;
int rv;
nng_rep0_open(&sock);
nng_listen(sock, "tcp://*:5555", NULL, 0);
// 分配 AIO
nng_aio_alloc(&aio, recv_callback, NULL);
// 设置超时
nng_aio_set_timeout(aio, 5000); // 5 秒
// 发起异步接收
nng_recv_aio(sock, aio);
// 主线程可以做其他事情...
// nng_aio_wait(aio); // 如果需要等待完成
// 清理
nng_aio_free(aio);
nng_close(sock);
return 0;
}
5.6.2 异步发送
void send_callback(void *arg) {
nng_aio *aio = (nng_aio *)arg;
int rv = nng_aio_result(aio);
if (rv == 0) {
printf("Message sent successfully\n");
} else {
fprintf(stderr, "Send failed: %s\n", nng_strerror(rv));
}
}
int main() {
nng_socket sock;
nng_aio *aio;
nng_req0_open(&sock);
nng_dial(sock, "tcp://server:5555", NULL, 0);
nng_aio_alloc(&aio, send_callback, NULL);
nng_aio_set_timeout(aio, 3000);
// 准备消息
nng_msg *msg;
nng_msg_alloc(&msg, 0);
nng_msg_append(msg, "Hello async!", 12);
nng_aio_set_msg(aio, msg);
// 异步发送
nng_send_aio(sock, aio);
nng_aio_wait(aio);
nng_aio_free(aio);
nng_close(sock);
return 0;
}
5.6.3 AIO 状态机模式
实现一个简单的异步消息处理循环:
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
typedef struct {
nng_socket sock;
nng_aio *aio;
} server_ctx;
void server_recv_cb(void *arg);
void server_send_cb(void *arg) {
server_ctx *ctx = (server_ctx *)arg;
int rv = nng_aio_result(ctx->aio);
if (rv != 0) {
fprintf(stderr, "Send error: %s\n", nng_strerror(rv));
return;
}
// 发送完成,继续接收下一个
nng_recv_aio(ctx->sock, ctx->aio);
}
void server_recv_cb(void *arg) {
server_ctx *ctx = (server_ctx *)arg;
int rv = nng_aio_result(ctx->aio);
if (rv != 0) {
fprintf(stderr, "Recv error: %s\n", nng_strerror(rv));
return;
}
// 处理消息
nng_msg *msg = nng_aio_get_msg(ctx->aio);
void *body = nng_msg_body(msg);
size_t len = nng_msg_len(msg);
printf("Received: %.*s\n", (int)len, (char *)body);
// 构造响应(复用消息对象)
nng_msg_clear(msg);
nng_msg_append(msg, "OK", 2);
nng_aio_set_msg(ctx->aio, msg);
// 设置发送回调
nng_aio_set_cb(ctx->aio, server_send_cb, ctx);
nng_send_aio(ctx->sock, ctx->aio);
}
int main() {
server_ctx ctx;
int rv;
if ((rv = nng_rep0_open(&ctx.sock)) != 0) {
fprintf(stderr, "open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(ctx.sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "listen: %s\n", nng_strerror(rv));
return 1;
}
nng_aio_alloc(&ctx.aio, server_recv_cb, &ctx);
nng_aio_set_timeout(ctx.aio, 60000);
// 启动异步接收循环
nng_recv_aio(ctx.sock, ctx.aio);
printf("Async REP server running on tcp://*:5555\n");
// 主线程阻塞(或做其他工作)
nng_aio_wait(ctx.aio);
nng_aio_free(ctx.aio);
nng_close(ctx.sock);
return 0;
}
5.7 Context(上下文)
Context 是 NNG 的重要创新,允许在单个 Socket 上创建多个独立的会话上下文,常用于多线程共享 Socket。
5.7.1 为什么需要 Context
nanomsg: 每个线程一个 Socket(资源开销大)
NNG: 多个线程共享一个 Socket,每个线程有自己的 Context
┌─────────┐
│ Socket │ (单个,线程安全)
├─────────┤
│ Context1 │ (线程 A 的会话)
│ Context2 │ (线程 B 的会话)
│ Context3 │ (线程 C 的会话)
└─────────┘
5.7.2 Context API
| 函数 | 用途 |
|---|---|
nng_ctx_open() | 在 Socket 上创建 Context |
nng_ctx_close() | 关闭 Context |
nng_ctx_send() | 通过 Context 发送 |
nng_ctx_recv() | 通过 Context 接收 |
nng_ctx_sendmsg() | 通过 Context 发送消息对象 |
nng_ctx_recvmsg() | 通过 Context 接收消息对象 |
nng_ctx_get() | 获取 Context 选项 |
nng_ctx_set() | 设置 Context 选项 |
nng_ctx_id() | 获取 Context ID |
5.7.3 Context 示例(多线程 REP 服务)
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
typedef struct {
nng_ctx ctx;
int id;
} worker_args;
void *worker_thread(void *arg) {
worker_args *wa = (worker_args *)arg;
int id = wa->id;
printf("Worker %d started\n", id);
while (1) {
nng_msg *msg;
int rv;
// 通过 Context 接收(独立的接收队列)
rv = nng_ctx_recvmsg(wa->ctx, &msg, 0);
if (rv != 0) {
fprintf(stderr, "Worker %d recv error: %s\n", id, nng_strerror(rv));
break;
}
void *body = nng_msg_body(msg);
size_t len = nng_msg_len(msg);
printf("Worker %d received: %.*s\n", id, (int)len, (char *)body);
// 处理请求并准备响应
nng_msg_clear(msg);
char reply[64];
snprintf(reply, sizeof(reply), "Response from worker %d", id);
nng_msg_append(msg, reply, strlen(reply));
// 通过 Context 发送
rv = nng_ctx_sendmsg(wa->ctx, msg, 0);
if (rv != 0) {
fprintf(stderr, "Worker %d send error: %s\n", id, nng_strerror(rv));
nng_msg_free(msg);
break;
}
}
return NULL;
}
int main() {
nng_socket sock;
int rv;
int num_workers = 4;
pthread_t threads[4];
worker_args args[4];
if ((rv = nng_rep0_open(&sock)) != 0) {
fprintf(stderr, "nng_rep0_open: %s\n", nng_strerror(rv));
return 1;
}
if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
return 1;
}
// 创建 Context 并启动工作线程
for (int i = 0; i < num_workers; i++) {
rv = nng_ctx_open(&args[i].ctx, sock);
if (rv != 0) {
fprintf(stderr, "nng_ctx_open: %s\n", nng_strerror(rv));
return 1;
}
args[i].id = i;
pthread_create(&threads[i], NULL, worker_thread, &args[i]);
}
printf("REP server with %d workers on tcp://*:5555\n", num_workers);
// 等待所有线程
for (int i = 0; i < num_workers; i++) {
pthread_join(threads[i], NULL);
}
nng_close(sock);
return 0;
}
重要:Context 只在 REQ/REP 协议中工作,因为 REQ/REP 是有状态的协议(需要匹配请求和响应)。PAIR、PUB、PUSH 等无状态协议不需要 Context。
5.8 常用 Socket 选项
5.8.1 选项一览
| 选项 | 类型 | 说明 | 默认值 |
|---|---|---|---|
NNG_OPT_RECVTIMEO | ms | 接收超时 | 无限 |
NNG_OPT_SENDTIMEO | ms | 发送超时 | 无限 |
NNG_OPT_RECONNMINT | ms | 最小重连间隔 | 100ms |
NNG_OPT_RECONNMAXT | ms | 最大重连间隔 | 0 |
NNG_OPT_RECVBUF | int | 接收队列大小 | 128 |
NNG_OPT_SENDBUF | int | 发送队列大小 | 128 |
NNG_OPT_RECVMAXSZ | size_t | 最大接收消息大小 | 1MB |
NNG_OPT_TCP_NODELAY | bool | 禁用 Nagle | true |
NNG_OPT_TCP_KEEPALIVE | bool | TCP Keepalive | false |
NNG_OPT_URL | string | 当前 URL | (自动) |
NNG_OPT_PEERNAME | string | 对端名称 | (自动) |
NNG_OPT_SUB_SUBSCRIBE | bytes | SUB 订阅 | (无) |
NNG_OPT_SUB_UNSUBSCRIBE | bytes | SUB 取消订阅 | (无) |
5.8.2 设置示例
// 设置接收超时 5 秒
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 5000);
// 设置发送超时 3 秒
nng_setopt_ms(sock, NNG_OPT_SENDTIMEO, 3000);
// 设置最大接收消息 10MB
size_t maxsz = 10 * 1024 * 1024;
nng_setopt(sock, NNG_OPT_RECVMAXSZ, &maxsz, sizeof(maxsz));
// 设置 TCP NoDelay
int nodelay = 1;
nng_setopt(sock, NNG_OPT_TCP_NODELAY, &nodelay, sizeof(nodelay));
// 设置接收队列大小
int bufsize = 256;
nng_setopt(sock, NNG_OPT_RECVBUF, &bufsize, sizeof(bufsize));
// SUB 订阅
const char *topic = "weather";
nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, topic, strlen(topic));
5.9 错误处理模式
5.9.1 错误码
NNG 的函数返回 int 类型的错误码(nng_err 枚举),成功返回 0。
常见错误码:
| 错误码 | 说明 | 常见原因 |
|---|---|---|
0 | 成功 | — |
NNG_ECLOSED | Socket 已关闭 | 在关闭的 Socket 上操作 |
NNG_ETIMEDOUT | 超时 | recv/send 超时 |
NNG_ECONNREFUSED | 连接被拒 | 目标服务未启动 |
NNG_ECONNRESET | 连接重置 | 对端断开 |
NNG_ENOMEM | 内存不足 | 系统资源不足 |
NNG_ESTATE | 状态错误 | 协议状态不匹配(如连续 send) |
NNG_EINVAL | 无效参数 | 参数错误 |
NNG_ENOTSUP | 不支持 | 操作不被当前协议支持 |
NNG_EADDRINUSE | 地址被占用 | 端口已被绑定 |
NNG_EAGAIN | 资源暂不可用 | 非阻塞模式下无数据 |
5.9.2 错误处理宏
推荐的错误处理模式:
#define CHECK(rv, msg) do { \
if ((rv) != 0) { \
fprintf(stderr, "%s: %s\n", \
msg, nng_strerror(rv)); \
exit(1); \
} \
} while (0)
int main() {
nng_socket sock;
int rv;
rv = nng_req0_open(&sock);
CHECK(rv, "nng_req0_open");
rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0);
CHECK(rv, "nng_dial");
// ... 使用 Socket ...
nng_close(sock);
return 0;
}
5.10 完整 NNG 服务器示例
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <signal.h>
static volatile int running = 1;
void handle_signal(int sig) {
running = 0;
}
int main() {
nng_socket sock;
int rv;
signal(SIGINT, handle_signal);
signal(SIGTERM, handle_signal);
// 创建 REP Socket
if ((rv = nng_rep0_open(&sock)) != 0) {
fprintf(stderr, "Socket: %s\n", nng_strerror(rv));
return 1;
}
// 设置选项
nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 1000);
// 监听
if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
fprintf(stderr, "Listen: %s\n", nng_strerror(rv));
nng_close(sock);
return 1;
}
printf("NNG REP server running. Ctrl+C to stop.\n");
while (running) {
char *buf = NULL;
size_t sz;
rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
if (rv == NNG_ETIMEDOUT) {
continue; // 超时,检查 running 标志
}
if (rv != 0) {
fprintf(stderr, "Recv: %s\n", nng_strerror(rv));
break;
}
printf("Request: %.*s\n", (int)sz, buf);
nng_free(buf, sz);
const char *reply = "OK";
if ((rv = nng_send(sock, (void *)reply, strlen(reply), 0)) != 0) {
fprintf(stderr, "Send: %s\n", nng_strerror(rv));
break;
}
}
printf("\nShutting down...\n");
nng_close(sock);
return 0;
}
5.11 注意事项
Context 限制:Context 仅支持 REQ/REP 和 SURVEYOR/RESPONDENT 协议,不支持 PAIR/PUB/SUB/PUSH/PULL。
AIO 回调线程:AIO 回调在 NNG 内部线程池执行,不要在回调中做耗时操作。
Socket 线程安全:NNG Socket 是线程安全的,但 Context 的收发操作是独立的。
消息所有权:
nng_sendmsg()和nng_aio_set_msg()会接管消息所有权,不要再调用nng_msg_free()。
5.12 扩展阅读
上一章:第 4 章:nanomsg C API 详解 | 下一章:第 6 章:可扩展性与性能