强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 13 章:消息模式

第 13 章:消息模式

本章汇总 RabbitMQ 中最经典和实用的消息设计模式,帮助你针对不同业务场景选择最合适的模式。


13.1 模式总览

模式核心思想交换机类型典型场景
Work Queue竞争消费,任务分发Default/Direct异步任务处理
Publish/Subscribe广播给所有消费者Fanout事件通知
Routing按规则路由到特定队列Direct日志分级
Topics模式匹配路由Topic多维度分发
RPC请求-响应模式Default远程调用
Priority Queue高优先级消息优先消费DirectVIP 处理
Delayed Message延迟投递消息Delayed超时处理
Competing Consumers多消费者竞争消费Direct水平扩展
Claim Check大消息外置存储Direct大文件处理

13.2 Work Queue(工作队列)

多个消费者竞争消费同一个队列的消息,实现负载均衡。

架构

Producer ──> [Queue] ──> Consumer 1
                    ├──> Consumer 2
                    └──> Consumer 3

Python 实现

# Producer
import pika, json, time

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='work_queue', durable=True)

for i in range(20):
    task = {'id': i, 'data': f'task_{i}', 'complexity': i % 3}
    ch.basic_publish(
        exchange='', routing_key='work_queue',
        body=json.dumps(task),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"[x] 发送任务 {i}")

conn.close()
# Consumer
import pika, json, time

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='work_queue', durable=True)
ch.basic_qos(prefetch_count=1)

def callback(ch, method, properties, body):
    task = json.loads(body)
    print(f"[x] 处理任务 {task['id']}")
    time.sleep(task['complexity'] + 1)  # 模拟耗时
    print(f"[✓] 任务 {task['id']} 完成")
    ch.basic_ack(delivery_tag=method.delivery_tag)

ch.basic_consume(queue='work_queue', on_message_callback=callback)
ch.start_consuming()

📌 业务场景: 后台任务处理(报表生成、数据导出、图片处理)


13.3 Publish/Subscribe(发布订阅)

生产者发布的消息被所有订阅者同时接收。

架构

                        ┌──> Queue A ──> Consumer A
Producer ──> Fanout ──┼──> Queue B ──> Consumer B
Exchange               └──> Queue C ──> Consumer C

实现

# Producer - 事件发布者
import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='events', exchange_type='fanout', durable=True)

event = {'type': 'user_registered', 'user_id': 'u001', 'email': 'user@test.com'}
ch.basic_publish(
    exchange='events',
    routing_key='',
    body=json.dumps(event),
    properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 事件已发布")
conn.close()
# Consumer - 事件订阅者(每个服务一个实例)
import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='events', exchange_type='fanout', durable=True)

# 使用排他临时队列
result = ch.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
ch.queue_bind(exchange='events', queue=queue_name)

def callback(ch, method, properties, body):
    event = json.loads(body)
    print(f"[x] 收到事件: {event['type']}")
    # 处理事件...
    ch.basic_ack(delivery_tag=method.delivery_tag)

ch.basic_consume(queue=queue_name, on_message_callback=callback)
ch.start_consuming()

📌 业务场景: 用户注册后同时通知邮件服务、积分服务、统计服务


13.4 Routing(路由模式)

根据路由键将消息分发到不同的队列。

实现

# Producer
import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)

# 发送不同级别的日志
for level, message in [
    ('error', '数据库连接失败'),
    ('info', '用户登录成功'),
    ('warning', '磁盘空间不足'),
    ('error', '支付网关超时')
]:
    ch.basic_publish(
        exchange='logs_direct',
        routing_key=level,
        body=json.dumps({'level': level, 'message': message}),
        properties=pika.BasicProperties(delivery_mode=2)
    )
conn.close()
# Consumer - 仅接收 error 日志
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='logs_direct', exchange_type='direct', durable=True)

ch.queue_declare(queue='error_logs', durable=True)
ch.queue_bind(exchange='logs_direct', queue='error_logs', routing_key='error')

def callback(ch, method, properties, body):
    print(f"[ERROR] {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

ch.basic_consume(queue='error_logs', on_message_callback=callback)
ch.start_consuming()

📌 业务场景: 日志分级处理、告警分发


13.5 Topics(主题模式)

使用通配符进行灵活的消息路由。

实现

# Producer
import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='sensor_data', exchange_type='topic', durable=True)

# 发送传感器数据
sensors = [
    ('sensor.temperature.room1', {'temp': 25.5, 'room': 'room1'}),
    ('sensor.humidity.room1', {'humidity': 60, 'room': 'room1'}),
    ('sensor.temperature.room2', {'temp': 22.3, 'room': 'room2'}),
    ('sensor.alert.temperature', {'alert': 'high', 'value': 38.2}),
]

for routing_key, data in sensors:
    ch.basic_publish(
        exchange='sensor_data',
        routing_key=routing_key,
        body=json.dumps(data),
        properties=pika.BasicProperties(delivery_mode=2)
    )
conn.close()
# Consumer - 接收所有温度数据
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='sensor_data', exchange_type='topic', durable=True)

ch.queue_declare(queue='temperature_data', durable=True)
ch.queue_bind(exchange='sensor_data', queue='temperature_data', routing_key='sensor.temperature.*')

ch.queue_declare(queue='alerts', durable=True)
ch.queue_bind(exchange='sensor_data', queue='alerts', routing_key='sensor.alert.#')

