强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 4 章:nanomsg C API 详解

4.1 nanomsg API 概述

nanomsg 的 API 设计遵循 POSIX 风格,核心函数只有十几个,学习成本极低。

4.1.1 API 函数列表

函数用途头文件
nn_socket()创建 Socket<nanomsg/nn.h>
nn_close()关闭 Socket<nanomsg/nn.h>
nn_bind()绑定地址(服务端)<nanomsg/nn.h>
nn_connect()连接地址(客户端)<nanomsg/nn.h>
nn_send()发送消息<nanomsg/nn.h>
nn_recv()接收消息<nanomsg/nn.h>
nn_sendmsg()发送多段消息(iov)<nanomsg/nn.h>
nn_recvmsg()接收多段消息(iov)<nanomsg/nn.h>
nn_setsockopt()设置选项<nanomsg/nn.h>
nn_getsockopt()获取选项<nanomsg/nn.h>
nn_shutdown()关闭绑定/连接端点<nanomsg/nn.h>
nn_errno()获取错误码<nanomsg/nn.h>
nn_strerror()错误码转字符串<nanomsg/nn.h>
nn_device()创建代理设备<nanomsg/nn.h>
nn_allocmsg()分配消息内存<nanomsg/nn.h>
nn_freemsg()释放消息内存<nanomsg/nn.h>

4.1.2 头文件引入

#include <nanomsg/nn.h>          // 核心 API
#include <nanomsg/pair.h>        // PAIR 协议常量
#include <nanomsg/pubsub.h>      // PUB/SUB 协议常量
#include <nanomsg/reqrep.h>      // REQ/REP 协议常量
#include <nanomsg/pipeline.h>    // PUSH/PULL 协议常量
#include <nanomsg/bus.h>         // BUS 协议常量
#include <nanomsg/survey.h>      // SURVEY 协议常量

4.2 Socket 创建

4.2.1 nn_socket()

int nn_socket(int domain, int protocol);

参数:

参数说明
domainAF_SP标准 SP Socket
domainAF_SP_RAW原始 Socket(底层协议访问)
protocolNN_PAIRPAIR 协议
protocolNN_PUB / NN_SUBPUB/SUB 协议
protocolNN_REQ / NN_REPREQ/REP 协议
protocolNN_PUSH / NN_PULLPUSH/PULL 协议
protocolNN_BUSBUS 协议
protocolNN_SURVEYOR / NN_RESPONDENTSURVEY 协议

返回值: 成功返回 Socket 文件描述符(>= 0),失败返回 -1。

示例:

// 创建 PAIR Socket
int pair_sock = nn_socket(AF_SP, NN_PAIR);

// 创建 PUB Socket
int pub_sock = nn_socket(AF_SP, NN_PUB);

// 创建 REQ Socket
int req_sock = nn_socket(AF_SP, NN_REQ);

// 创建 PUSH Socket
int push_sock = nn_socket(AF_SP, NN_PUSH);

4.2.2 错误处理

int sock = nn_socket(AF_SP, NN_PAIR);
if (sock < 0) {
    fprintf(stderr, "nn_socket failed: %s (errno=%d)\n",
            nn_strerror(nn_errno()), nn_errno());
    exit(1);
}

4.2.3 注意事项

nanomsg 的 Socket 不是文件描述符,不能用于 select()/poll()。如果需要事件驱动,应使用 nn_poll()nn_getfd() 获取可轮询的文件描述符。

一个 Socket 只能绑定一种协议。


4.3 绑定与连接

4.3.1 nn_bind()

服务端使用,将 Socket 绑定到一个地址并开始监听:

int nn_bind(int sock, const char *url);

返回值: 成功返回端点 ID(>= 0),失败返回 -1。

示例:

// TCP 绑定
int ep = nn_bind(sock, "tcp://*:5555");

// TCP 绑定到指定接口
int ep = nn_bind(sock, "tcp://eth0:5555");

// IPC 绑定
int ep = nn_bind(sock, "ipc:///tmp/myapp.ipc");

// inproc 绑定
int ep = nn_bind(sock, "inproc://mychannel");

// 多次绑定(同一 Socket 可绑定多个地址)
nn_bind(sock, "tcp://*:5555");
nn_bind(sock, "tcp://*:5556");

4.3.2 nn_connect()

客户端使用,连接到服务端地址:

int nn_connect(int sock, const char *url);

示例:

// TCP 连接
int ep = nn_connect(sock, "tcp://192.168.1.100:5555");

