强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 9 章:集群与高可用

第 9 章:集群与高可用

单节点 RabbitMQ 无法满足生产环境的高可用要求。本章将讲解如何构建高可用的 RabbitMQ 集群。


9.1 集群架构

集群节点类型

节点类型说明磁盘节点内存节点
disk存储元数据到磁盘
ram元数据仅存内存

💡 提示: RabbitMQ 3.x+ 中所有节点默认都是 disk 节点。ram 节点在旧版本中用于提升性能,现已被仲裁队列取代。

集群中的数据分布

RabbitMQ Cluster (3 nodes)
┌──────────────────────────────────────────────────┐
│                                                  │
│  Node 1 (rabbit@node1)                          │
│  ├── Queue A (Leader)                           │
│  ├── Queue B (Follower)                         │
│  └── Queue C (Follower)                         │
│                                                  │
│  Node 2 (rabbit@node2)                          │
│  ├── Queue A (Follower)                         │
│  ├── Queue B (Leader)                           │
│  └── Queue C (Follower)                         │
│                                                  │
│  Node 3 (rabbit@node3)                          │
│  ├── Queue A (Follower)                         │
│  ├── Queue B (Follower)                         │
│  └── Queue C (Leader)                           │
│                                                  │
└──────────────────────────────────────────────────┘

集群通信

通信方式端口说明
AMQP5672客户端消息通信
Distribution25672Erlang 节点间通信
Management15672管理界面 HTTP API
Clustering4369Erlang Port Mapper Daemon (epmd)

9.2 集群搭建

手动搭建集群

# 在所有节点上设置相同的 Erlang Cookie
sudo rabbitmqctl stop_app
sudo rabbitmqctl reset

# 在 node2 上执行
sudo rabbitmqctl join_cluster rabbit@node1

# 在 node3 上执行
sudo rabbitmqctl join_cluster rabbit@node1

# 启动所有节点
sudo rabbitmqctl start_app

# 验证集群状态
sudo rabbitmqctl cluster_status

Docker Compose 集群

version: '3.8'
services:
  rabbit1:
    image: rabbitmq:4-management
    container_name: rabbit1
    hostname: rabbit1
    environment:
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbit1_data:/var/lib/rabbitmq
    networks:
      - rabbit_net

  rabbit2:
    image: rabbitmq:4-management
    container_name: rabbit2
    hostname: rabbit2
    environment:
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    ports:
      - "5673:5672"
      - "15673:15672"
    volumes:
      - rabbit2_data:/var/lib/rabbitmq
    networks:
      - rabbit_net
    depends_on:
      - rabbit1

  rabbit3:
    image: rabbitmq:4-management
    container_name: rabbit3
    hostname: rabbit3
    environment:
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    ports:
      - "5674:5672"
      - "15674:15672"
    volumes:
      - rabbit3_data:/var/lib/rabbitmq
    networks:
      - rabbit_net
    depends_on:
      - rabbit1

volumes:
  rabbit1_data:
  rabbit2_data:
  rabbit3_data:

networks:
  rabbit_net:
    driver: bridge
# 组建集群
docker exec rabbit2 rabbitmqctl stop_app
docker exec rabbit2 rabbitmqctl reset
docker exec rabbit2 rabbitmqctl join_cluster rabbit@rabbit1
docker exec rabbit2 rabbitmqctl start_app

docker exec rabbit3 rabbitmqctl stop_app
docker exec rabbit3 rabbitmqctl reset
docker exec rabbit3 rabbitmqctl join_cluster rabbit@rabbit1
docker exec rabbit3 rabbitmqctl start_app

9.3 镜像队列(Mirror Queue)

⚠️ 注意: 经典镜像队列在 RabbitMQ 4.0 中已移除。新项目请使用仲裁队列。本节仅作了解。

镜像队列原理

Producer ──> Node 1 (Master) ──同步复制──> Node 2 (Mirror)
                    │                          │
                    └────同步复制────> Node 3 (Mirror)

设置镜像策略

# 所有队列在所有节点上镜像
rabbitmqctl set_policy ha-all '.*' '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues

# 指定数量的镜像
rabbitmqctl set_policy ha-two '.*' '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' --apply-to queues

# 指定节点的镜像
rabbitmqctl set_policy ha-nodes '.*' '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}' --apply-to queues

镜像模式说明

ha-mode说明ha-params
all所有节点都镜像
exactly指定镜像数量节点数
nodes指定镜像节点节点列表

9.4 仲裁队列(Quorum Queue)

架构原理

Producer ──> Leader (Node 1)
               │
               ├── Raft 复制 ──> Follower (Node 2)
               └── Raft 复制 ──> Follower (Node 3)
               
写入确认: Leader + 多数 Follower 确认 → 返回成功

声明仲裁队列

channel.queue_declare(
    queue='critical_orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,  # 副本数
        'x-delivery-limit': 5,             # 最大投递次数
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed'
    }
)

仲裁队列管理

# 查看仲裁队列状态
rabbitmq-queues quorum_status "critical_orders"

