强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Redis 传输协议精讲 / 05 - Pipeline 管道机制

Pipeline 管道机制

5.1 为什么需要 Pipeline

RTT 的代价

标准的 Redis 交互是"请求-响应"模式:客户端发送一个命令,等待响应后才能发送下一个命令。

Client                          Server
  │                                │
  │──── SET key1 value1 ─────────→│  T0
  │                                │
  │←──── +OK ─────────────────────│  T0 + RTT
  │                                │
  │──── SET key2 value2 ─────────→│  T0 + RTT
  │                                │
  │←──── +OK ─────────────────────│  T0 + 2*RTT
  │                                │
  │──── SET key3 value3 ─────────→│  T0 + 2*RTT
  │                                │
  │←──── +OK ─────────────────────│  T0 + 3*RTT

如果 RTT = 1ms,3 个命令需要 3ms(不计命令执行时间)。10000 个命令需要 10 秒!

Pipeline 的解决方案

Pipeline 允许客户端一次发送多个命令,然后批量读取响应:

Client                          Server
  │                                │
  │──── SET key1 value1 ─────────→│
  │──── SET key2 value2 ─────────→│  所有命令一次发送
  │──── SET key3 value3 ─────────→│
  │                                │
  │←──── +OK ─────────────────────│
  │←──── +OK ─────────────────────│  所有响应一次返回
  │←──── +OK ─────────────────────│

3 个命令只需要 1 个 RTT!


5.2 Pipeline 协议层面

从协议角度看,Pipeline 就是在同一个 TCP 连接上连续发送多个 RESP 数组,不等待响应

*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n
*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n
*3\r\n$3\r\nSET\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n

这 3 个 RESP 数组在同一个 TCP 段中发送。服务器依次处理每个命令,并将响应依次返回:

+OK\r\n
+OK\r\n
+OK\r\n

关键点

特性说明
不是新协议Pipeline 只是发送方式的改变,协议格式不变
不保证原子性Pipeline 中的命令可能被其他客户端的命令穿插
顺序保证Pipeline 中的命令按发送顺序执行,响应也按同样顺序返回
错误处理单个命令失败不影响其他命令,每个响应独立

5.3 性能对比

基准测试

import socket
import time

def benchmark_single(n):
    """逐个发送命令"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", 6379))

    start = time.time()
    for i in range(n):
        cmd = f"*3\r\n$3\r\nSET\r\n$4\r\nkey{i}\r\n$5\r\nvalue\r\n".encode()
        s.sendall(cmd)
        s.recv(1024)  # 等待响应
    elapsed = time.time() - start

    s.close()
    return elapsed

def benchmark_pipeline(n, batch_size=1000):
    """批量发送命令"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("127.0.0.1", 6379))

    start = time.time()
    for batch_start in range(0, n, batch_size):
        batch_end = min(batch_start + batch_size, n)
        # 一次发送多个命令
        buf = b""
        for i in range(batch_start, batch_end):
            buf += f"*3\r\n$3\r\nSET\r\n$4\r\nkey{i}\r\n$5\r\nvalue\r\n".encode()
        s.sendall(buf)

        # 批量读取响应
        received = 0
        while received < batch_end - batch_start:
            data = s.recv(4096)
            received += data.count(b"+OK\r\n")
    elapsed = time.time() - start

    s.close()
    return elapsed

n = 10000
t_single = benchmark_single(n)
t_pipeline = benchmark_pipeline(n)

print(f"逐个发送: {t_single:.3f}s ({n/t_single:.0f} ops/s)")
print(f"Pipeline: {t_pipeline:.3f}s ({n/t_pipeline:.0f} ops/s)")
print(f"加速比: {t_single/t_pipeline:.1f}x")

典型结果

模式耗时QPS加速比
逐个发送2.50s4,0001x
Pipeline (batch=100)0.15s66,66717x
Pipeline (batch=1000)0.08s125,00031x

注意:实际性能取决于网络延迟(RTT)和服务器处理能力。RTT 越高,Pipeline 的收益越大。

不同 RTT 下的性能