// IPC 连接
int ep = nn_connect(sock, "ipc:///tmp/myapp.ipc");

// inproc 连接
int ep = nn_connect(sock, "inproc://mychannel");

// 连接多个服务端(REQ 可轮询多个 REP)
nn_connect(sock, "tcp://server1:5555");
nn_connect(sock, "tcp://server2:5555");
nn_connect(sock, "tcp://server3:5555");

4.3.3 端点(Endpoint)管理

nn_bind()nn_connect() 返回的端点 ID 可用于后续管理:

// 创建端点
int ep = nn_connect(sock, "tcp://server:5555");

// 关闭特定端点(不影响其他端点)
nn_shutdown(sock, ep);

4.3.4 地址格式

传输格式示例
TCPtcp://<interface>:<port>tcp://*:5555, tcp://192.168.1.1:5555
IPCipc://<path>ipc:///tmp/app.ipc
inprocinproc://<name>inproc://channel1
WebSocketws://<interface>:<port>ws://*:8080

4.3.5 完整示例

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#include <stdio.h>
#include <string.h>

int main() {
    int sock = nn_socket(AF_SP, NN_REP);
    if (sock < 0) {
        fprintf(stderr, "Socket error: %s\n", nn_strerror(nn_errno()));
        return 1;
    }

    // 绑定多个地址
    int ep1 = nn_bind(sock, "tcp://*:5555");
    int ep2 = nn_bind(sock, "ipc:///tmp/rep.ipc");

    if (ep1 < 0 || ep2 < 0) {
        fprintf(stderr, "Bind error: %s\n", nn_strerror(nn_errno()));
        nn_close(sock);
        return 1;
    }

    printf("Listening on tcp://*:5555 and ipc:///tmp/rep.ipc\n");

    // ... 收发消息 ...

    // 关闭特定端点
    nn_shutdown(sock, ep1);

    // 关闭 Socket
    nn_close(sock);
    return 0;
}

4.4 消息发送

4.4.1 nn_send()

int nn_send(int sock, const void *buf, size_t len, int flags);

参数:

参数说明
sockSocket 文件描述符
buf消息数据指针
len消息长度(字节)
flags0(阻塞)或 NN_DONTWAIT(非阻塞)

返回值: 成功返回发送的字节数,失败返回 -1。

示例:

const char *msg = "Hello, nanomsg!";
int bytes = nn_send(sock, msg, strlen(msg), 0);
if (bytes < 0) {
    fprintf(stderr, "nn_send: %s\n", nn_strerror(nn_errno()));
}

4.4.2 非阻塞发送

int bytes = nn_send(sock, msg, len, NN_DONTWAIT);
if (bytes < 0) {
    if (nn_errno() == EAGAIN) {
        printf("发送缓冲区满,稍后重试\n");
    }
}

4.4.3 nn_sendmsg() —— 多段消息(iov)

发送由多个不连续内存块组成的消息(scatter-gather I/O):

struct nn_iov iov[3];
iov[0].iov_base = "Header: ";
iov[0].iov_len  = 8;
iov[1].iov_base = "Body content";
iov[1].iov_len  = 12;
iov[2].iov_base = " | Footer";
iov[2].iov_len  = 9;

int bytes = nn_sendmsg(sock, iov, 3, 0);

4.4.4 零拷贝发送

使用 nn_allocmsg() 分配的消息内存,nanomsg 可以实现零拷贝发送:

// 分配消息缓冲区
void *msg = nn_allocmsg(13, 0);
memcpy(msg, "Hello, NNG!", 13);

// 发送(零拷贝,发送后 msg 被 nanomsg 接管)
nn_send(sock, &msg, NN_MSG, 0);
// 注意:发送成功后不要再释放 msg

NN_MSG 是特殊标记,告诉 nn_send() 第二个参数是指向消息指针的指针。


4.5 消息接收

4.5.1 nn_recv()

int nn_recv(int sock, void *buf, size_t len, int flags);

参数:

参数说明
sockSocket 文件描述符
buf接收缓冲区
len缓冲区大小
flags0(阻塞)或 NN_DONTWAIT(非阻塞)

返回值: 成功返回接收的字节数,失败返回 -1。

示例:

char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), 0);
if (bytes > 0) {
    printf("Received %d bytes: %.*s\n", bytes, bytes, buf);
}

4.5.2 零拷贝接收

