强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 4 章:交换机详解

第 4 章:交换机详解

交换机(Exchange)是 RabbitMQ 消息路由的核心组件。本章将逐一讲解每种交换机类型的工作原理、使用场景和最佳实践。


4.1 交换机类型全景

类型路由策略典型场景复杂度
Default路由键匹配队列名简单点对点
Direct路由键精确匹配任务分发、精确路由★★
Fanout广播所有绑定队列通知广播、事件分发
Topic路由键模式匹配日志分类、多维路由★★★
Headers消息头属性匹配复杂条件路由★★★★
Dead Letter无法消费的消息转发消息重试、异常处理★★★
Delayed延迟投递消息定时任务、超时处理★★★

4.2 Default Exchange

默认交换机是 RabbitMQ 预先声明的、名称为空字符串 "" 的 Direct 类型交换机。

工作原理

Producer ──publish──> Exchange "" (direct)
                        │
                   routing_key = "my_queue"
                        │
                        └──> Queue "my_queue"(自动绑定同名队列)

使用示例

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列(自动绑定到默认交换机,路由键 = 队列名)
channel.queue_declare(queue='task_queue', durable=True)

# 使用默认交换机(exchange=''),路由键为队列名
channel.basic_publish(
    exchange='',                    # 默认交换机
    routing_key='task_queue',       # 路由键 = 队列名
    body='Hello World!',
    properties=pika.BasicProperties(delivery_mode=2)
)

connection.close()

💡 提示: Default Exchange 适合简单的点对点场景,生产环境建议使用具名交换机以便于管理和监控。


4.3 Direct Exchange(直连交换机)

Direct Exchange 将消息路由到 Binding Key 与消息 Routing Key 完全匹配的队列。

路由规则

消息 Routing Key == 队列 Binding Key → 路由成功

架构图

                    ┌── (binding_key: "payment") ──> Queue: payment
Producer ──> Direct ┼── (binding_key: "shipping") ──> Queue: shipping
Exchange            └── (binding_key: "email")    ──> Queue: email

完整代码示例

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Direct 交换机
channel.exchange_declare(exchange='order_direct', exchange_type='direct', durable=True)

# 声明多个队列
for queue_name in ['payment_queue', 'shipping_queue', 'email_queue']:
    channel.queue_declare(queue=queue_name, durable=True)

# 绑定队列到交换机
channel.queue_bind(exchange='order_direct', queue='payment_queue', routing_key='payment')
channel.queue_bind(exchange='order_direct', queue='shipping_queue', routing_key='shipping')
channel.queue_bind(exchange='order_direct', queue='email_queue', routing_key='email')

# 发布消息 - 路由到 payment_queue
channel.basic_publish(
    exchange='order_direct',
    routing_key='payment',
    body=json.dumps({'order_id': '001', 'amount': 299.9}),
    properties=pika.BasicProperties(delivery_mode=2)
)

print("[x] 支付消息已发送")
connection.close()

多个队列绑定同一路由键

Producer ──> Direct Exchange
                │
           routing_key = "notification"
                │
                ├──> Queue: sms_queue
                └──> Queue: email_queue

当多个队列使用相同的 Binding Key 绑定时,消息会同时路由到所有匹配的队列,实现类似广播的效果。


4.4 Fanout Exchange(扇出交换机)

Fanout Exchange 将消息广播到所有绑定的队列,忽略 Routing Key

路由规则

所有绑定队列均收到消息(无论 Routing Key 是什么)

架构图

                        ┌──> Queue: sms_queue
Producer ──> Fanout  ──┼──> Queue: email_queue
Exchange               ├──> Queue: push_queue
                       └──> Queue: wechat_queue

代码示例:事件广播系统

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Fanout 交换机
channel.exchange_declare(exchange='user_events', exchange_type='fanout', durable=True)

# 声明各服务的队列
channels = ['sms_queue', 'email_queue', 'push_queue']
for q in channels:
    channel.queue_declare(queue=q, durable=True)
    channel.queue_bind(exchange='user_events', queue=q)

# 发布用户注册事件
event = {
    'event': 'user_registered',
    'user_id': 'u_1001',
    'username': 'zhangsan',
    'email': 'zhangsan@example.com',
    'phone': '13800138000'
}

channel.basic_publish(
    exchange='user_events',
    routing_key='',        # Fanout 忽略 routing_key
    body=json.dumps(event),
    properties=pika.BasicProperties(delivery_mode=2)
)

print("[x] 用户注册事件已广播")
connection.close()

Fanout + 临时队列(排他队列)

# 消费者使用临时队列接收广播
result = channel.queue_declare(queue='', exclusive=True)  # 匿名排他队列
queue_name = result.method.queue

channel.queue_bind(exchange='user_events', queue=queue_name)

def callback(ch, method, properties, body):
    print(f"[x] 收到事件: {body.decode()}")

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

4.5 Topic Exchange(主题交换机)

Topic Exchange 使用通配符模式匹配路由消息,是最灵活的交换机类型。

匹配规则