假设 10000 个命令,命令执行时间忽略不计:

RTT逐个发送Pipeline (batch=1000)加速比
0.1ms (本地)1s0.08s12x
1ms (同机房)10s0.08s125x
10ms (跨机房)100s0.1s1000x
100ms (跨地域)1000s0.5s2000x

Pipeline 在高延迟场景下收益最显著。


5.4 实现方式

方式一:手动拼接 RESP

import socket

class ManualPipeline:
    def __init__(self, host="127.0.0.1", port=6379):
        self.sock = socket.create_connection((host, port))
        self.commands = []
        self.response_count = 0

    def execute(self, *args):
        """添加命令到 Pipeline"""
        parts = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            if isinstance(arg, str):
                arg = arg.encode("utf-8")
            parts.append(f"${len(arg)}\r\n".encode())
            parts.append(arg)
            parts.append(b"\r\n")
        self.commands.append(b"".join(parts))
        self.response_count += 1

    def flush(self):
        """发送所有命令并读取响应"""
        # 一次发送所有命令
        self.sock.sendall(b"".join(self.commands))

        # 读取所有响应
        responses = []
        buf = b""
        while len(responses) < self.response_count:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            buf += chunk
            # 简单统计 +OK 响应数量
            responses = buf.split(b"+OK\r\n")
            # 最后一个可能是不完整的
            if responses[-1]:
                responses = responses[:-1]

        self.commands.clear()
        self.response_count = 0
        return responses

# 使用
pipeline = ManualPipeline()
for i in range(10000):
    pipeline.execute("SET", f"key:{i}", f"value:{i}")
results = pipeline.flush()
print(f"Sent {len(results)} commands")

方式二:使用 redis-py

import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

# 创建 Pipeline(默认使用事务)
pipe = r.pipeline(transaction=False)

# 添加命令
for i in range(10000):
    pipe.set(f"key:{i}", f"value:{i}")

# 执行并获取结果
results = pipe.execute()
print(f"Executed {len(results)} commands")

方式三:使用 Jedis (Java)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Pipeline pipeline = jedis.pipelined();

    for (int i = 0; i < 10000; i++) {
        pipeline.set("key:" + i, "value:" + i);
    }

    List<Object> results = pipeline.syncAndReturnAll();
    System.out.println("Executed " + results.size() + " commands");
}

5.5 Pipeline 中的错误处理

Pipeline 中每个命令独立执行,单个命令失败不影响其他命令:

import redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

# 设置一个字符串类型的 key
r.set("mykey", "hello")

pipe = r.pipeline(transaction=False)
pipe.set("key1", "value1")      # 正常
pipe.lpush("mykey", "item")     # 错误:类型不匹配
pipe.set("key2", "value2")      # 正常

try:
    results = pipe.execute()
    print(results)  # [True, ResponseError, True]
except redis.exceptions.ResponseError as e:
    # 取决于 raise_on_error 参数
    print(f"Error: {e}")

错误处理策略

策略说明
raise_on_error=True(默认)遇到第一个错误时抛出异常
raise_on_error=False返回所有结果,错误位置为异常对象
# 不抛出异常,检查每个结果
pipe = r.pipeline(transaction=False)
pipe.set("key1", "value1")
pipe.lpush("mykey", "item")
pipe.set("key2", "value2")

results = pipe.execute(raise_on_error=False)
for i, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Command {i} failed: {result}")
    else:
        print(f"Command {i} succeeded: {result}")

5.6 Pipeline 的内存考量

发送缓冲区

Pipeline 的命令在发送前存储在客户端内存中。如果 Pipeline 过大,可能导致:

  1. 客户端内存溢出:百万级命令占用大量内存
  2. 服务器输入缓冲区溢出client-output-buffer-limit 限制
  3. 网络拥塞:大量数据一次性发送

最佳实践:分批发送

def batch_pipeline(r, commands, batch_size=1000):
    """分批执行 Pipeline 命令"""
    results = []

    for i in range(0, len(commands), batch_size):
        batch = commands[i:i + batch_size]
        pipe = r.pipeline(transaction=False)

        for cmd, args in batch:
            getattr(pipe, cmd)(*args)

        results.extend(pipe.execute())

    return results

