RabbitMQ 消息队列完全教程 / 第 7 章:消费者开发
第 7 章:消费者开发
消费者负责接收和处理消息。本章将详细讲解消费者的核心机制:确认模式、预取控制、消息拒绝和重试策略。
7.1 消费模式对比
| 模式 | 说明 | 推荐场景 |
|---|---|---|
| Push(推模式) | Broker 主动推送消息给消费者 | 事件驱动、实时处理 |
| Pull(拉模式) | 消费者主动拉取消息 | 批量处理、低优先级任务 |
Push 模式(basic.consume)
# 持续订阅,等待 Broker 推送
def callback(ch, method, properties, body):
print(f"[x] 收到: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming() # 阻塞等待
Pull 模式(basic.get)
# 主动拉取单条消息
method, properties, body = channel.basic_get(queue='task_queue', auto_ack=False)
if method:
print(f"[x] 收到: {body.decode()}")
channel.basic_ack(delivery_tag=method.delivery_tag)
else:
print("[*] 队列为空")
💡 提示: 大多数场景推荐使用 Push 模式,它更高效且支持流控。Pull 模式适合轮询场景。
7.2 确认模式详解
自动确认(Auto Ack)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)
| 特点 | 说明 |
|---|---|
| 优点 | 性能最高,无额外开销 |
| 缺点 | 消息发出即删除,消费者崩溃会丢消息 |
| 适用 | 允许少量丢失的场景(日志、监控) |
⚠️ 注意: 生产环境不建议使用自动确认,消息可能在处理完成前就被删除。
手动确认(Manual Ack)
def callback(ch, method, properties, body):
try:
process_message(body)
# 确认消息(成功处理)
ch.basic_ack(delivery_tag=method.delivery_tag)
except RetryableException:
# 可重试的异常 - 重新入队
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
except PermanentException:
# 永久性异常 - 不重新入队(进入死信)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception:
# 未知异常 - 重新入队
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)
确认方法对比
| 方法 | 说明 | 消息去向 |
|---|---|---|
basic_ack | 确认成功处理 | 从队列删除 |
basic_nack(requeue=True) | 拒绝并重新入队 | 回到队列头部 |
basic_nack(requeue=False) | 拒绝不入队 | 进入死信队列(如果配置了 DLX) |
basic_reject | 同 nack,但只能拒绝单条 | 同 nack |
批量确认
# 确认 delivery_tag 及之前的所有消息
ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
7.3 QoS / Prefetch(预取)
预取限制消费者同时持有的未确认消息数量,实现负载均衡。
设置预取
# 每次最多预取 10 条未确认消息
channel.basic_qos(prefetch_count=10)
# 基于字节的预取(RabbitMQ 3.x 支持有限)
channel.basic_qos(prefetch_size=0, prefetch_count=10, global_qos=False)
预取对负载均衡的影响
场景: 2 个消费者,prefetch_count=1
Queue: [1][2][3][4][5][6]
│ │
v v
Consumer A Consumer B
(处理[1]) (处理[2])
→ 每个消费者一次只处理一条,处理完才获取下一条
→ 消费者之间严格轮询,适合处理时间相近的场景
场景: 2 个消费者,prefetch_count=5
Queue: [1][2][3][4][5][6]
│ │
v v
Consumer A Consumer B
(处理 1-5) (处理 6)
→ 处理快的消费者会获得更多消息
→ 更好的负载均衡效果
预取值推荐
| 场景 | 推荐 prefetch_count | 说明 |
|---|---|---|
| 处理时间短(<100ms) | 20-50 | 提高吞吐 |
| 处理时间中等(100ms-1s) | 5-10 | 平衡吞吐和延迟 |
| 处理时间长(>1s) | 1-3 | 避免消息长时间被占用 |
| 消费者不稳定 | 1 | 快速故障转移 |
7.4 消费者示例(完整)
Python 完整消费者
import pika
import json
import signal
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MessageConsumer:
def __init__(self, host, queue, prefetch_count=10):
self.host = host
self.queue = queue
self.prefetch_count = prefetch_count
self.should_stop = False
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _handle_signal(self, signum, frame):
logger.info("收到停止信号,准备优雅关闭...")
self.should_stop = True
self.channel.stop_consuming()
def _connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
heartbeat=60,
blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=self.prefetch_count)
def _process_message(self, ch, method, properties, body):
try:
message = json.loads(body)
logger.info(f"处理消息: {message}")
# 业务逻辑
self.handle_business(message)
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"消息处理成功: {properties.message_id}")
except json.JSONDecodeError:
logger.error(f"消息格式错误: {body}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
logger.error(f"消息处理失败: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
def handle_business(self, message):
"""业务处理逻辑(子类重写)"""
raise NotImplementedError
def start(self):
self._connect()
self.channel.basic_consume(
queue=self.queue,
on_message_callback=self._process_message,
auto_ack=False
)
logger.info(f"消费者已启动,队列: {self.queue}")
while not self.should_stop:
try:
self.channel.start_consuming()
except pika.exceptions.AMQPConnectionError:
logger.warning("连接断开,5 秒后重连...")
import time
time.sleep(5)
self._connect()
self.channel.basic_consume(
queue=self.queue,
on_message_callback=self._process_message,
auto_ack=False
)
# 使用
class OrderConsumer(MessageConsumer):
def handle_business(self, message):
order_id = message.get('order_id')
logger.info(f"处理订单: {order_id}")
# ... 业务逻辑
consumer = OrderConsumer('localhost', 'order_queue', prefetch_count=5)
consumer.start()
Java 消费者
import com.rabbitmq.client.*;
public class OrderConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
factory.setAutomaticRecoveryEnabled(true); // 自动重连
factory.setNetworkRecoveryInterval(5000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 设置 QoS
channel.basicQos(10);
// 消费消息
DeliverCallback deliverCallback = (tag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
System.out.println("[x] 处理: " + message);
// 业务逻辑...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("[!] 处理失败: " + e.getMessage());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume("order_queue", false, deliverCallback, tag -> {
System.out.println("[!] 消费被取消: " + tag);
});
System.out.println("[*] 等待消息...");
Thread.currentThread().join(); // 阻塞等待
}
}
7.5 消息拒绝与重试
拒绝策略
def process_with_retry(ch, method, properties, body):
retry_count = 0
if properties.headers:
retry_count = properties.headers.get('x-retry-count', 0)
try:
handle_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
if retry_count < 3:
# 重试:发布到延迟重试交换机
headers = dict(properties.headers or {})
headers['x-retry-count'] = retry_count + 1
headers['x-original-queue'] = method.routing_key
ch.basic_publish(
exchange='retry_exchange',
routing_key='retry',
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
headers=headers
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
# 超过重试次数 - 进入死信
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
指数退避重试
import time
import pika
def retry_with_backoff(ch, method, properties, body):
retry_count = properties.headers.get('x-retry-count', 0) if properties.headers else 0
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
if retry_count < 5:
# 指数退避延迟
delay = min(1000 * (2 ** retry_count), 30000) # 1s, 2s, 4s, 8s, 16s, 30s(max)
headers = dict(properties.headers or {})
headers['x-retry-count'] = retry_count + 1
ch.basic_publish(
exchange='retry_delayed_exchange',
routing_key='retry',
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-delay': delay, **headers}
)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
# 进入死信队列
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
重试架构图
Producer --> Main Queue --> Consumer
│ │
│ 处理失败
│ │
v v
Retry Exchange (delayed)
│
│ 延迟 N 秒
v
Main Queue (重新入队)
│
│ (retry_count < max)
│
v
Consumer (重试)
│
│ (retry_count >= max)
v
DLX Exchange --> Dead Letter Queue --> 告警/人工处理
7.6 消费者并发
多线程消费者
import pika
import threading
import json
class MultiThreadConsumer:
def __init__(self, host, queue, num_threads=4):
self.host = host
self.queue = queue
self.num_threads = num_threads
def _worker(self, worker_id):
connection = pika.BlockingConnection(
pika.ConnectionParameters(self.host)
)
channel = connection.channel()
channel.basic_qos(prefetch_count=5)
def callback(ch, method, properties, body):
print(f"[Worker-{worker_id}] 处理: {body.decode()}")
# 模拟处理
import time
time.sleep(0.1)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue=self.queue,
on_message_callback=callback,
auto_ack=False
)
print(f"[Worker-{worker_id}] 已启动")
channel.start_consuming()
def start(self):
threads = []
for i in range(self.num_threads):
t = threading.Thread(target=self._worker, args=(i,))
t.daemon = True
t.start()
threads.append(t)
print(f"已启动 {self.num_threads} 个消费者线程")
for t in threads:
t.join()
# 使用
consumer = MultiThreadConsumer('localhost', 'task_queue', num_threads=4)
consumer.start()
进程级并发
# 使用多个进程消费同一个队列
for i in $(seq 1 4); do
python consumer.py &
done
7.7 消费者优雅关闭
import signal
import pika
class GracefulConsumer:
def __init__(self):
self.connection = None
self.channel = None
self._closing = False
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
print(f"\n收到信号 {signum},正在优雅关闭...")
self._closing = True
if self.channel:
self.channel.stop_consuming()
def start(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=10)
self.channel.basic_consume(
queue='task_queue',
on_message_callback=self._on_message
)
try:
self.channel.start_consuming()
except Exception:
pass
finally:
if self.connection and self.connection.is_open:
self.connection.close()
print("连接已关闭")
def _on_message(self, ch, method, properties, body):
try:
# 处理消息
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
# 处理失败,重新入队
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
7.8 注意事项
⚠️ 不要在回调中执行长时间阻塞操作
长时间阻塞会阻止 Channel 接收新的消息和心跳,导致连接超时。
⚠️ 手动确认时注意异常处理
如果消费者崩溃且未发送 ACK,消息会保持 unacked 状态直到连接超时后重新投递。
⚠️ prefetch_count 不宜过大
过大的预取值会导致消息堆积在消费者内存中,影响故障转移速度。
⚠️ 注意消息重复消费
消息可能被多次投递(网络抖动、消费者超时等),消费者必须实现幂等处理。
🔥 最佳实践: 使用手动确认 + 合理的预取值 + 幂等消费 + 优雅关闭。
7.9 扩展阅读
下一章: 第 8 章:消息路由 — 深入掌握消息路由的各种策略和高级用法。