强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Redis 完全指南 / 08 - 发布订阅

发布订阅

8.1 Pub/Sub 概述

Redis 的 Pub/Sub(发布/订阅)是一种消息通信模式:发送者(Publisher)发送消息,订阅者(Subscriber)接收消息。Redis 充当消息代理(Broker),将消息路由到所有订阅者。

Pub/Sub 架构

Publisher 1 ──PUBLISH──→ ┌──────────┐ ──推送──→ Subscriber A
Publisher 2 ──PUBLISH──→ │  Redis   │ ──推送──→ Subscriber B
Publisher 3 ──PUBLISH──→ │  (Broker)│ ──推送──→ Subscriber C
                         └──────────┘

Pub/Sub 特性

特性说明
实时推送消息发布后立即推送给所有订阅者
不持久化消息不会存储,订阅者断开连接后丢失
不确认机制没有 ACK,不保证送达
无消费者组每个订阅者都收到全量消息
Fire-and-Forget发后即忘,适合实时通知

8.2 基本 Pub/Sub 命令

发布消息

# 发布消息到频道
PUBLISH channel:news "Breaking: Redis 8.0 Released!"
# (integer) 3   ← 返回收到消息的订阅者数量

PUBLISH channel:sports "Team A won the match"
# (integer) 2

订阅频道

# 订阅频道(进入订阅模式,阻塞等待消息)
SUBSCRIBE channel:news channel:sports
# 1) "subscribe"
# 2) "channel:news"
# 3) (integer) 1    ← 当前订阅的频道数量

# 进入订阅模式后,只能使用以下命令:
# SUBSCRIBE, UNSUBSCRIBE, PING, QUIT

# 收到的消息格式:
# 1) "message"          ← 消息类型
# 2) "channel:news"     ← 频道名
# 3) "Breaking news!"   ← 消息内容

# 退订频道
UNSUBSCRIBE channel:news
UNSUBSCRIBE              # 退订所有频道

模式订阅(Pattern Subscribe)

# 使用通配符订阅匹配的频道
PSUBSCRIBE channel:*           # 订阅所有 channel: 开头的频道
PSUBSCRIBE news:* sports:*     # 订阅多个模式
PSUBSCRIBE *                   # 订阅所有频道

# 消息格式(比 SUBSCRIBE 多一个字段):
# 1) "pmessage"           ← 消息类型
# 2) "channel:*"          ← 匹配的模式
# 3) "channel:news"       ← 实际频道名
# 4) "Hello World"        ← 消息内容

# 退订模式
PUNSUBSCRIBE channel:*
PUNSUBSCRIBE                 # 退订所有模式

查看订阅信息

# 查看频道的订阅者数量
PUBSUB CHANNELS              # 列出所有有订阅者的频道
PUBSUB CHANNELS channel:*    # 匹配模式

# 查看频道的订阅者数量
PUBSUB NUMSUB channel:news channel:sports
# 1) "channel:news"
# 2) (integer) 3
# 3) "channel:sports"
# 4) (integer) 2

# 查看模式订阅的数量
PUBSUB NUMPAT               # 当前客户端的模式订阅数量

8.3 多客户端演示

使用 redis-cli 演示

终端 1(订阅者 A):

redis-cli
> SUBSCRIBE channel:news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:news"
3) (integer) 1

终端 2(订阅者 B):

redis-cli
> SUBSCRIBE channel:news channel:sports

终端 3(发布者):

redis-cli
> PUBLISH channel:news "Hello subscribers!"
(integer) 2

> PUBLISH channel:sports "Goal!"
(integer) 1

终端 1 和 2 都会收到 “Hello subscribers!",终端 2 还会收到 “Goal!"。

Python 客户端实现

import redis
import threading
import time

def subscriber(name, channels):
    """订阅者线程"""
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r.pubsub()
    
    # 订阅频道
    pubsub.subscribe(*channels)
    print(f"[{name}] Subscribed to {channels}")
    
    # 监听消息
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"[{name}] Received: {message['data']} (from {message['channel']})")
        elif message['type'] == 'subscribe':
            print(f"[{name}] Subscribed to {message['channel']}")

def publisher():
    """发布者"""
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    for i in range(5):
        msg = f"Message #{i+1}"
        count = r.publish('channel:news', msg)
        print(f"[Publisher] Sent: '{msg}' to {count} subscribers")
        time.sleep(1)