# 查看集群中的仲裁队列成员
rabbitmq-queues members "critical_orders"

# 扩展:添加成员
rabbitmq-queues add_member critical_orders rabbit@node4

# 收缩:移除成员
rabbitmq-queues remove_member critical_orders rabbit@node3

# 查看所有仲裁队列
rabbitmq-queues quorum_status

Leader 选举

Leader 故障 → Raft 选举 → 多数派投票 → 新 Leader 上任
                │
                ├── 多数派可达 → 正常选举
                └── 多数派不可达 → 队列不可用(等待恢复)

9.5 节点管理

常用管理命令

# 查看集群状态
rabbitmqctl cluster_status

# 查看节点信息
rabbitmqctl status

# 停止节点
rabbitmqctl stop_app

# 重置节点(清除所有数据)
rabbitmqctl reset
# force_reset 强制重置(谨慎使用)
rabbitmqctl force_reset

# 移除集群节点
# 在要移除的节点上执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# 或者在其他节点上强制移除
rabbitmqctl forget_cluster_node rabbit@node3

# 修改节点类型
rabbitmqctl change_cluster_node_type ram   # 修改为内存节点
rabbitmqctl change_cluster_node_type disc  # 修改为磁盘节点

节点健康检查

# 基本健康检查
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics check_if_node_is_quorum_critical

# 综合健康检查
rabbitmq-diagnostics health_check

# 列出所有健康检查
rabbitmq-diagnostics list_checks

9.6 网络分区(Network Partition)

什么是网络分区

网络分区前:          网络分区后:
Node1 ── Node2      Node1 ╳ Node2
  │        │           │        │
  └───┬────┘           │        │
      │                │        │
    Node3            Node3    Node3
                   (partition1) (partition2)

分区检测策略

策略说明推荐
ignore不自动处理,手动恢复生产推荐
pause_minority少数派节点暂停自动恢复
pause_if_all_down指定节点不可达时暂停高级场景
autoheal自动选择赢家重启不推荐

配置分区策略

# /etc/rabbitmq/rabbitmq.conf
cluster_partition_handling = ignore
# 或
cluster_partition_handling = pause_minority

网络分区恢复

# 1. 检测分区
rabbitmqctl cluster_status | grep partitions

# 2. 手动恢复(选择数据最新的节点作为主节点)
# 在其他节点上执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@primary_node
rabbitmqctl start_app

# 3. 或使用自动恢复(如果配置了 autoheal)
rabbitmqctl eval 'rabbit_node_monitor:force_autoheal().'

9.7 脑裂(Split-Brain)防护

脑裂场景

Node1 (认为自己是 Leader)    Node2 (认为自己是 Leader)
         │                          │
    写入数据 A                   写入数据 B
         │                          │
    ──── 网络恢复 ────
    数据不一致!

防护策略

策略说明
仲裁队列Raft 协议保证一致性,不会脑裂
pause_minority自动暂停少数派
监控告警检测分区后立即告警
最少节点数集群至少 3 个节点

9.8 负载均衡

使用 HAProxy

# /etc/haproxy/haproxy.cfg
listen rabbitmq_amqp
    bind *:5672
    mode tcp
    balance roundrobin
    option tcpka
    server rabbit1 10.0.0.1:5672 check inter 5s rise 2 fall 3
    server rabbit2 10.0.0.2:5672 check inter 5s rise 2 fall 3
    server rabbit3 10.0.0.3:5672 check inter 5s rise 2 fall 3

listen rabbitmq_management
    bind *:15672
    mode http
    balance roundrobin
    server rabbit1 10.0.0.1:15672 check
    server rabbit2 10.0.0.2:15672 check
    server rabbit3 10.0.0.3:15672 check

客户端连接多个节点

import pika

# 方式 1: 连接负载均衡器
conn = pika.BlockingConnection(pika.ConnectionParameters('haproxy-host'))

# 方式 2: 多节点故障转移
from pika import SSLOptions
params = [
    pika.ConnectionParameters('node1'),
    pika.ConnectionParameters('node2'),
    pika.ConnectionParameters('node3'),
]

# 简单的故障转移
for param in params:
    try:
        conn = pika.BlockingConnection(param)
        break
    except Exception:
        continue

9.9 注意事项

⚠️ 集群不提供跨节点队列复制

RabbitMQ 集群中队列默认只存在于声明它的节点上。需要通过镜像策略或仲裁队列实现复制。

⚠️ 仲裁队列至少需要 3 个节点

2 节点的仲裁队列无法容忍任何节点故障。

⚠️ 网络分区后数据可能不一致

经典镜像队列在网络分区后可能出现数据丢失,仲裁队列通过 Raft 协议避免此问题。

⚠️ 不要在生产环境使用 autoheal

自动恢复可能导致数据丢失,建议使用 pause_minority + 手动恢复。

🔥 最佳实践: 使用仲裁队列 + 3 节点集群 + pause_minority 策略 + HAProxy 负载均衡。


9.10 扩展阅读


下一章: 第 10 章:插件生态 — 探索 RabbitMQ 丰富的插件生态。