强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 8 章:消息路由

第 8 章:消息路由

路由是 RabbitMQ 消息分发的核心逻辑。本章将系统性地讲解各种路由策略、组合模式和高级用法。


8.1 路由基础

路由决策流程

Producer 发布消息
    │
    ├── exchange = "" (默认) ──> routing_key 匹配队列名
    │
    └── exchange = "xxx" (具名)
            │
            ├── Direct: routing_key == binding_key
            ├── Fanout: 忽略 routing_key,广播所有
            ├── Topic: routing_key 匹配 binding_key 通配符
            └── Headers: 消息 headers 匹配 binding arguments

路由键设计规范

规范说明示例
使用 . 分隔层级类似域名命名order.payment.created
从左到右范围递减通用 → 具体region.service.event
使用小写字母保持一致性user.registered
避免特殊字符仅用字母、数字、.*#
长度适中不宜超过 255 字符

常见命名模式

# 域.动作
order.create
order.payment.success
user.registered
inventory.reserved

# 域.子域.事件
payment.gateway.timeout
notification.email.sent
logistics.tracking.updated

# 环境.域.事件(多环境)
prod.order.created
staging.order.created

8.2 Direct 路由策略

精确匹配

import pika, json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Direct 交换机
channel.exchange_declare(exchange='commands', exchange_type='direct', durable=True)

# 声明队列
services = {
    'payment_service': 'payment',
    'shipping_service': 'shipping',
    'notification_service': 'notification',
    'audit_service': 'audit'
}

for queue, routing_key in services.items():
    channel.queue_declare(queue=queue, durable=True)
    channel.queue_bind(exchange='commands', queue=queue, routing_key=routing_key)

# 发送支付命令
channel.basic_publish(
    exchange='commands',
    routing_key='payment',   # 精确路由到 payment_service
    body=json.dumps({'action': 'charge', 'amount': 99.9}),
    properties=pika.BasicProperties(delivery_mode=2)
)

一对多路由(同一路由键绑定多个队列)

# 多个队列绑定同一个路由键
channel.queue_bind(exchange='commands', queue='audit_service', routing_key='payment')
channel.queue_bind(exchange='commands', queue='analytics_service', routing_key='payment')

# 消息同时路由到 audit_service 和 analytics_service

8.3 Topic 路由策略

通配符匹配

通配符含义order.* 匹配order.# 匹配
*一个单词order.paymentorder.payment
#零或多个单词order.payment.doneorder.payment.done

多维度路由架构

场景: 多区域 × 多服务 × 多级别日志系统

Exchange: regional_logs (topic)
    │
    ├── *.error.#     ──> error_queue(所有区域的错误)
    ├── us.*.*         ──> us_queue(美国区域所有日志)
    ├── eu.payment.*   ──> eu_payment_queue(欧洲支付日志)
    ├── #.critical     ──> critical_queue(所有临界日志)
    └── #              ──> archive_queue(全量归档)

消息路由键: "us.payment.error"
→ 匹配 error_queue ✓ (*.error.#)
→ 匹配 us_queue ✓ (us.*.*)
→ 匹配 archive_queue ✓ (#)
→ 不匹配 eu_payment_queue ✗

代码示例

import pika, json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='regional_logs', exchange_type='topic', durable=True)

# 配置路由规则
routing_rules = {
    'error_queue':     '*.error.#',
    'us_queue':        'us.#',
    'eu_queue':        'eu.#',
    'payment_queue':   '#.payment.*',
    'critical_queue':  '#.critical',
    'archive_queue':   '#',
}

for queue_name, binding_key in routing_rules.items():
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange='regional_logs', queue=queue_name, routing_key=binding_key)

# 发送测试日志
logs = [
    ('us.payment.error', 'US 支付系统错误'),
    ('eu.shipping.warning', 'EU 物流警告'),
    ('us.user.critical', 'US 用户系统临界错误'),
    ('eu.payment.info', 'EU 支付系统信息'),
    ('ap.auth.error', '亚太认证系统错误'),
]

for routing_key, message in logs:
    channel.basic_publish(
        exchange='regional_logs',
        routing_key=routing_key,
        body=json.dumps({'level': routing_key.split('.')[2], 'message': message}),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[x] {routing_key}: {message}")

connection.close()

8.4 Headers 路由策略

多属性匹配

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='doc_router', exchange_type='headers', durable=True)