通配符说明示例
*匹配恰好一个单词order.* 匹配 order.payment,不匹配 order.payment.success
#匹配零个或多个单词order.# 匹配 orderorder.paymentorder.payment.success

词边界规则

路由键以 . 分隔为单词:order.payment.success → 三个单词:orderpaymentsuccess

匹配示例

绑定键 (Binding Key)匹配的路由键不匹配的路由键
order.*order.payment, order.shippingorder.payment.success
*.paymentorder.payment, subscription.paymentorder.payment.success
order.#order, order.payment, order.payment.successuser.order
#任意路由键
order.*.successorder.payment.successorder.success
#.error.#error, order.error, order.error.fatal, payment.error.timeoutorder.errors

架构图

Producer ──> Topic Exchange
                │
                ├── (binding: "order.payment.#") ──> Queue: payment_logs
                ├── (binding: "order.shipping.#") ──> Queue: shipping_logs
                ├── (binding: "#.error.#")        ──> Queue: error_logs
                └── (binding: "#")                 ──> Queue: all_logs

消息路由键: "order.payment.error"
→ 匹配 payment_logs (order.payment.#) ✓
→ 匹配 error_logs (#.error.#) ✓
→ 匹配 all_logs (#) ✓
→ 不匹配 shipping_logs ✗

完整代码示例:日志分级系统

import pika
import json
from datetime import datetime

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Topic 交换机
channel.exchange_declare(exchange='app_logs', exchange_type='topic', durable=True)

# 声明队列
queues_config = {
    'error_queue':     '#.ERROR.#',       # 所有错误日志
    'warning_queue':   '#.WARNING.#',     # 所有警告日志
    'payment_queue':   'payment.#',       # 支付相关日志
    'critical_queue':  '*.CRITICAL.#',    # 临界错误
    'all_queue':       '#',               # 所有日志
}

for queue_name, binding_key in queues_config.items():
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange='app_logs', queue=queue_name, routing_key=binding_key)

# 发送日志
logs = [
    ('payment.INFO', '用户支付成功'),
    ('payment.ERROR', '支付网关超时'),
    ('order.WARNING', '库存不足'),
    ('system.CRITICAL', '数据库连接失败'),
    ('user.INFO', '用户登录'),
]

for routing_key, message in logs:
    log_entry = {
        'level': routing_key.split('.')[1],
        'service': routing_key.split('.')[0],
        'message': message,
        'timestamp': datetime.now().isoformat()
    }
    channel.basic_publish(
        exchange='app_logs',
        routing_key=routing_key,
        body=json.dumps(log_entry),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[x] {routing_key}: {message}")

connection.close()

4.6 Headers Exchange(头部交换机)

Headers Exchange 根据消息的 Headers 属性(而非 Routing Key)进行路由匹配。

匹配模式

模式说明
x-match=all所有 header 键值对都匹配(AND)
x-match=any任意一个 header 键值对匹配(OR)

代码示例

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Headers 交换机
channel.exchange_declare(exchange='doc_headers', exchange_type='headers', durable=True)

# 声明队列
channel.queue_declare(queue='pdf_queue', durable=True)
channel.queue_declare(queue='urgent_queue', durable=True)

# 绑定 - 匹配 format=pdf 的文档
channel.queue_bind(
    exchange='doc_headers', queue='pdf_queue',
    arguments={'x-match': 'all', 'format': 'pdf'}
)

# 绑定 - 匹配 priority=high 或 format=pdf
channel.queue_bind(
    exchange='doc_headers', queue='urgent_queue',
    arguments={'x-match': 'any', 'priority': 'high', 'format': 'pdf'}
)

# 发布消息(通过 headers 路由)
channel.basic_publish(
    exchange='doc_headers',
    routing_key='',  # Headers 交换机忽略 routing_key
    body=b'PDF document content',
    properties=pika.BasicProperties(
        delivery_mode=2,
        headers={'format': 'pdf', 'priority': 'high', 'language': 'zh'}
    )
)

print("[x] 文档消息已发送")
connection.close()

Headers 匹配行为

消息 Headersx-match=all, format=pdfx-match=any, priority=high, format=pdf
{format: pdf}✅ 匹配✅ 匹配
{format: pdf, lang: zh}✅ 匹配✅ 匹配
{format: docx}❌ 不匹配❌ 不匹配
{priority: high}❌ 不匹配✅ 匹配
{priority: high, format: docx}❌ 不匹配✅ 匹配

4.7 Dead Letter Exchange(死信交换机)

死信(Dead Letter)是无法被正常消费的消息。死信交换机负责接收这些消息并路由到死信队列。

消息成为死信的条件

条件说明
消费者拒绝消息basic.rejectbasic.nackrequeue=false
消息 TTL 过期消息或队列设置的 TTL 到期
队列已满超出 x-max-lengthx-max-length-bytes
消息被拒绝且无重新入队ACK 模式下的 NACK

配置死信交换机

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. 声明死信交换机和死信队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dead_letter_queue', routing_key='dead')

# 2. 声明业务队列(配置死信交换机)
channel.queue_declare(
    queue='business_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 30000,         # 消息 30 秒未消费成为死信
        'x-max-length': 1000            # 超出 1000 条成为死信
    }
)

channel.queue_bind(exchange='business_exchange', queue='business_queue', routing_key='task')

死信处理模式

# 死信队列消费者 - 记录/告警/重试
def dead_letter_handler(ch, method, properties, body):
    print(f"[DLX] 收到死信: {body.decode()}")
    print(f"  原交换机: {properties.headers.get('x-first-death-exchange', 'N/A')}")
    print(f"  原队列: {properties.headers.get('x-first-death-queue', 'N/A')}")
    print(f"  死因: {properties.headers.get('x-first-death-reason', 'N/A')}")
    
    # 记录到数据库 / 发送告警 / 尝试重试
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='dead_letter_queue', on_message_callback=dead_letter_handler)

死信消息的 Headers

Header说明
x-first-death-reason第一次成为死信的原因
x-first-death-queue第一次成为死信的队列名
x-first-death-exchange第一次成为死信的交换机名
x-death死信历史记录(数组)

4.8 Delayed Message Exchange(延迟交换机)

延迟交换机通过 rabbitmq_delayed_message_exchange 插件实现消息的定时投递。

安装插件

# 下载插件(版本需匹配 RabbitMQ 版本)
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
cp rabbitmq_delayed_message_exchange-3.13.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用延迟交换机

import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明延迟交换机(type=x-delayed-message)
args = {'x-delayed-type': 'direct'}  # 内部路由类型
channel.exchange_declare(
    exchange='delayed_exchange',
    exchange_type='x-delayed-message',
    durable=True,
    arguments=args
)

channel.queue_declare(queue='delayed_queue', durable=True)
channel.queue_bind(exchange='delayed_exchange', queue='delayed_queue', routing_key='delay')

# 发送延迟消息(延迟 5 秒)
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delay',
    body=json.dumps({
        'task': 'send_reminder',
        'user_id': 'u_1001',
        'message': '您的订单即将超时,请尽快支付'
    }),
    properties=pika.BasicProperties(
        delivery_mode=2,
        headers={'x-delay': 5000}  # 延迟 5000 毫秒
    )
)