void *buf = NULL;
int bytes = nn_recv(sock, &buf, NN_MSG, 0);
if (bytes > 0) {
    printf("Received %d bytes: %.*s\n", bytes, bytes, (char *)buf);
    nn_freemsg(buf);  // 必须手动释放
}

4.5.3 nn_recvmsg() —— 多段消息接收

struct nn_iov iov[3];
iov[0].iov_base = malloc(100);
iov[0].iov_len  = 100;
iov[1].iov_base = malloc(1000);
iov[1].iov_len  = 1000;
iov[2].iov_base = malloc(100);
iov[2].iov_len  = 100;

int bytes = nn_recvmsg(sock, iov, 3, 0);

4.5.4 非阻塞接收

char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), NN_DONTWAIT);
if (bytes < 0) {
    if (nn_errno() == EAGAIN) {
        printf("没有消息可接收\n");
    }
}

4.6 Socket 选项

4.6.1 设置选项

int nn_setsockopt(int sock, int level, int option,
                  const void *val, size_t len);

4.6.2 常用选项

选项类型说明默认值
NN_RCVTIMEOint (ms)接收超时-1 (无限)
NN_SNDTIMEOint (ms)发送超时-1 (无限)
NN_RCVBUFint接收缓冲区大小 (字节)128KB
NN_SNDBUFint发送缓冲区大小 (字节)128KB
NN_LINGERint (ms)关闭时等待未发送消息的时间1000ms
NN_RECONNECT_IVLint (ms)重连间隔100ms
NN_RECONNECT_IVL_MAXint (ms)最大重连间隔0 (无退避)
NN_SNDPRIOint发送优先级 (0-15)8
NN_RCVPRIOint接收优先级 (0-15)8
NN_IPV4ONLYint只使用 IPv41
NN_SUB_SUBSCRIBEstringSUB 订阅前缀(无)
NN_SUB_UNSUBSCRIBEstringSUB 取消订阅(无)

4.6.3 设置超时示例

// 设置接收超时 5 秒
int timeout = 5000;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));

// 设置发送超时 3 秒
timeout = 3000;
nn_setsockopt(sock, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(timeout));

// 现在 recv/send 会在超时后返回 -1,errno 为 EAGAIN
char buf[256];
int bytes = nn_recv(sock, buf, sizeof(buf), 0);
if (bytes < 0 && nn_errno() == EAGAIN) {
    printf("接收超时\n");
}

4.6.4 SUB 订阅设置

// 订阅所有消息
const char *all = "";
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, all, 0);

// 订阅 "weather" 前缀
const char *weather = "weather";
nn_setsockopt(sock, NN_SUB, NN_SUB_SUBSCRIBE, weather, strlen(weather));

// 取消订阅 "weather"
nn_setsockopt(sock, NN_SUB, NN_SUB_UNSUBSCRIBE, weather, strlen(weather));

4.6.5 获取选项

int val;
size_t len = sizeof(val);
nn_getsockopt(sock, NN_SOL_SOCKET, NN_RCVBUF, &val, &len);
printf("Receive buffer: %d bytes\n", val);

4.7 使用 nn_poll() 多路复用

nn_poll() 类似 poll(),可以同时监听多个 Socket 的可读/可写状态:

#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
#include <nanomsg/pubsub.h>
#include <stdio.h>
#include <string.h>

int main() {
    int pair = nn_socket(AF_SP, NN_PAIR);
    int sub  = nn_socket(AF_SP, NN_SUB);

    nn_bind(pair, "tcp://*:5560");
    nn_connect(sub, "tcp://localhost:5561");

    const char *filter = "";
    nn_setsockopt(sub, NN_SUB, NN_SUB_SUBSCRIBE, filter, 0);

    // 设置 poll
    struct nn_pollfd fds[2];
    fds[0].fd = pair;
    fds[0].events = NN_POLLIN;  // 监听可读
    fds[1].fd = sub;
    fds[1].events = NN_POLLIN;

    // 超时 5000ms
    int rc = nn_poll(fds, 2, 5000);
    if (rc == 0) {
        printf("Timeout, no events.\n");
    } else if (rc > 0) {
        if (fds[0].revents & NN_POLLIN) {
            char *buf = NULL;
            nn_recv(pair, &buf, NN_MSG, 0);
            printf("PAIR: %s\n", (char *)buf);
            nn_freemsg(buf);
        }
        if (fds[1].revents & NN_POLLIN) {
            char *buf = NULL;
            nn_recv(sub, &buf, NN_MSG, 0);
            printf("SUB: %s\n", (char *)buf);
            nn_freemsg(buf);
        }
    } else {
        fprintf(stderr, "nn_poll error: %s\n", nn_strerror(nn_errno()));
    }

    nn_close(pair);
    nn_close(sub);
    return 0;
}

