强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

nanomsg / NNG 消息库完全教程 / 第 3 章:可扩展性协议详解

3.1 协议概述

可扩展性协议(Scalability Protocols,SP)是 nanomsg / NNG 的核心抽象。每种协议定义了一种通信拓扑和消息路由语义,应用层无需关心底层传输细节。

3.1.1 协议分层

┌─────────────────────────────┐
│       应用层 (Your Code)     │
├─────────────────────────────┤
│   SP 协议层 (本章重点)        │
│  PAIR / PUB / SUB / REQ ... │
├─────────────────────────────┤
│   传输层                     │
│  TCP / IPC / inproc / WS    │
└─────────────────────────────┘

3.1.2 协议总览

协议通信模式消息路由背压(Backpressure)状态
PAIR1:1直连稳定
PUB1:N扇出(Fan-out)❌ 丢弃慢消费者稳定
SUBN:1过滤订阅N/A稳定
REQ1:1轮询(Round-robin)稳定
REPN:1自动路由回请求者稳定
PUSH1:N负载均衡(Load-balance)稳定
PULLN:1公平队列(Fair-queue)N/A稳定
BUSN:N全网广播稳定
SURVEYOR1:N扇出稳定
RESPONDENTN:1自动路由回调查者稳定

3.2 PAIR(点对点)

3.2.1 概念

PAIR 是最简单的协议,提供两个节点之间的 1:1 双向通信。它没有路由逻辑,消息直接从一端发送到另一端。

┌────────┐    双向     ┌────────┐
│ Node A │◄───────────►│ Node B │
└────────┘             └────────┘

3.2.2 特性

  • 只允许一个对端连接
  • 双向通信,双方均可发送和接收
  • 最低协议开销
  • 适合 inproc(线程间通信)

3.2.3 适用场景

场景说明
线程间通信同一进程内两个线程的专用通道
命令通道控制面与数据面的分离
简单 RPC无需复杂的请求路由

3.2.4 代码示例(NNG)

pair_server.c:

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_pair0_open(&sock)) != 0) {
        fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5555", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("PAIR server listening on tcp://*:5555\n");

    while (1) {
        char *buf = NULL;
        size_t sz;

        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
            printf("Received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);

            const char *reply = "PONG";
            nng_send(sock, (void *)reply, strlen(reply), 0);
        }
    }

    nng_close(sock);
    return 0;
}

pair_client.c:

#include <nng/nng.h>
#include <nng/protocol/pair0/pair.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_pair0_open(&sock)) != 0) {
        fprintf(stderr, "nng_pair0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_dial(sock, "tcp://localhost:5555", NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    for (int i = 0; i < 5; i++) {
        char msg[32];
        snprintf(msg, sizeof(msg), "PING %d", i);
        printf("Sending: %s\n", msg);
        nng_send(sock, msg, strlen(msg), 0);

        char *buf = NULL;
        size_t sz;
        if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
            printf("Received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
        }
        sleep(1);
    }

    nng_close(sock);
    return 0;
}

3.2.5 注意事项

PAIR 只支持一个对端。如果有多个客户端尝试连接,只有一个能成功,其他会失败或排队。需要 1:N 通信时,请使用 REQ/REP 或 PUSH/PULL。


3.3 PUB / SUB(发布-订阅)

3.3.1 概念

PUB/SUB 是经典的发布-订阅模式。发布者(PUB)向所有订阅者(SUB)广播消息,订阅者可以设置过滤前缀来只接收感兴趣的消息。

                 ┌──────────┐
         ┌──────►│ SUB (A)  │  接收所有消息
┌────────┤       └──────────┘
│  PUB   │
│ Server ├──────►┌──────────┐
│        │       │ SUB (B)  │  只接收 "weather" 前缀
└────────┘       └──────────┘
         └──────►┌──────────┐
                 │ SUB (C)  │  只接收 "stock" 前缀
                 └──────────┘