📌 业务场景: IoT 传感器数据分发、多维度事件路由


13.6 RPC(远程过程调用)

通过消息队列实现同步请求-响应模式。

架构

Client                              Server
  │                                    │
  │──1. 发送请求 (reply_to, corr_id)──>│
  │                                    │──3. 处理请求
  │<──4. 发送响应 (corr_id)──────────│
  │                                    │
  │──2. 监听回复队列                    │

Server 实现

# RPC Server
import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='rpc_queue', durable=True)
ch.basic_qos(prefetch_count=1)

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

def on_request(ch, method, props, body):
    request = json.loads(body)
    n = request['n']
    print(f"[x] 计算 fibonacci({n})")
    result = fibonacci(n)
    
    response = {'n': n, 'result': result}
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id,
            content_type='application/json'
        ),
        body=json.dumps(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"[✓] fibonacci({n}) = {result}")

ch.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print("[*] RPC Server 等待请求...")
ch.start_consuming()

Client 实现

# RPC Client
import pika, json, uuid

class RpcClient:
    def __init__(self):
        self.conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.ch = self.conn.channel()
        
        # 声明回调队列
        result = self.ch.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.ch.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self._on_response,
            auto_ack=True
        )
        
        self.response = None
        self.corr_id = None
    
    def _on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)
    
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        
        self.ch.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
                content_type='application/json'
            ),
            body=json.dumps({'n': n})
        )
        
        while self.response is None:
            self.conn.process_data_events()
        
        return self.response

# 使用
client = RpcClient()
for n in [10, 20, 30]:
    response = client.call(n)
    print(f"[✓] fibonacci({n}) = {response['result']}")

📌 业务场景: 需要同步结果的远程计算、查询


13.7 Delayed Message(延迟消息模式)

延迟消息 + DLX 实现

import pika, json

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()

# 业务交换机和队列
ch.exchange_declare(exchange='order_events', exchange_type='direct', durable=True)
ch.queue_declare(queue='order_timeout', durable=True,
    arguments={
        'x-dead-letter-exchange': 'order_events',
        'x-dead-letter-routing-key': 'timeout_process'
    })
ch.queue_bind(exchange='order_events', queue='order_timeout', routing_key='timeout_process')

# 延迟队列(消息在 TTL 过期后进入 DLX)
ch.queue_declare(queue='delay_30min', durable=True,
    arguments={
        'x-message-ttl': 1800000,  # 30 分钟
        'x-dead-letter-exchange': 'order_events',
        'x-dead-letter-routing-key': 'timeout_process',
        'x-max-length': 100000
    })

# 创建订单时发送延迟消息
order = {'order_id': 'ORD001', 'user_id': 'U1001', 'status': 'pending'}
ch.basic_publish(
    exchange='', routing_key='delay_30min',
    body=json.dumps(order),
    properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 订单超时消息已入队,30 分钟后处理")
conn.close()

📌 业务场景: 订单超时取消、定时提醒、延迟重试


13.8 Claim Check(消息存证模式)

大消息不直接发送,而是将内容存到外部存储,消息中只传递引用。

实现

import pika, json, hashlib, redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def send_claim_check(channel, exchange, routing_key, large_data):
    """发送 Claim Check 消息"""
    # 1. 将大数据存入外部存储
    check_id = hashlib.sha256(large_data.encode()).hexdigest()[:16]
    redis_client.setex(f'claim:{check_id}', 3600, large_data)  # 1 小时过期
    
    # 2. 发送引用消息
    claim_message = {
        'check_id': check_id,
        'content_type': 'application/json',
        'size': len(large_data)
    }
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(claim_message),
        properties=pika.BasicProperties(delivery_mode=2)
    )

def receive_claim_check(body):
    """接收并还原 Claim Check 消息"""
    claim = json.loads(body)
    check_id = claim['check_id']
    
    # 从外部存储获取原始数据
    original_data = redis_client.get(f'claim:{check_id}')
    if original_data:
        return json.loads(original_data)
    else:
        raise Exception(f"Claim data expired or not found: {check_id}")

📌 业务场景: 大文件传输、日志批量处理


13.9 模式选择决策

需要同步响应?
├── 是 ──> RPC 模式
└── 否 ──> 消息需要广播?
            ├── 是 ──> Publish/Subscribe (Fanout)
            └── 否 ──> 需要延迟?
                       ├── 是 ──> Delayed Message
                       └── 否 ──> 需要优先级?
                                  ├── 是 ──> Priority Queue
                                  └── 否 ──> 需要复杂路由?
                                             ├── 是 ──> Topic / Headers
                                             └── 否 ──> Work Queue (Direct)

13.10 注意事项

⚠️ RPC 模式的超时处理

客户端需要设置超时机制,避免永久等待响应。

⚠️ Fanout 不过滤消息

所有绑定队列都会收到完整消息,注意消息大小对网络的影响。

⚠️ 延迟消息不保证精确

TTL 到期后的投递可能有延迟,不适合精确定时场景。

⚠️ Claim Check 的存储管理

外部存储中的引用数据需要定期清理,防止数据泄漏。

🔥 最佳实践: 优先选择异步模式(Work Queue / Pub-Sub),仅在必要时使用 RPC。


13.11 扩展阅读


下一章: 第 14 章:Docker 与 Kubernetes — 生产级容器化部署方案。