# 使用
commands = [("set", [f"key:{i}", f"value:{i}"]) for i in range(100000)]
results = batch_pipeline(r, commands, batch_size=5000)

缓冲区配置

# Redis 服务端配置
# 硬限制:达到此限制立即断开连接
client-output-buffer-limit normal 0 0 0

# 软限制 + 窗口时间
client-output-buffer-limit normal 256mb 128mb 60

5.7 Pipeline vs 事务 vs Lua 脚本

特性PipelineMULTI/EXECLua 脚本
原子性⚠️ 伪原子✅ 真原子
性能最高较高较低(脚本编译)
复杂逻辑
网络往返1 RTT1 RTT1 RTT
错误处理逐个检查整体回滚脚本内处理
适用场景批量读写需要原子保证条件逻辑

组合使用

Pipeline 可以包含事务命令:

pipe = r.pipeline(transaction=True)  # 使用事务

pipe.multi()
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.execute()  # EXEC

这等价于在一个 Pipeline 中发送 MULTI、多个命令、EXEC


5.8 异步 Pipeline

Python asyncio 实现

import asyncio
import redis.asyncio as aioredis

async def async_pipeline_demo():
    r = aioredis.Redis(host="127.0.0.1", port=6379)

    pipe = r.pipeline(transaction=False)
    for i in range(10000):
        await pipe.set(f"key:{i}", f"value:{i}")

    results = await pipe.execute()
    print(f"Executed {len(results)} commands")

    await r.close()

asyncio.run(async_pipeline_demo())

Go 实现

package main

import (
    "context"
    "fmt"
    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{
        Addr: "127.0.0.1:6379",
    })

    pipe := rdb.Pipeline()

    for i := 0; i < 10000; i++ {
        pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
    }

    cmders, err := pipe.Exec(ctx)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Executed %d commands\n", len(cmders))
}

5.9 性能优化技巧

技巧一:合并小命令

# 不推荐:10000 次 HSET
for i in range(10000):
    pipe.hset("myhash", f"field:{i}", f"value:{i}")

# 推荐:使用 HMSET(单次命令设置多个字段)
fields = {f"field:{i}": f"value:{i}" for i in range(10000)}
pipe.hset("myhash", mapping=fields)

技巧二:使用 MSET 替代多次 SET

# 不推荐
for key, value in data.items():
    pipe.set(key, value)

# 推荐(每 1000 个一批)
items = list(data.items())
for i in range(0, len(items), 1000):
    batch = dict(items[i:i+1000])
    r.mset(batch)

技巧三:避免 Pipeline 中的读写依赖

# 不推荐:Pipeline 中有依赖关系
pipe.get("counter")      # 读
pipe.incr("counter")     # 写(依赖读的结果)
pipe.set("result", ...)  # 依赖 incr 的结果

# 推荐:使用 Lua 脚本处理有依赖的操作
lua_script = """
local counter = redis.call('INCR', KEYS[1])
redis.call('SET', KEYS[2], counter * 2)
return counter
"""
r.eval(lua_script, 2, "counter", "result")

5.10 注意事项

⚠️ Pipeline 不保证原子性 Pipeline 中的命令可能被其他客户端的命令穿插执行。如果需要原子性保证,请使用 MULTI/EXEC 或 Lua 脚本。

⚠️ Pipeline 大小要适中 建议每批 1000-10000 个命令。太小收益不明显,太大可能导致内存问题。

⚠️ 连接池中的 Pipeline Pipeline 必须在同一个连接上执行。使用连接池时,Pipeline 会占用一个连接直到执行完成。

⚠️ 网络超时 大 Pipeline 的发送和接收可能需要较长时间,确保客户端的超时设置足够。


5.11 扩展阅读

资源说明
Redis Pipeline 文档官方文档
redis-py PipelinePython 客户端 Pipeline
Jedis PipelineJava 客户端 Pipeline

上一章:命令格式与发送 | 下一章:发布订阅协议