4.8 设备与代理

nn_device() 创建一个代理(proxy),将一个 Socket 的消息转发到另一个 Socket。常用于构建消息中间件:

#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
#include <stdio.h>

int main() {
    int front = nn_socket(AF_SP, NN_XPUB);
    int back  = nn_socket(AF_SP, NN_XSUB);

    // XPUB/XSUB 是 PUB/SUB 的中间代理版本
    nn_bind(front, "tcp://*:5560");  // 订阅者连接这里
    nn_bind(back,  "tcp://*:5561");  // 发布者连接这里

    printf("Proxy running: XSUB:5561 -> XPUB:5560\n");

    // 阻塞,转发消息
    nn_device(front, back);

    // nn_device 永不返回(除非出错)
    nn_close(front);
    nn_close(back);
    return 0;
}

使用代理的架构:

Publisher A ──┐                    ┌── Subscriber A
              ├──► XSUB ──► XPUB ──┤
Publisher B ──┘    (Proxy)         └── Subscriber B

代理使发布者和订阅者完全解耦,无需知道对方地址。


4.9 完整生命周期示例

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

int main() {
    // 1. 创建 Socket
    int sock = nn_socket(AF_SP, NN_REP);
    if (sock < 0) {
        fprintf(stderr, "nn_socket: %s\n", nn_strerror(nn_errno()));
        return 1;
    }

    // 2. 设置选项
    int timeout = 10000;  // 10 秒接收超时
    nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(timeout));

    int bufsize = 1048576;  // 1MB 接收缓冲区
    nn_setsockopt(sock, NN_SOL_SOCKET, NN_RCVBUF, &bufsize, sizeof(bufsize));

    // 3. 绑定地址
    int ep = nn_bind(sock, "tcp://*:5555");
    if (ep < 0) {
        fprintf(stderr, "nn_bind: %s\n", nn_strerror(nn_errno()));
        nn_close(sock);
        return 1;
    }
    printf("Server started on tcp://*:5555\n");

    // 4. 消息循环
    while (1) {
        // 接收消息(零拷贝)
        void *buf = NULL;
        int bytes = nn_recv(sock, &buf, NN_MSG, 0);
        if (bytes < 0) {
            if (nn_errno() == EAGAIN) {
                printf("Timeout, continuing...\n");
                continue;
            }
            fprintf(stderr, "nn_recv: %s\n", nn_strerror(nn_errno()));
            break;
        }

        printf("Received %d bytes: %.*s\n", bytes, bytes, (char *)buf);
        nn_freemsg(buf);

        // 发送响应
        const char *reply = "OK";
        nn_send(sock, reply, strlen(reply), 0);
    }

    // 5. 关闭
    nn_shutdown(sock, ep);
    nn_close(sock);

    printf("Server stopped.\n");
    return 0;
}

4.10 nanomsg vs NNG API 对照

操作nanomsgNNG
创建 Socketnn_socket(AF_SP, NN_PAIR)nng_pair0_open(&sock)
绑定nn_bind(sock, url)nng_listen(sock, url, &listener, 0)
连接nn_connect(sock, url)nng_dial(sock, url, &dialer, 0)
发送nn_send(sock, buf, len, 0)nng_send(sock, buf, len, 0)
接收nn_recv(sock, buf, len, 0)nng_recv(sock, buf, &sz, 0)
设置超时nn_setsockopt(..., NN_RCVTIMEO, ...)nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, ms)
关闭nn_close(sock)nng_close(sock)
错误信息nn_strerror(nn_errno())nng_strerror(rv)

4.11 注意事项

线程安全:nanomsg 的 Socket 不是线程安全的。不要在多个线程中同时操作同一个 Socket。需要并发时,每个线程创建独立的 Socket,通过 inproc 通信。

零拷贝内存管理:使用 NN_MSG 接收的消息必须通过 nn_freemsg() 释放,否则会内存泄漏。

Linger 时间nn_close() 默认等待 1 秒让未发送的消息完成投递。可设置 NN_LINGER 为 0 来立即关闭。


4.12 扩展阅读


上一章第 3 章:可扩展性协议详解 | 下一章第 5 章:NNG 现代 API 详解