Redis 传输协议精讲 / 06 - 发布订阅协议
发布订阅协议
6.1 Pub/Sub 概述
Redis Pub/Sub 是一种消息通信模式:发布者(Publisher)向频道(Channel)发送消息,订阅者(Subscriber)接收该频道的所有消息。
┌───────────┐ PUBLISH channel msg ┌──────────┐
│ Publisher │ ──────────────────────→ │ Redis │
└───────────┘ │ Server │
└────┬─────┘
│
┌───────────────┼───────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Sub 1 │ │ Sub 2 │ │ Sub 3 │
└──────────┘ └──────────┘ └──────────┘
核心特征
| 特性 | 说明 |
|---|---|
| 模式 | 发布-订阅(Fire and Forget) |
| 持久化 | ❌ 消息不持久化,离线订阅者会丢失消息 |
| 确认机制 | ❌ 无 ACK |
| 消息路由 | 频道匹配(精确 + 模式) |
| 协议变化 | 进入订阅模式后,连接行为改变 |
6.2 SUBSCRIBE 命令
命令格式
SUBSCRIBE channel [channel ...]
RESP 编码
*3\r\n
$9\r\n
SUBSCRIBE\r\n
$7\r\n
channel\r\n
$11\r\n
channel-two\r\n
响应格式
订阅成功后,服务器返回 3 元素数组:
*3\r\n
$9\r\n
subscribe\r\n
$7\r\n
channel\r\n
:1\r\n ← 当前订阅数
结构化表示:
[
"subscribe", ← 消息类型
"channel", ← 频道名
1 ← 当前连接的订阅数
]
多频道订阅
$ telnet 127.0.0.1 6379
*3\r\n$9\r\nSUBSCRIBE\r\n$4\r\nnews\r\n$5\r\ncache\r\n
# 响应(每个频道一个确认)
*3\r\n$9\r\nsubscribe\r\n$4\r\nnews\r\n:1\r\n
*3\r\n$9\r\nsubscribe\r\n$5\r\ncache\r\n:2\r\n
6.3 订阅模式下的连接行为
重要规则
进入订阅模式后,连接的行为发生根本改变:
| 行为 | 普通模式 | 订阅模式 |
|---|---|---|
| 执行命令 | ✅ 允许 | ❌ 只允许 SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE/PUNSUBSCRIBE/PING/QUIT |
| 响应格式 | 命令的响应 | 订阅确认 / 消息 / 退订确认 |
| 主动推送 | ❌ | ✅ 服务器推送消息 |
| Pipeline | ✅ | ⚠️ 有限制 |
订阅模式中允许的命令
| 命令 | 说明 |
|---|---|
SUBSCRIBE | 订阅新频道 |
UNSUBSCRIBE | 退订频道 |
PSUBSCRIBE | 模式订阅 |
PUNSUBSCRIBE | 退订模式 |
PING | 心跳检测(返回正常 PONG) |
QUIT | 关闭连接 |
RESET | Redis 7.0+,重置连接状态 |
非法命令示例
# 在订阅模式中执行 GET
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context\r\n
6.4 PUBLISH 命令
命令格式
PUBLISH channel message
响应
PUBLISH 返回接收到消息的订阅者数量(整数):
→ PUBLISH news "hello"
← :3 ← 3 个订阅者收到了消息
PUBLISH 的协议细节
import socket
s = socket.create_connection(("127.0.0.1", 6379))
# PUBLISH 本身是一个普通命令
cmd = "*3\r\n$7\r\nPUBLISH\r\n$4\r\nnews\r\n$5\r\nhello\r\n"
s.sendall(cmd.encode())
resp = s.recv(1024)
print(resp) # b':0\r\n'(没有订阅者时返回 0)
6.5 消息接收格式
当频道有新消息时,服务器向所有订阅者推送:
精确订阅的消息格式
*3\r\n
$7\r\n
message\r\n
$7\r\n
channel\r\n
$5\r\n
hello\r\n
结构化表示:
[
"message", ← 固定标识
"channel", ← 频道名
"hello" ← 消息内容
]
消息内容的二进制安全
消息内容使用 Bulk String 编码,支持任意二进制数据:
*3\r\n
$7\r\n
message\r\n
$4\r\n
logs\r\n
$11\r\n
\x00\x01\x02hello\r\n
6.6 PSUBSCRIBE 模式订阅
命令格式
PSUBSCRIBE pattern [pattern ...]
模式语法
| 模式 | 匹配 | 示例 |
|---|---|---|
* | 所有频道 | PSUBSCRIBE * |
news.* | news. 开头的频道 | 匹配 news.sport、news.tech |
news.[ie]* | news.i 或 news.e 开头 | 匹配 news.intl、news.eu |
\\* | 字面量 * | 匹配名为 * 的频道 |
响应格式
→ PSUBSCRIBE news.*
← *3
← $10
← psubscribe
← $7
← news.*
← :1
模式消息格式
通过模式订阅收到的消息格式略有不同(4 元素数组):
*4\r\n
$8\r\n
pmessage\r\n
$7\r\n
news.*\r\n ← 匹配的模式
$9\r\n
news.sport\r\n ← 实际频道名
$5\r\n
match!\r\n ← 消息内容
结构化表示:
[
"pmessage", ← 固定标识(注意是 pmessage 不是 message)
"news.*", ← 匹配的模式
"news.sport", ← 实际频道名
"match!" ← 消息内容
]
精确 vs 模式订阅对比
| 特性 | SUBSCRIBE | PSUBSCRIBE |
|---|---|---|
| 匹配方式 | 精确匹配 | glob 模式匹配 |
| 消息格式 | 3 元素数组 | 4 元素数组 |
| 消息类型 | message | pmessage |
| 性能 | 更高 | 较低(需要模式匹配) |
| 多模式匹配 | N/A | 一个频道可能匹配多个模式,收到多条消息 |
重复消息问题
如果客户端同时通过 SUBSCRIBE 和 PSUBSCRIBE 订阅了同一个频道,或者一个频道匹配了多个模式,可能会收到重复消息:
SUBSCRIBE news
PSUBSCRIBE news
PSUBSCRIBE n*
# 发布 PUBLISH news "hello"
# 客户端收到 3 条消息:
# 1. [message, news, hello] ← 来自 SUBSCRIBE
# 2. [pmessage, news, news, hello] ← 来自 PSUBSCRIBE news
# 3. [pmessage, n*, news, hello] ← 来自 PSUBSCRIBE n*
客户端需要自行去重或避免重叠订阅。
6.7 UNSUBSCRIBE 与 PUNSUBSCRIBE
UNSUBSCRIBE
# 退订特定频道
→ UNSUBSCRIBE news
← *3
← $11
← unsubscribe
← $4
← news
← :0 ← 剩余订阅数
# 退订所有频道(无参数)
→ UNSUBSCRIBE
← *3
← $11
← unsubscribe
← $-1 ← NULL(频道名)
← :0
PUNSUBSCRIBE
# 退订特定模式
→ PUNSUBSCRIBE news.*
← *3
← $12
← punsubscribe
← $7
← news.*
← :0
6.8 Pub/Sub 与连接池
Pub/Sub 对连接池有特殊影响:
问题
大多数连接池假设连接是"通用"的——任何命令都可以在任何连接上执行。但 Pub/Sub 连接处于特殊模式,只能执行有限的命令。
解决方案
import redis
class PubSubManager:
def __init__(self, host="127.0.0.1", port=6379):
# 独立的连接,不使用连接池
self.redis = redis.Redis(host=host, port=port)
self.pubsub = self.redis.pubsub()
def subscribe(self, channel, callback):
"""订阅频道并注册回调"""
self.pubsub.subscribe(**{channel: callback})
# 启动独立线程监听消息
thread = self.pubsub.run_in_thread(sleep_time=0.001)
return thread
def close(self):
self.pubsub.close()
self.redis.close()
# 使用
manager = PubSubManager()
def handle_message(message):
if message["type"] == "message":
print(f"Received: {message['data']}")
thread = manager.subscribe("news", handle_message)
6.9 完整 Pub/Sub 客户端实现
import socket
import threading
from typing import Callable, Dict
class SimplePubSub:
def __init__(self, host="127.0.0.1", port=6379):
self.sock = socket.create_connection((host, port))
self.buffer = b""
self.handlers: Dict[str, Callable] = {}
self.pattern_handlers: Dict[str, Callable] = {}
self.running = False
self._lock = threading.Lock()
def _read_line(self) -> bytes:
while b"\r\n" not in self.buffer:
chunk = self.sock.recv(4096)
if not chunk:
raise ConnectionError("Connection closed")
self.buffer += chunk
pos = self.buffer.index(b"\r\n")
line = self.buffer[:pos]
self.buffer = self.buffer[pos + 2:]
return line
def _read_bulk_string(self) -> bytes | None:
line = self._read_line()
length = int(line[1:])
if length == -1:
return None
while len(self.buffer) < length + 2:
chunk = self.sock.recv(4096)
if not chunk:
raise ConnectionError("Connection closed")
self.buffer += chunk
data = self.buffer[:length]
self.buffer = self.buffer[length + 2:]
return data
def _parse_response(self):
line = self._read_line()
prefix = chr(line[0])
if prefix == ":":
return int(line[1:])
elif prefix == "$":
length = int(line[1:])
if length == -1:
return None
while len(self.buffer) < length + 2:
chunk = self.sock.recv(4096)
self.buffer += chunk
data = self.buffer[:length]
self.buffer = self.buffer[length + 2:]
return data
elif prefix == "*":
count = int(line[1:])
return [self._parse_response() for _ in range(count)]
else:
return line
def subscribe(self, *channels: str):
"""订阅频道"""
args = ["SUBSCRIBE"] + list(channels)
parts = [f"*{len(args)}\r\n".encode()]
for arg in args:
arg_bytes = arg.encode()
parts.append(f"${len(arg_bytes)}\r\n".encode())
parts.append(arg_bytes)
parts.append(b"\r\n")
self.sock.sendall(b"".join(parts))
def on(self, channel: str, callback: Callable):
"""注册消息处理器"""
self.handlers[channel] = callback
def listen(self):
"""监听消息(阻塞)"""
self.running = True
while self.running:
msg = self._parse_response()
if isinstance(msg, list) and len(msg) >= 3:
msg_type = msg[0].decode() if isinstance(msg[0], bytes) else msg[0]
if msg_type == "message":
channel = msg[1].decode()
data = msg[2].decode() if isinstance(msg[2], bytes) else msg[2]
if channel in self.handlers:
self.handlers[channel](channel, data)
elif msg_type == "pmessage":
pattern = msg[1].decode()
channel = msg[2].decode()
data = msg[3].decode() if isinstance(msg[3], bytes) else msg[3]
if pattern in self.pattern_handlers:
self.pattern_handlers[pattern](pattern, channel, data)
def close(self):
self.running = False
self.sock.close()
# 使用示例
def on_news(channel, message):
print(f"[{channel}] {message}")
ps = SimplePubSub()
ps.on("news", on_news)
ps.subscribe("news", "alerts")
ps.listen()
6.10 Pub/Sub 的局限性
| 局限 | 说明 | 替代方案 |
|---|---|---|
| 消息不持久化 | 订阅者离线时消息丢失 | Redis Streams |
| 无消费者组 | 不能负载均衡消费 | Redis Streams + XREADGROUP |
| 无确认机制 | 不知道消息是否被处理 | Redis Streams + XACK |
| 消息堆积 | 发布速度 > 消费速度时,输出缓冲区可能溢出 | Redis Streams |
| 跨节点 | 集群中 PUBLISH 会广播到所有节点 | 限制使用场景 |
Redis Streams vs Pub/Sub
| 特性 | Pub/Sub | Streams |
|---|---|---|
| 消息持久化 | ❌ | ✅ |
| 消费者组 | ❌ | ✅ |
| 消息确认 | ❌ | ✅ |
| 历史回放 | ❌ | ✅ |
| 性能 | 更高 | 略低 |
| 适用场景 | 实时通知、事件广播 | 任务队列、事件溯源 |
6.11 注意事项
⚠️ 输出缓冲区 订阅连接的输出缓冲区有特殊限制:
client-output-buffer-limit pubsub 32mb 8mb 60。如果消费速度跟不上发布速度,连接会被断开。
⚠️ 集群中的 Pub/Sub 在 Redis 集群中,PUBLISH 命令会广播到所有节点。这意味着订阅者可以在任意节点上接收消息,但会增加集群的网络开销。
⚠️ 连接断开后重连 Pub/Sub 连接断开后,订阅关系会丢失。客户端需要实现重连和重新订阅逻辑。
def reconnect_and_resubscribe(pubsub, channels):
"""重连并重新订阅"""
while True:
try:
pubsub.subscribe(*channels)
pubsub.listen()
except ConnectionError:
time.sleep(1)
# 重新连接并订阅
pubsub.reconnect()
6.12 扩展阅读
| 资源 | 说明 |
|---|---|
| Redis Pub/Sub 文档 | 官方文档 |
| Redis Streams 文档 | Pub/Sub 的替代方案 |
| SSUBSCRIBE | Redis 7.0+ Shard Pub/Sub |
上一章:Pipeline 管道机制 | 下一章:事务协议