# 路由规则 1: PDF 且中文文档
channel.queue_declare(queue='pdf_chinese', durable=True)
channel.queue_bind(
    exchange='doc_router', queue='pdf_chinese',
    arguments={'x-match': 'all', 'format': 'pdf', 'language': 'zh'}
)

# 路由规则 2: 任何高优先级文档
channel.queue_declare(queue='urgent_docs', durable=True)
channel.queue_bind(
    exchange='doc_router', queue='urgent_docs',
    arguments={'x-match': 'any', 'priority': 'high', 'urgent': 'true'}
)

# 路由规则 3: 所有文档(归档)
channel.queue_declare(queue='all_docs', durable=True)
channel.queue_bind(
    exchange='doc_router', queue='all_docs',
    arguments={'x-match': 'any', 'format': 'pdf', 'format': 'docx', 'format': 'xlsx'}
)

# 发布消息
channel.basic_publish(
    exchange='doc_router',
    routing_key='',  # Headers 交换机忽略 routing_key
    body=b'PDF document content',
    properties=pika.BasicProperties(
        delivery_mode=2,
        headers={'format': 'pdf', 'language': 'zh', 'priority': 'high'}
    )
)

8.5 Exchange-to-Exchange 路由

交换机之间可以建立绑定关系,实现更复杂的路由拓扑。

                    ┌──> Fanout: broadcast ──┬──> sms_queue
Producer ──> Topic: │                        ├──> email_queue
             events │                        └──> push_queue
                    └──> Direct: priority ──> urgent_queue
# 声明交换机
channel.exchange_declare(exchange='events', exchange_type='topic', durable=True)
channel.exchange_declare(exchange='broadcast', exchange_type='fanout', durable=True)
channel.exchange_declare(exchange='priority', exchange_type='direct', durable=True)

# Exchange-to-Exchange 绑定
channel.exchange_bind(destination='broadcast', source='events', routing_key='notification.#')
channel.exchange_bind(destination='priority', source='events', routing_key='#.critical')

# 队列绑定
channel.queue_bind(exchange='broadcast', queue='sms_queue')
channel.queue_bind(exchange='broadcast', queue='email_queue')
channel.queue_bind(exchange='priority', queue='urgent_queue', routing_key='critical')

8.6 死信路由

死信路由链

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# === 死信链: 业务队列 → 重试交换机 → 重试队列 → 业务队列 ===

# 1. 业务交换机和队列
channel.exchange_declare(exchange='biz_exchange', exchange_type='direct', durable=True)
channel.queue_declare(
    queue='biz_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 0  # 不设置队列 TTL,由消息级别控制
    }
)
channel.queue_bind(exchange='biz_exchange', queue='biz_queue', routing_key='task')

# 2. 死信交换机和队列(最终死信)
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dead_letter_queue', routing_key='dead')

# 3. 重试交换机和队列(延迟重试)
channel.exchange_declare(exchange='retry_exchange', exchange_type='x-delayed-message',
                         durable=True, arguments={'x-delayed-type': 'direct'})
channel.queue_declare(
    queue='retry_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'biz_exchange',
        'x-dead-letter-routing-key': 'task'
    }
)
channel.queue_bind(exchange='retry_exchange', queue='retry_queue', routing_key='retry')

死信路由拓扑

Producer ──> biz_exchange ──> biz_queue ──> Consumer
                                      │
                                      ├── NACK (requeue=false)
                                      │        │
                                      │        v
                                      │   dlx_exchange ──> dead_letter_queue
                                      │                      (告警/人工处理)
                                      │
                                      └── 消息 TTL 过期
                                               │
                                               v
                                          dlx_exchange ──> dead_letter_queue

8.7 延迟消息路由

延迟消息实现方案

方案精确度适用场景
Delayed Message Exchange 插件秒级推荐方案
TTL + DLX分钟级简单延迟(精度不高)
Redis ZSET + 定时扫描秒级不想安装插件
数据库定时任务分钟级已有调度系统

方案一:Delayed Message Exchange(推荐)

# 安装插件后使用
channel.exchange_declare(
    exchange='delayed_ex',
    exchange_type='x-delayed-message',
    durable=True,
    arguments={'x-delayed-type': 'direct'}
)

# 发送延迟消息
channel.basic_publish(
    exchange='delayed_ex',
    routing_key='task',
    body=json.dumps({'task': 'auto_cancel_order', 'order_id': '001'}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        headers={'x-delay': 1800000}  # 延迟 30 分钟
    )
)

方案二:TTL + DLX(不依赖插件)