print(f"[x] 延迟消息已发送,将在 5 秒后投递")
connection.close()

延迟交换机适用场景

📌 业务场景:

场景延迟时间说明
订单超时取消30 分钟下单后 30 分钟未支付自动取消
消息重试指数退避失败后延迟重试
定时提醒自定义预约提醒、到期提醒
延迟关闭连接5 分钟用户离开后延迟处理

4.9 交换机高级特性

Alternate Exchange(备用交换机)

当消息无法路由到任何队列时,转发到备用交换机。

# 声明备用交换机
channel.exchange_declare(exchange='fallback_exchange', exchange_type='fanout', durable=True)
channel.queue_declare(queue='unroutable_queue', durable=True)
channel.queue_bind(exchange='fallback_exchange', queue='unroutable_queue')

# 声明主交换机,指定备用交换机
channel.exchange_declare(
    exchange='main_exchange',
    exchange_type='direct',
    durable=True,
    arguments={'alternate-exchange': 'fallback_exchange'}
)

Internal Exchange(内部交换机)

内部交换机不能被生产者直接发布消息,只能通过其他交换机绑定转发。

channel.exchange_declare(
    exchange='internal_router',
    exchange_type='topic',
    durable=True,
    internal=True  # 标记为内部交换机
)

4.10 交换机选择决策表

需求推荐交换机类型理由
简单点对点Default / Direct最简单直接
任务分发到不同工作队列Direct精确路由
事件广播给所有订阅者Fanout零配置广播
按规则分发日志/事件Topic灵活的通配符匹配
基于消息属性的复杂路由Headers支持多条件匹配
消息延迟投递Delayed (插件)原生延迟支持
处理无法路由的消息Alternate Exchange兜底路由
处理无法消费的消息Dead Letter异常消息处理

4.11 注意事项

⚠️ 不要使用匿名交换机(空字符串)进行生产发布

匿名交换机仅适合简单场景,生产环境应使用具名交换机以便于监控和管理。

⚠️ Fanout 交换机会忽略 Routing Key

即使设置了 Routing Key,Fanout 也会忽略它。

⚠️ Topic 匹配性能

# 通配符在路由键很长时可能导致性能下降,避免过度使用。

⚠️ 延迟交换机不保证精确延迟

延迟消息的实际投递时间可能略有偏差,不适合对精度要求极高的场景。

⚠️ 死信循环

如果死信交换机指向的队列又配置了同一个死信交换机,会形成死信循环。需确保死信队列不配置死信转发。

🔥 最佳实践: 为每个业务域使用独立的交换机,配合有意义的命名约定(如 order.eventspayment.commands)。


4.12 扩展阅读


下一章: 第 5 章:队列详解 — 深入理解队列类型、持久化策略、仲裁队列等核心概念。