# 启动订阅者(后台线程)
t1 = threading.Thread(target=subscriber, args=("Sub-A", ["channel:news"]), daemon=True)
t2 = threading.Thread(target=subscriber, args=("Sub-B", ["channel:news", "channel:sports"]), daemon=True)
t1.start()
t2.start()

time.sleep(1)  # 等待订阅者就绪

# 启动发布者
publisher()

模式订阅(Pattern)

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()

# 模式订阅
pubsub.psubscribe('user:*:events')

# 发布消息
r.publish('user:1001:events', '{"action":"login","ip":"192.168.1.1"}')
r.publish('user:1002:events', '{"action":"purchase","amount":99.9}')

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Pattern: {message['pattern']}")
        print(f"Channel: {message['channel']}")
        print(f"Data: {message['data']}")

8.4 Pub/Sub 局限性

消息丢失场景

场景消息是否丢失
订阅者正常在线✅ 收到
订阅者断开连接后重连❌ 丢失
发布时无订阅者❌ 丢失
订阅者处理速度慢❌ 可能丢失(输出缓冲区满)

Pub/Sub vs Stream 对比

特性Pub/SubStream
消息持久化
消息确认✅ XACK
消费者组
历史消息回溯✅ XRANGE
多消费者✅ 每个都收到✅ 可配置
适用场景实时通知、配置广播事件溯源、异步任务

8.5 使用 Stream 替代 Pub/Sub

Stream 结合了 Pub/Sub 的推送特性和消息队列的可靠性:

# 生产者
XADD events * type "user_login" user_id "1001" ip "192.168.1.1"

# 消费者组(每条消息只被组内一个消费者处理)
XGROUP CREATE events analytics $ MKSTREAM

# 消费者 A 读取
XREADGROUP GROUP analytics consumer-a COUNT 1 BLOCK 5000 STREAMS events >

# 确认消息
XACK events analytics 1715318400000-0

广播模式(Stream + 每个服务一个消费者组)

# 创建多个消费者组,每组相当于 Pub/Sub 的一个订阅者
XGROUP CREATE events notification-svc $ MKSTREAM
XGROUP CREATE events analytics-svc $ MKSTREAM
XGROUP CREATE events audit-svc $ MKSTREAM

# 三个服务各自消费,每条消息被三个服务都处理
XREADGROUP GROUP notification-svc svc-1 COUNT 10 STREAMS events >
XREADGROUP GROUP analytics-svc svc-1 COUNT 10 STREAMS events >
XREADGROUP GROUP audit-svc svc-1 COUNT 10 STREAMS events >

8.6 Pub/Sub 高级用法

通知系统

# 用户关注通知
PUBLISH user:1001:notifications '{"type":"follow","from":2001,"time":"2026-05-10T10:00:00"}'

# 系统公告
PUBLISH system:announcements '{"title":"系统维护","content":"今晚22:00-23:00维护"}'

配置广播

# 配置变更广播
PUBLISH config:update '{"key":"max_upload_size","value":"20971520"}'

# 所有应用实例订阅 config:* 频道
PSUBSCRIBE config:*

实时日志聚合

# 各服务将日志推送到 Redis
PUBLISH logs:app '{"level":"ERROR","service":"order-svc","message":"DB connection timeout"}'
PUBLISH logs:app '{"level":"INFO","service":"user-svc","message":"User 1001 logged in"}'

# 日志聚合器订阅所有日志
SUBSCRIBE logs:app

⚠️ 注意:Pub/Sub 不保证消息送达。如果订阅者不在线,消息会丢失。对于重要消息,使用 Stream 或专业的消息队列(如 RabbitMQ、Kafka)。


📌 业务场景

场景一:实时通知推送

# 用户收到消息时通知
PUBLISH push:user:1001 '{"type":"message","from":"客服","content":"您的订单已发货"}'

# WebSocket 服务订阅
SUBSCRIBE push:user:1001

场景二:分布式事件广播

# 缓存失效广播
PUBLISH cache:invalidate '{"key":"user:1001","reason":"profile_update"}'

# 各节点订阅并清除本地缓存
PSUBSCRIBE cache:*

场景三:实时数据同步

# 数据库变更事件
PUBLISH db:changes '{"table":"orders","action":"update","id":1001,"status":"shipped"}'

# 搜索索引服务、缓存服务各自订阅处理

🔗 扩展阅读