# 创建延迟队列(消息只停留指定 TTL 后进入 DLX)
channel.queue_declare(
    queue='delay_30min',
    durable=True,
    arguments={
        'x-message-ttl': 1800000,         # 30 分钟
        'x-dead-letter-exchange': 'biz_exchange',
        'x-dead-letter-routing-key': 'task',
        'x-max-length': 0                 # 不限制(或设置合理值)
    }
)

# 发布消息到延迟队列
channel.basic_publish(
    exchange='',
    routing_key='delay_30min',
    body=json.dumps({'task': 'auto_cancel', 'order_id': '001'}),
    properties=pika.BasicProperties(delivery_mode=2)
)
# 30 分钟后消息自动进入 biz_exchange,路由到 biz_queue

多级延迟阶梯

# 5 个延迟级别: 1s, 5s, 30s, 5min, 30min
DELAY_QUEUES = {
    'delay_1s':    1000,
    'delay_5s':    5000,
    'delay_30s':   30000,
    'delay_5min':  300000,
    'delay_30min': 1800000,
}

for queue_name, ttl in DELAY_QUEUES.items():
    channel.queue_declare(
        queue=queue_name,
        durable=True,
        arguments={
            'x-message-ttl': ttl,
            'x-dead-letter-exchange': 'biz_exchange',
            'x-dead-letter-routing-key': 'task'
        }
    )

# 发送延迟消息(选择合适的延迟级别)
def send_delayed(message, delay_ms):
    queue_name = 'delay_30min'  # 默认最大延迟
    for name, ttl in sorted(DELAY_QUEUES.items(), key=lambda x: x[1]):
        if delay_ms <= ttl:
            queue_name = name
            break
    
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=json.dumps(message),
        properties=pika.BasicProperties(delivery_mode=2)
    )

8.8 路由策略组合模式

模式一:事件驱动架构(EDA)

Domain Events Exchange (topic)
    │
    ├── order.#           ──> Order Service Queue
    ├── payment.#         ──> Payment Service Queue
    ├── inventory.#       ──> Inventory Service Queue
    ├── notification.#    ──> Notification Service Queue
    └── #.error           ──> Error Handler Queue

模式二:CQRS 事件分发

Command Exchange (direct)
    │
    ├── create_order  ──> Order Command Queue
    ├── cancel_order  ──> Order Command Queue
    └── refund        ──> Payment Command Queue

Event Exchange (fanout) ──> 所有读模型服务

模式三:多级路由

Producer ──> L1 Exchange (topic)
                  │
                  ├── routing: "us.#" ──> L2 Exchange: us_regional (direct)
                  │                          ├── payment ──> us_payment_q
                  │                          └── shipping ──> us_shipping_q
                  │
                  └── routing: "eu.#" ──> L2 Exchange: eu_regional (direct)
                                             ├── payment ──> eu_payment_q
                                             └── shipping ──> eu_shipping_q

8.9 路由调试

使用管理 API 查看路由

# 查看交换机绑定
curl -u admin:admin123 http://localhost:15672/api/exchanges/%2F/my_exchange/bindings

# 查看队列绑定
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my_queue/bindings

# 测试消息路由(rabbitmqadmin)
rabbitmqadmin publish exchange=my_exchange routing_key=test.payload payload="test"

路由失败排查

# 使用 mandatory + return 回调检测路由失败
channel.add_on_return_callback(
    lambda ch, method, props, body:
        print(f"[!] 路由失败: exchange={method.exchange}, "
              f"routing_key={method.routing_key}, "
              f"reply={method.reply_text}")
)

# 使用 alternate-exchange 兜底
channel.exchange_declare(
    exchange='main_exchange',
    exchange_type='direct',
    durable=True,
    arguments={'alternate-exchange': 'unroutable_exchange'}
)

8.10 注意事项

⚠️ Topic 匹配的性能考量

# 通配符会导致 Broker 遍历所有绑定规则,绑定数量多时性能下降。尽量使用 * 或精确路由键。

⚠️ 路由键区分大小写

Order.Createorder.create 是不同的路由键。

⚠️ Headers 匹配的额外开销

Headers 交换机需要检查消息头的每个键值对,比 Direct/Topic 性能略低。

⚠️ Dead Letter 循环

确保死信队列本身不配置死信交换机指向同一队列,否则会形成无限循环。

🔥 最佳实践: 使用 Topic 交换机 + 有意义的路由键命名规范,实现灵活的消息路由。


8.11 扩展阅读


下一章: 第 9 章:集群与高可用 — 学习 RabbitMQ 集群架构、镜像队列和仲裁队列。