RabbitMQ 消息队列完全教程 / 第 10 章:插件生态
第 10 章:插件生态
RabbitMQ 的插件系统极大地扩展了其功能。本章将介绍最常用和最重要的插件及其配置方法。
10.1 插件管理基础
插件管理命令
# 列出所有插件
rabbitmq-plugins list
# 启用插件
rabbitmq-plugins enable <plugin_name>
# 禁用插件
rabbitmq-plugins disable <plugin_name>
# 启用所有插件(不推荐生产)
rabbitmq-plugins enable --all
# 禁用所有插件
rabbitmq-plugins disable --all
插件状态说明
| 状态 | 符号 | 说明 |
|---|---|---|
| enabled | E | 已启用,运行中 |
| disabled | D | 已禁用 |
| running | * | 正在运行 |
# 列出并过滤插件状态
rabbitmq-plugins list -e enabled # 仅显示已启用
rabbitmq-plugins list -m available # 仅显示可用
插件安装路径
| 系统 | 路径 |
|---|---|
| Linux (包管理) | /usr/lib/rabbitmq/plugins/ |
| Docker | /opt/rabbitmq/plugins/ |
| macOS (Homebrew) | $(brew --prefix)/opt/rabbitmq/plugins/ |
# 手动安装第三方插件
cp rabbitmq_custom_plugin.ez /usr/lib/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_custom_plugin
10.2 Management UI 插件
rabbitmq_management
rabbitmq-plugins enable rabbitmq_management
管理界面功能
| 功能 | 路径 | 说明 |
|---|---|---|
| 概览 | / | 系统总览 |
| 连接 | /#/connections | 所有客户端连接 |
| 通道 | /#/channels | 所有通道 |
| 交换机 | /#/exchanges | 交换机管理 |
| 队列 | /#/queues | 队列管理 |
| 管理 | /#/admin | 用户、策略、参数管理 |
HTTP API 使用
# 系统概览
curl -u admin:admin123 http://localhost:15672/api/overview
# 列出所有队列
curl -u admin:admin123 http://localhost:15672/api/queues
# 列出所有连接
curl -u admin:admin123 http://localhost:15672/api/connections
# 获取队列详情
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my_queue
# 发送测试消息
curl -u admin:admin123 -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
-H "Content-Type: application/json" \
-d '{
"properties": {},
"routing_key": "my_queue",
"payload": "test message",
"payload_encoding": "string"
}'
# 消费消息
curl -u admin:admin123 -X POST http://localhost:15672/api/queues/%2F/my_queue/get \
-H "Content-Type: application/json" \
-d '{"count":1,"requeue":false,"encoding":"auto","ackmode":"ack_requeue_true"}'
rabbitmqadmin 命令行工具
# 下载
curl -o rabbitmqadmin http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
# 列出队列
./rabbitmqadmin list queues name messages consumers
# 发布消息
./rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello"
# 获取消息
./rabbitmqadmin get queue=test ackmode=ack_requeue_true count=5
# 导出定义(配置备份)
./rabbitmqadmin export rabbitmq_definitions.json
# 导入定义
./rabbitmqadmin import rabbitmq_definitions.json
10.3 Delayed Message Exchange 插件
安装
# 下载与 RabbitMQ 版本匹配的插件
# https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.13.0.ez /usr/lib/rabbitmq/plugins/
# 启用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用方式
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明延迟交换机
channel.exchange_declare(
exchange='delayed_orders',
exchange_type='x-delayed-message',
durable=True,
arguments={'x-delayed-type': 'direct'} # 内部路由类型
)
channel.queue_declare(queue='order_timeout', durable=True)
channel.queue_bind(exchange='delayed_orders', queue='order_timeout', routing_key='timeout')
# 发送延迟消息
for delay_ms, message in [
(5000, {'order': '001', 'action': 'remind_payment'}), # 5秒
(1800000, {'order': '002', 'action': 'auto_cancel'}), # 30分钟
(86400000, {'order': '003', 'action': 'auto_complete'}), # 24小时
]:
channel.basic_publish(
exchange='delayed_orders',
routing_key='timeout',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-delay': delay_ms}
)
)
print(f"[x] 延迟 {delay_ms/1000}s: {message}")
connection.close()
修改延迟时间
# 发送新消息时,使用相同 message_id 覆盖旧消息
channel.basic_publish(
exchange='delayed_orders',
routing_key='timeout',
body=json.dumps({'action': 'updated_timeout'}),
properties=pika.BasicProperties(
delivery_mode=2,
message_id='order_001_timeout', # 用于去重/覆盖
headers={'x-delay': 60000}
)
)
注意事项
⚠️ 延迟消息存储在 Mnesia/ETS 中,不适合大量延迟消息。
⚠️ 节点重启后未投递的延迟消息会丢失。
⚠️ 延迟时间精度为秒级,不保证精确到毫秒。
10.4 Shovel 插件
Shovel 用于在 RabbitMQ 实例之间单向转发消息。
架构
Source Broker ──Shovel──> Destination Broker
(Node A) (Node B)
启用 Shovel
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management # 管理界面(可选)
静态 Shovel 配置
# /etc/rabbitmq/rabbitmq.conf 中添加 advanced.config
%% advanced.config
[{rabbitmq_shovel,
[{shovels,
[{my_shovel,
[{source,
[{protocol, amqp091},
{uris, ["amqp://admin:admin123@source-host:5672"]},
{queue, <<"source_queue">>}]},
{destination,
[{protocol, amqp091},
{uris, ["amqp://admin:admin123@dest-host:5672"]},
{exchange, <<"dest_exchange">>},
{publish_properties, [{delivery_mode, 2}]}}]},
{ack_mode, on_confirm},
{reconnect_delay, 5}
]}
]}
]}
].
动态 Shovel 配置
# 通过 rabbitmqctl 创建动态 Shovel
rabbitmqctl set_parameter shovel my_shovel \
'{"src-protocol":"amqp091","src-uri":"amqp://admin:admin123@source:5672","src-queue":"source_queue","dest-protocol":"amqp091","dest-uri":"amqp://admin:admin123@dest:5672","dest-exchange":"dest_exchange","ack-mode":"on-confirm","reconnect-delay":5}'
# 查看 Shovel 状态
rabbitmqctl list_shovels
# 删除 Shovel
rabbitmqctl clear_parameter shovel my_shovel
动态 Shovel(Python)
import requests
import json
# 创建动态 Shovel
response = requests.put(
'http://localhost:15672/api/parameters/shovel/%2F/my_shovel',
auth=('admin', 'admin123'),
json={
'value': {
'src-protocol': 'amqp091',
'src-uri': 'amqp://admin:admin123@source:5672',
'src-queue': 'source_queue',
'dest-protocol': 'amqp091',
'dest-uri': 'amqp://admin:admin123@dest:5672',
'dest-exchange': 'dest_exchange',
'ack-mode': 'on-confirm',
'reconnect-delay': 5
}
}
)
print(response.status_code)
Shovel 适用场景
| 场景 | 说明 |
|---|---|
| 跨数据中心消息同步 | 从 A 机房转发到 B 机房 |
| 消息迁移 | 迁移到新集群 |
| 消息桥接 | 连接不同版本/配置的 RabbitMQ |
| 灾难恢复 | 备份消息到远程集群 |
10.5 Federation 插件
Federation 用于在 RabbitMQ 之间建立联邦交换机/队列,支持多向消息同步。
Federation vs Shovel
| 特性 | Federation | Shovel |
|---|---|---|
| 方向 | 多向(支持环形) | 单向 |
| 配置方式 | 策略(Policy) | 参数(Parameter) |
| 适用场景 | 多数据中心 | 点对点转发 |
| 交换机联邦 | ✅ | — |
| 队列联邦 | ✅ | — |
| 自动发现 | ✅ | ❌ |
启用 Federation
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management # 管理界面
配置 Federation Upstream
# 设置 upstream
rabbitmqctl set_parameter federation-upstream my_upstream \
'{"uri":"amqp://admin:admin123@upstream-host:5672","prefetch-count":1000,"reconnect-delay":5,"ack-mode":"on-confirm"}'
# 设置策略(将特定交换机联邦到 upstream)
rabbitmqctl set_policy federate-exchanges "^federated\." \
'{"federation-upstream-set":"my_upstream"}' \
--apply-to exchanges
# 设置联邦队列策略
rabbitmqctl set_policy federate-queues "^federated\." \
'{"federation-upstream-set":"my_upstream"}' \
--apply-to queues
Federation 架构
Upstream Broker Downstream Broker
┌─────────────┐ ┌─────────────┐
│ Exchange: │ Federation │ Exchange: │
│ orders.ex │ ──────────────> │ orders.ex │
│ │ (通过 AMQP) │ (federated) │
└─────────────┘ └─────────────┘
│
Binding
│
┌────▼────┐
│ Queue │
└─────────┘
多数据中心 Federation
# DC1 的 upstream 指向 DC2
rabbitmqctl set_parameter federation-upstream dc2_upstream \
'{"uri":"amqp://admin:admin123@dc2-rabbit:5672"}'
# DC2 的 upstream 指向 DC1(形成环形)
rabbitmqctl set_parameter federation-upstream dc1_upstream \
'{"uri":"amqp://admin:admin123@dc1-rabbit:5672"}'
10.6 Prometheus 监控插件
启用 Prometheus 插件
rabbitmq-plugins enable rabbitmq_prometheus
端口和端点
| 端点 | 端口 | 说明 |
|---|---|---|
/metrics | 15692 | Prometheus 格式指标 |
/metrics/per-object | 15692 | 每个对象的详细指标 |
/metrics/detailed | 15692 | 详细指标(高开销) |
Prometheus 配置
# prometheus.yml
scrape_configs:
- job_name: 'rabbitmq'
scrape_interval: 15s
static_configs:
- targets: ['rabbitmq-host:15692']
metrics_path: /metrics
关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
rabbitmq_queues | 队列数量 | > 10000 |
rabbitmq_connections | 连接数 | > 5000 |
rabbitmq_channels | 通道数 | > 10000 |
rabbitmq_queue_messages | 队列消息总数 | > 1000000 |
rabbitmq_queue_messages_ready | 就绪消息数 | 持续增长 |
rabbitmq_queue_messages_unacked | 未确认消息数 | > prefetch * 消费者数 |
rabbitmq_process_resident_memory_bytes | 进程内存 | > 内存阈值 80% |
rabbitmq_disk_space_available_bytes | 可用磁盘空间 | < 2GB |
rabbitmq_channel_messages_published_total | 发布速率 | 无固定阈值 |
rabbitmq_channel_messages_delivered_total | 投递速率 | 无固定阈值 |
Grafana Dashboard
{
"dashboard": {
"title": "RabbitMQ Overview",
"panels": [
{
"title": "Queue Depth",
"targets": [{
"expr": "rabbitmq_queue_messages",
"legendFormat": "{{queue}}"
}]
},
{
"title": "Message Rate",
"targets": [
{"expr": "rate(rabbitmq_channel_messages_published_total[5m])", "legendFormat": "publish"},
{"expr": "rate(rabbitmq_channel_messages_delivered_total[5m])", "legendFormat": "deliver"}
]
},
{
"title": "Connections",
"targets": [{"expr": "rabbitmq_connections"}]
}
]
}
}
10.7 其他重要插件
MQTT 插件
rabbitmq-plugins enable rabbitmq_mqtt
# 端口: 1883 (TCP), 8883 (TLS)
STOMP 插件
rabbitmq-plugins enable rabbitmq_stomp
# 端口: 61613 (TCP), 61614 (TLS)
Web STOMP
rabbitmq-plugins enable rabbitmq_web_stomp
# 提供 WebSocket 端点
Auth Backend HTTP
rabbitmq-plugins enable rabbitmq_auth_backend_http
# 使用 HTTP 服务进行认证
# 配置 HTTP 认证
auth_backends.1 = http
auth_http.http_method = post
auth_http.user_path = http://auth-service/rabbitmq/user
auth_http.vhost_path = http://auth-service/rabbitmq/vhost
auth_http.resource_path = http://auth-service/rabbitmq/resource
auth_http.topic_path = http://auth-service/rabbitmq/topic
Top 插件
rabbitmq-plugins enable rabbitmq_top
# 在管理界面显示进程资源使用排名
10.8 自定义插件开发
插件目录结构
rabbitmq_custom_plugin/
├── src/
│ ├── rabbit_custom_plugin.erl
│ └── rabbit_custom_plugin_app.erl
├── include/
├── priv/
├── Makefile
└── rabbitmq_custom_plugin.app.src
💡 提示: 自定义插件需要 Erlang 开发能力。大多数场景下,使用现有插件 + HTTP API 即可满足需求。
10.9 插件选择建议
| 需求 | 推荐插件 | 说明 |
|---|---|---|
| 管理界面 | rabbitmq_management | 必装 |
| 监控 | rabbitmq_prometheus | 推荐 |
| 延迟消息 | rabbitmq_delayed_message_exchange | 推荐 |
| 跨集群同步 | rabbitmq_federation / rabbitmq_shovel | 按需 |
| MQTT 支持 | rabbitmq_mqtt | IoT 场景 |
| WebSocket | rabbitmq_web_stomp | 浏览器场景 |
| 认证集成 | rabbitmq_auth_backend_http | 企业场景 |
10.10 注意事项
⚠️ 插件版本兼容性
第三方插件版本必须与 RabbitMQ 主版本匹配,否则无法加载。
⚠️ 插件会影响性能
每个启用的插件都会消耗一定资源,生产环境仅启用必要插件。
⚠️ Prometheus 高开销指标
/metrics/detailed 端点会为每个队列/连接/通道生成指标,高基数场景下会产生大量数据。
🔥 最佳实践: 生产环境推荐启用 rabbitmq_management + rabbitmq_prometheus + 按需的业务插件。
10.11 扩展阅读
下一章: 第 11 章:RabbitMQ Streams — 了解 RabbitMQ 的流式处理能力。