3.3.2 特性

  • PUB:扇出(Fan-out)广播,消息发送给所有连接的 SUB
  • SUB:通过主题过滤(Topic Filter)选择性接收
  • PUB 端无背压:慢速消费者的消息会被丢弃
  • SUB 端需要设置订阅过滤,否则默认不接收任何消息

3.3.3 主题过滤

SUB 通过消息前缀匹配进行过滤:

  • 设置 ""(空字符串)接收所有消息
  • 设置 "weather" 接收以 "weather" 开头的消息
  • 可设置多个前缀

3.3.4 代码示例(NNG)

pub_server.c:

#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_pub0_open(&sock)) != 0) {
        fprintf(stderr, "nng_pub0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5556", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("PUB server on tcp://*:5556\n");

    const char *topics[] = {
        "weather sunny 25C",
        "stock AAPL 150.25",
        "weather rainy 18C",
        "stock GOOG 2800.00",
        "weather cloudy 20C",
    };

    int i = 0;
    while (1) {
        const char *msg = topics[i % 5];
        printf("Publishing: %s\n", msg);
        nng_send(sock, (void *)msg, strlen(msg), 0);
        i++;
        sleep(1);
    }

    nng_close(sock);
    return 0;
}

sub_client.c:

#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>
#include <stdio.h>
#include <string.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_sub0_open(&sock)) != 0) {
        fprintf(stderr, "nng_sub0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    // 设置订阅前缀 —— 只接收 "weather" 开头的消息
    const char *filter = "weather";
    if ((rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, filter, strlen(filter))) != 0) {
        fprintf(stderr, "nng_setopt: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_dial(sock, "tcp://localhost:5556", NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("SUB client subscribed to 'weather'\n");

    while (1) {
        char *buf = NULL;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
            printf("Received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
        }
    }

    nng_close(sock);
    return 0;
}

3.3.5 注意事项

SUB 默认不接收任何消息:必须调用 NNG_OPT_SUB_SUBSCRIBE 设置至少一个过滤前缀(空字符串 "" 表示接收全部)。

PUB 无背压:如果 SUB 消费速度跟不上,PUB 会丢弃消息。不适合需要可靠投递的场景。

消息不持久化:PUB/SUB 是纯内存模式,无磁盘存储。


3.4 REQ / REP(请求-应答)

3.4.1 概念

REQ/REP 实现经典的请求-应答模式。客户端(REQ)发送请求,服务端(REP)处理后返回响应。支持多服务端负载均衡。

┌────────┐  Request   ┌────────┐
│  REQ   ├───────────►│  REP   │
│ Client │◄───────────┤ Server │
└────────┘  Response  └────────┘

多个 REQ 连接多个 REP 时,REQ 端自动轮询分发:

         ┌───► REP Server A
REQ ─────┼───► REP Server B
         └───► REP Server C

3.4.2 特性

  • REQ 必须严格遵守 发送→接收→发送→接收 的交替模式
  • REP 同样严格遵守 接收→发送→接收→发送
  • REQ 支持自动重试和超时
  • REP 会自动将响应路由回发起请求的 REQ

3.4.3 代码示例(NNG)

rep_server.c:

#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_rep0_open(&sock)) != 0) {
        fprintf(stderr, "nng_rep0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5557", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("REP server on tcp://*:5557\n");

    while (1) {
        char *buf = NULL;
        size_t sz;

        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
            fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
            break;
        }

        printf("Request: %.*s\n", (int)sz, buf);

        // 简单回转(echo)
        nng_send(sock, buf, sz, 0);
        nng_free(buf, sz);
    }

    nng_close(sock);
    return 0;
}

req_client.c:

#include <nng/nng.h>
#include <nng/protocol/reqrep0/req.h>
#include <stdio.h>
#include <string.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_req0_open(&sock)) != 0) {
        fprintf(stderr, "nng_req0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_dial(sock, "tcp://localhost:5557", NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    for (int i = 0; i < 3; i++) {
        char msg[64];
        snprintf(msg, sizeof(msg), "Request #%d", i);

        printf("Sending: %s\n", msg);
        if ((rv = nng_send(sock, msg, strlen(msg), 0)) != 0) {
            fprintf(stderr, "nng_send: %s\n", nng_strerror(rv));
            break;
        }

        char *buf = NULL;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) != 0) {
            fprintf(stderr, "nng_recv: %s\n", nng_strerror(rv));
            break;
        }
        printf("Reply: %.*s\n", (int)sz, buf);
        nng_free(buf, sz);
    }

    nng_close(sock);
    return 0;
}

3.4.4 注意事项

严格交替:REQ 端必须先发后收,REP 端必须先收后发。违反顺序会导致 NNG_ESTATE 错误。

REP 自动路由:REP socket 内部维护请求来源信息,send 时自动路由回正确的 REQ,无需应用层处理。

多 REQ 连 REP:REQ 端会对多个 REP 进行轮询(Round-robin),实现简单的负载均衡。


3.5 PUSH / PULL(任务分发)

3.5.1 概念

PUSH/PULL 实现生产者-消费者模型。PUSH 端将任务分发给多个 PULL 端(负载均衡),PULL 端从多个 PUSH 端公平接收任务。

         ┌───► PULL Worker A
PUSH ────┼───► PULL Worker B    (负载均衡)
         └───► PULL Worker C
PUSH Producer A ──┐
                  ├──► PULL  (公平队列)
PUSH Producer B ──┘

3.5.2 特性

  • PUSH:负载均衡(Round-robin)分发消息到 PULL
  • PULL:公平队列(Fair-queue)从多个 PUSH 接收消息
  • PUSH 有背压:所有 PULL 都忙时,PUSH 会阻塞
  • PULL 只能接收,不能发送;PUSH 只能发送,不能接收

3.5.3 典型架构:任务队列

┌─────────────┐
│  Producer   │  PUSH
│  (任务提交)  ├──────────┐
└─────────────┘          │
                    ┌────▼────┐
                    │ Worker 1│  PULL
                    └─────────┘
┌─────────────┐     ┌─────────┐
│  Producer   │ PUSH│ Worker 2│  PULL
│  (任务提交)  ├────►└─────────┘
└─────────────┘     ┌─────────┐
                    │ Worker 3│  PULL
                    └─────────┘

3.5.4 代码示例(NNG)

push_producer.c:

#include <nng/nng.h>
#include <nng/protocol/pipeline0/push.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_push0_open(&sock)) != 0) {
        fprintf(stderr, "nng_push0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5558", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("PUSH producer on tcp://*:5558\n");

    for (int i = 0; i < 10; i++) {
        char task[64];
        snprintf(task, sizeof(task), "Task #%d", i);
        printf("Sending: %s\n", task);
        nng_send(sock, task, strlen(task), 0);
        usleep(500000); // 500ms
    }

    nng_close(sock);
    return 0;
}

pull_worker.c:

#include <nng/nng.h>
#include <nng/protocol/pipeline0/pull.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
    nng_socket sock;
    int rv;
    const char *url = (argc > 1) ? argv[1] : "tcp://localhost:5558";

    if ((rv = nng_pull0_open(&sock)) != 0) {
        fprintf(stderr, "nng_pull0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_dial(sock, url, NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("PULL worker connected to %s\n", url);

    while (1) {
        char *buf = NULL;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
            printf("Processing: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
            usleep(1000000); // 模拟处理耗时 1s
        }
    }

    nng_close(sock);
    return 0;
}

3.5.5 注意事项

单向通信:PUSH 只能发,PULL 只能收。需要双向通信请用 REQ/REP。

背压机制:PUSH 端在所有 PULL 都满时会阻塞发送,防止消息丢失。但这也意味着 PUSH 可能挂起。

负载均衡粒度:消息按 Round-robin 分配,不感知 Worker 的实际负载。


3.6 BUS(总线)

3.6.1 概念

BUS 实现 N:N 全连接通信。每个节点发送的消息会广播给其他所有节点。

┌────────┐      ┌────────┐
│ Node A │◄────►│ Node B │
└───┬────┘      └────┬───┘
    │                │
    └──────┬─────────┘
           ▼
      ┌────────┐
      │ Node C │
      └────────┘

3.6.2 特性

  • 所有节点地位平等
  • 消息广播给所有其他节点(不回显给自己)
  • 适合集群内部通信、状态同步

3.6.3 代码示例(NNG)

bus_node.c:

#include <nng/nng.h>
#include <nng/protocol/bus0/bus.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
    nng_socket sock;
    int rv;

    if (argc < 3) {
        fprintf(stderr, "Usage: %s <bind_url> <dial_url>\n", argv[0]);
        fprintf(stderr, "Example: %s tcp://*:5560 tcp://localhost:5561\n", argv[0]);
        return 1;
    }

    if ((rv = nng_bus0_open(&sock)) != 0) {
        fprintf(stderr, "nng_bus0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    // 绑定一个地址
    if ((rv = nng_listen(sock, argv[1], NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen(%s): %s\n", argv[1], nng_strerror(rv));
        return 1;
    }

    // 连接其他节点
    if ((rv = nng_dial(sock, argv[2], NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial(%s): %s\n", argv[2], nng_strerror(rv));
        return 1;
    }

    printf("BUS node: bind=%s, dial=%s\n", argv[1], argv[2]);

    // 发送消息
    for (int i = 0; i < 5; i++) {
        char msg[64];
        snprintf(msg, sizeof(msg), "Message from %s #%d", argv[1], i);
        nng_send(sock, msg, strlen(msg), 0);
        printf("Sent: %s\n", msg);
        sleep(1);
    }

    // 接收消息
    nng_setopt_ms(sock, NNG_OPT_RECVTIMEO, 3000);
    while (1) {
        char *buf = NULL;
        size_t sz;
        if (nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC) == 0) {
            printf("Received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
        } else {
            break;
        }
    }

    nng_close(sock);
    return 0;
}

多节点运行:

# 节点 A(先启动,绑定两个端口)
./bus_node tcp://*:5560 tcp://localhost:5561

# 节点 B
./bus_node tcp://*:5561 tcp://localhost:5560

# 节点 C
./bus_node tcp://*:5562 tcp://localhost:5560

3.6.4 注意事项

连接拓扑:BUS 需要节点之间相互连接。如果有 N 个节点,需要至少 N-1 条连接才能保证全网可达。

不支持桥接:BUS 协议不会转发消息,只转发自己产生的消息。


3.7 SURVEYOR / RESPONDENT(调查模式)

3.7.1 概念

SURVEYOR/RESPONDENT 实现"调查-响应"模式。调查者(SURVEYOR)向所有响应者(RESPONDENT)广播一个调查请求,然后收集所有响应。

                    ┌──────────────┐
        ┌──────────►│ RESPONDENT A │
        │           └──────────────┘
┌───────┴──────┐    ┌──────────────┐
│  SURVEYOR    │───►│ RESPONDENT B │
│  (调查者)    │    └──────────────┘
└───────┬──────┘    ┌──────────────┘
        └──────────►│ RESPONDENT C │
                    └──────────────┘

3.7.2 特性

  • SURVEYOR 有截止时间(Survey Deadline),超时后不再接收响应
  • RESPONDENT 自动将响应路由回发起调查的 SURVEYOR
  • 适用于服务发现、健康检查、集群选举等场景

3.7.3 适用场景

场景说明
服务发现新节点上线时广播查询,收集所有已知节点
健康检查定期调查所有节点的状态
集群选举发起投票,收集所有节点的意见
配置同步广播配置请求,收集节点的当前配置

3.7.4 代码示例(NNG)

surveyor.c:

#include <nng/nng.h>
#include <nng/protocol/survey0/survey.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main() {
    nng_socket sock;
    int rv;

    if ((rv = nng_surveyor0_open(&sock)) != 0) {
        fprintf(stderr, "nng_surveyor0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_listen(sock, "tcp://*:5559", NULL, 0)) != 0) {
        fprintf(stderr, "nng_listen: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("SURVEYOR on tcp://*:5559\n");
    sleep(2); // 等待 RESPONDENT 连接

    // 发起调查
    const char *survey_msg = "Who are you?";
    printf("Survey: %s\n", survey_msg);
    nng_send(sock, (void *)survey_msg, strlen(survey_msg), 0);

    // 设置接收超时(5 秒收集响应)
    nng_setopt_ms(sock, NNG_OPT_SURVEYOR_SURVEYTIMEO, 5000);

    // 收集响应
    while (1) {
        char *buf = NULL;
        size_t sz;
        rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC);
        if (rv == NNG_ETIMEDOUT) {
            printf("Survey period ended.\n");
            break;
        } else if (rv == 0) {
            printf("Response: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);
        }
    }

    nng_close(sock);
    return 0;
}

respondent.c:

#include <nng/nng.h>
#include <nng/protocol/survey0/respond.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char *argv[]) {
    nng_socket sock;
    int rv;
    const char *name = (argc > 1) ? argv[1] : "DefaultNode";

    if ((rv = nng_respondent0_open(&sock)) != 0) {
        fprintf(stderr, "nng_respondent0_open: %s\n", nng_strerror(rv));
        return 1;
    }

    if ((rv = nng_dial(sock, "tcp://localhost:5559", NULL, 0)) != 0) {
        fprintf(stderr, "nng_dial: %s\n", nng_strerror(rv));
        return 1;
    }

    printf("RESPONDENT '%s' connected\n", name);

    while (1) {
        char *buf = NULL;
        size_t sz;
        if ((rv = nng_recv(sock, &buf, &sz, NNG_FLAG_ALLOC)) == 0) {
            printf("Survey received: %.*s\n", (int)sz, buf);
            nng_free(buf, sz);

            // 响应
            char reply[64];
            snprintf(reply, sizeof(reply), "I am %s", name);
            nng_send(sock, reply, strlen(reply), 0);
        }
    }

    nng_close(sock);
    return 0;
}

3.7.5 注意事项

调查超时:必须设置 NNG_OPT_SURVEYOR_SURVEYTIMEO,否则 SURVEYOR 会永远等待。

RESPONDENT 需先连接:SURVEYOR 发起调查前,RESPONDENT 应已连接,否则会丢失响应。

不能在同一 socket 上连续发起调查:必须等待当前调查结束后才能发起下一次。


3.8 协议选型指南

3.8.1 决策树

需要什么样的通信?
│
├── 1:1 双向?
│   └── PAIR
│
├── 请求-应答?
│   └── REQ / REP
│
├── 广播/发布?
│   ├── 允许丢失? ── PUB / SUB
│   └── 需要响应? ── SURVEYOR / RESPONDENT
│
├── 任务分发?
│   └── PUSH / PULL
│
└── 全连接?
    └── BUS

3.8.2 场景对照表

业务场景推荐协议理由
微服务 RPCREQ/REP请求-应答模式,支持负载均衡
实时数据推送PUB/SUB广播高效,支持主题过滤
日志收集PUSH/PULL负载均衡分发,背压保护
服务健康检查SURVEYOR/RESPONDENT一对多调查,超时机制完善
集群状态同步BUS全连接,所有节点互相同步
命令控制通道PAIR简单可靠,开销最低
任务队列PUSH/PULL生产者-消费者模型
IoT 数据采集PUSH/PULL 或 PUB/SUB设备少用 PUSH/PULL,设备多用 PUB/SUB

3.9 注意事项

协议与传输解耦:每种协议可以运行在任何传输层(TCP、IPC、inproc)上,两者独立选择。

单 socket 单协议:一个 socket 只能绑定一种协议,不能混用。

NNG 协议版本:NNG 使用 pair0pub0 等命名(带版本号),为未来协议升级预留空间。


3.10 扩展阅读


上一章第 2 章:安装与环境搭建 | 下一章第 4 章:nanomsg C API 详解