强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Redis 传输协议精讲 / 12 - 最佳实践

最佳实践

12.1 驱动开发指南

驱动架构

┌─────────────────────────────────────────────────────┐
│                    Application                       │
├─────────────────────────────────────────────────────┤
│                 High-Level API                       │
│  client.get("key")  client.set("key", "value")      │
├─────────────────────────────────────────────────────┤
│                 Command Layer                        │
│  ["GET", "key"]  ["SET", "key", "value"]             │
├─────────────────────────────────────────────────────┤
│                 Protocol Layer (RESP)                 │
│  *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n                   │
├─────────────────────────────────────────────────────┤
│                 Connection Layer                      │
│  TCP Socket / TLS / Unix Domain Socket               │
├─────────────────────────────────────────────────────┤
│                 Connection Pool                       │
│  Pool of persistent connections                      │
└─────────────────────────────────────────────────────┘

最小驱动实现

"""
Redis 驱动最小实现
覆盖:连接、命令发送、响应解析、Pipeline
"""

import socket
from typing import Any, List, Optional

class RedisError(Exception):
    pass

class Connection:
    """RESP 协议连接"""

    def __init__(self, host="127.0.0.1", port=6379, socket_timeout=5.0):
        self.host = host
        self.port = port
        self.socket_timeout = socket_timeout
        self._sock: Optional[socket.socket] = None
        self._buffer = b""

    def connect(self):
        """建立连接"""
        self._sock = socket.create_connection(
            (self.host, self.port),
            timeout=self.socket_timeout
        )
        self._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

    @property
    def sock(self) -> socket.socket:
        if self._sock is None:
            self.connect()
        return self._sock

    def send_command(self, *args) -> None:
        """发送 RESP 命令"""
        parts = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            if isinstance(arg, str):
                arg = arg.encode("utf-8")
            parts.append(f"${len(arg)}\r\n".encode())
            parts.append(arg)
            parts.append(b"\r\n")
        self.sock.sendall(b"".join(parts))

    def read_response(self) -> Any:
        """读取 RESP 响应"""
        line = self._read_line()
        prefix = chr(line[0])

        if prefix == "+":
            return line[1:].decode("utf-8")
        elif prefix == "-":
            raise RedisError(line[1:].decode("utf-8"))
        elif prefix == ":":
            return int(line[1:])
        elif prefix == "$":
            length = int(line[1:])
            if length == -1:
                return None
            data = self._read_bytes(length + 2)
            return data[:length]
        elif prefix == "*":
            count = int(line[1:])
            if count == -1:
                return None
            return [self.read_response() for _ in range(count)]
        else:
            raise ValueError(f"Unknown type: {prefix}")

    def _read_line(self) -> bytes:
        while b"\r\n" not in self._buffer:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        pos = self._buffer.index(b"\r\n")
        line = self._buffer[:pos]
        self._buffer = self._buffer[pos + 2:]
        return line

    def _read_bytes(self, n: int) -> bytes:
        while len(self._buffer) < n:
            chunk = self.sock.recv(65536)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._buffer += chunk
        result = self._buffer[:n]
        self._buffer = self._buffer[n:]
        return result

    def close(self):
        if self._sock:
            self._sock.close()
            self._sock = None


class Pipeline:
    """Pipeline 批量执行"""

    def __init__(self, connection: Connection):
        self.connection = connection
        self.commands: List[tuple] = []
        self.response_count = 0

    def execute_command(self, *args) -> "Pipeline":
        self.commands.append(args)
        self.response_count += 1
        return self  # 支持链式调用

    def execute(self) -> list:
        """发送所有命令并收集响应"""
        # 发送所有命令
        buf = b""
        for args in self.commands:
            parts = [f"*{len(args)}\r\n".encode()]
            for arg in args:
                if isinstance(arg, str):
                    arg = arg.encode("utf-8")
                parts.append(f"${len(arg)}\r\n".encode())
                parts.append(arg)
                parts.append(b"\r\n")
            buf += b"".join(parts)
        self.connection.sock.sendall(buf)

        # 收集响应
        results = []
        for _ in range(self.response_count):
            results.append(self.connection.read_response())

        self.commands.clear()
        self.response_count = 0
        return results


class Redis:
    """Redis 客户端"""

    def __init__(self, host="127.0.0.1", port=6379):
        self.connection = Connection(host, port)

    def execute_command(self, *args) -> Any:
        self.connection.send_command(*args)
        return self.connection.read_response()

    # 高级 API
    def get(self, key: str) -> Optional[bytes]:
        return self.execute_command("GET", key)

    def set(self, key: str, value: str, ex: int = None) -> str:
        args = ["SET", key, value]
        if ex:
            args.extend(["EX", str(ex)])
        return self.execute_command(*args)

    def delete(self, *keys: str) -> int:
        return self.execute_command("DEL", *keys)

    def pipeline(self) -> Pipeline:
        return Pipeline(self.connection)

    def ping(self) -> bool:
        return self.execute_command("PING") == "PONG"

    def close(self):
        self.connection.close()

测试

# 测试基本操作
r = Redis()
assert r.ping() == True
r.set("test", "hello")
assert r.get("test") == b"hello"
r.delete("test")

# 测试 Pipeline
pipe = r.pipeline()
for i in range(1000):
    pipe.execute_command("SET", f"key:{i}", f"value:{i}")
results = pipe.execute()
assert len(results) == 1000

r.close()

12.2 连接管理

连接生命周期

创建 → 认证 → 选择数据库 → 使用 → 健康检查 → 关闭

连接池最佳实践

import redis

# 推荐配置
pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    # 连接池大小
    max_connections=50,
    # 超时设置
    socket_timeout=2.0,
    socket_connect_timeout=1.0,
    # 重试策略
    retry_on_timeout=True,
    retry=redis.retry.Retry(
        retries=3,
        backoff=redis.backoff.ExponentialBackoff()
    ),
    # 健康检查
    health_check_interval=30,
    # TCP 选项
    socket_keepalive=True,
    socket_keepalive_options={
        1: 1,   # TCP_KEEPIDLE
        2: 3,   # TCP_KEEPINTVL
        3: 5,   # TCP_KEEPCNT
    },
    # 解码
    decode_responses=True,
)

r = redis.Redis(connection_pool=pool)

连接超时配置

参数 默认值 推荐值 说明
socket_timeout None 2.0s 读写超时
socket_connect_timeout None 1.0s 连接建立超时
health_check_interval 0 30s 健康检查间隔
max_connections 2³¹ 50-200 最大连接数

连接复用模式

# 单例模式(推荐用于全局)
class RedisSingleton:
    _instance = None
    _pool = None

    @classmethod
    def get_client(cls) -> redis.Redis:
        if cls._pool is None:
            cls._pool = redis.ConnectionPool(
                host="127.0.0.1",
                port=6379,
                max_connections=50,
                decode_responses=True
            )
        if cls._instance is None:
            cls._instance = redis.Redis(connection_pool=cls._pool)
        return cls._instance

# 使用
r = RedisSingleton.get_client()
r.set("key", "value")

连接健康检查

import threading
import time

class HealthChecker:
    """连接健康检查器"""

    def __init__(self, pool: redis.ConnectionPool, interval=30):
        self.pool = pool
        self.interval = interval
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._check_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        if self._thread:
            self._thread.join()

    def _check_loop(self):
        while not self._stop_event.is_set():
            self._check_connections()
            self._stop_event.wait(self.interval)

    def _check_connections(self):
        """检查连接池中的连接"""
        # redis-py 内置了健康检查,这里展示自定义逻辑
        try:
            r = redis.Redis(connection_pool=self.pool)
            if not r.ping():
                self._reset_pool()
        except redis.ConnectionError:
            self._reset_pool()

    def _reset_pool(self):
        """重置连接池"""
        self.pool.disconnect()

12.3 序列化策略

常见序列化格式

格式 优点 缺点 适用场景
JSON 人类可读、跨语言 体积大、不支持二进制 配置、简单对象
MessagePack 紧凑、快速 不可读 性能敏感场景
Protobuf 紧凑、Schema 定义 需要预定义 微服务通信
Pickle Python 原生、支持复杂对象 不安全、不可跨语言 仅限 Python 内部
BSON JSON 的二进制版本 体积偏大 MongoDB 集成

JSON 序列化

import json
import redis

class JSONRedis:
    """JSON 序列化的 Redis 客户端"""

    def __init__(self, client: redis.Redis):
        self.client = client

    def set_json(self, key: str, value, **kwargs):
        """存储 JSON 对象"""
        return self.client.set(key, json.dumps(value, ensure_ascii=False), **kwargs)

    def get_json(self, key: str):
        """读取 JSON 对象"""
        data = self.client.get(key)
        if data is None:
            return None
        return json.loads(data)

    def hset_json(self, name: str, mapping: dict):
        """Hash 字段使用 JSON 序列化"""
        serialized = {k: json.dumps(v, ensure_ascii=False) for k, v in mapping.items()}
        return self.client.hset(name, mapping=serialized)

    def hget_json(self, name: str, key: str):
        """读取 Hash 字段并反序列化"""
        data = self.client.hget(name, key)
        if data is None:
            return None
        return json.loads(data)


# 使用
r = redis.Redis(decode_responses=True)
jr = JSONRedis(r)

jr.set_json("user:1", {"name": "张三", "age": 30, "tags": ["python", "redis"]})
user = jr.get_json("user:1")
print(user)  # {'name': '张三', 'age': 30, 'tags': ['python', 'redis']}

MessagePack 序列化

import msgpack

class MsgPackRedis:
    def __init__(self, client: redis.Redis):
        self.client = client

    def set_msgpack(self, key: str, value, **kwargs):
        return self.client.set(key, msgpack.packb(value, use_bin_type=True), **kwargs)

    def get_msgpack(self, key: str):
        data = self.client.get(key)
        if data is None:
            return None
        return msgpack.unpackb(data, raw=False)

序列化大小对比

import json
import msgpack

data = {
    "user_id": 12345,
    "username": "alice_wonderland",
    "email": "alice@example.com",
    "scores": [95, 87, 92, 88],
    "active": True
}

json_size = len(json.dumps(data).encode())
msgpack_size = len(msgpack.packb(data))

print(f"JSON:     {json_size} bytes")
print(f"MsgPack:  {msgpack_size} bytes")
print(f"压缩比:   {json_size / msgpack_size:.2f}x")

12.4 安全最佳实践

认证

# 配置密码(Redis < 6.0)
requirepass your_password

# ACL 用户(Redis 6.0+)
ACL SETUSER alice on >password ~* &* +@all
ACL SETUSER bob on >password ~cache:* &* +@read +@write
# 客户端认证
r = redis.Redis(
    host="127.0.0.1",
    port=6379,
    username="alice",
    password="your_password"
)

TLS 加密

import ssl

r = redis.Redis(
    host="127.0.0.1",
    port=6380,
    ssl=True,
    ssl_certfile="/path/to/cert.pem",
    ssl_keyfile="/path/to/key.pem",
    ssl_ca_certs="/path/to/ca.pem",
    ssl_cert_reqs=ssl.CERT_REQUIRED
)

网络安全

# 绑定内网地址
bind 127.0.0.1 10.0.0.1

# 禁用危险命令
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command CONFIG ""
rename-command DEBUG ""
rename-command KEYS ""

# 禁用 Lua 脚本的危险操作
# (通过 ACL 限制)
ACL SETUSER restricted on >password ~* &* +@all -@admin -SCRIPT

ACL 详细配置

# 创建只读用户
ACL SETUSER readonly on >password ~* &* +@read -@write -@admin -@dangerous

# 创建缓存专用用户
ACL SETUSER cache_user on >password ~cache:* &* +get +set +del +expire +ttl

# 创建管理用户
ACL SETUSER admin on >strongpassword ~* &* +@all

# 查看用户权限
ACL GETUSER readonly

输入验证

def sanitize_key(key: str) -> str:
    """验证和清理 key"""
    if not key:
        raise ValueError("Key cannot be empty")
    if len(key) > 512:
        raise ValueError("Key too long (max 512 bytes)")
    # 可选:限制字符集
    if any(c in key for c in ['\r', '\n', '\x00']):
        raise ValueError("Key contains invalid characters")
    return key

12.5 性能优化清单

命令层面

优化 说明
✅ 使用 MSET/MGET 批量操作减少 RTT
✅ 使用 Pipeline 批量发送命令
✅ 使用 Lua 脚本 服务端原子操作
✅ 使用 Hash Tag 集群中相关 key 同槽
❌ 避免 KEYS * 使用 SCAN 替代
❌ 避免大 Hash/Set 考虑分片
❌ 避免长时间脚本 限制执行时间

连接层面

# ✅ 正确:使用连接池
pool = redis.ConnectionPool(max_connections=50)
r = redis.Redis(connection_pool=pool)

# ❌ 错误:每次创建新连接
r = redis.Redis(host="127.0.0.1", port=6379)

数据结构选择

场景 推荐数据结构 不推荐
缓存 String / Hash -
对象存储(多字段) Hash String (JSON)
计数器 String (INCR) -
队列 List (LPUSH/RPOP) -
去重 Set / Bloom Filter -
排行榜 Sorted Set -
限流 Sorted Set + Lua String (计数)
分布式锁 String (SET NX EX) -
时间序列 Sorted Set / RedisTimeSeries List

12.6 监控与告警

关键指标

# 获取所有指标
redis-cli INFO all

# 关键指标分类
redis-cli INFO memory      # 内存使用
redis-cli INFO stats       # 命令统计
redis-cli INFO clients     # 连接信息
redis-cli INFO replication # 复制状态
指标 说明 告警阈值
used_memory 已用内存 > maxmemory 的 80%
used_memory_rss RSS 内存 > maxmemory 的 1.5x
connected_clients 连接数 > max_clients 的 80%
instantaneous_ops_per_sec QPS 根据容量规划
keyspace_hits 命中数 计算命中率
keyspace_misses 未命中数 命中率 < 90%
latest_fork_usec 最近 fork 耗时 > 500ms
rejected_connections 拒绝连接数 > 0
expired_keys 过期 key 数 监控趋势

慢查询日志

# 配置慢查询阈值
CONFIG SET slowlog-log-slower-than 10000  # 10ms
CONFIG SET slowlog-max-len 128

# 查看慢查询
SLOWLOG GET 10

# 输出格式:
# 1) 1) (integer) 1           # ID
#    2) (integer) 1619787123  # 时间戳
#    3) (integer) 15000       # 耗时(微秒)
#    4) 1) "KEYS"             # 命令
#       2) "*"
#    5) "127.0.0.1:52345"    # 客户端地址
#    6) ""                    # 客户端名称

12.7 常见问题排查

问题一:连接超时

原因分析:
- 网络不通
- Redis 拒绝连接(max_clients)
- Redis 阻塞(慢命令、fork、AOF fsync)

排查步骤:
1. telnet <host> <port>
2. 检查 max_clients 配置
3. 检查 slowlog
4. 检查 info clients

问题二:内存增长

原因分析:
- Key 未设置过期时间
- 大 Key 存在
- 内存碎片

排查步骤:
1. INFO memory
2. redis-cli --bigkeys
3. MEMORY USAGE <key>
4. CONFIG SET activedefrag yes

问题三:主从延迟

原因分析:
- 网络带宽不足
- 从节点负载过高
- 大 Key 传输

排查步骤:
1. INFO replication
2. 检查 master_repl_offset vs slave_repl_offset
3. 检查 repl_backlog_size

12.8 代码示例汇总

完整的 Redis 工具类

import redis
import json
import hashlib
import time
from typing import Any, Optional, List, Dict
from functools import wraps

class RedisToolkit:
    """生产级 Redis 工具类"""

    def __init__(self, host="127.0.0.1", port=6379, db=0, password=None):
        self.pool = redis.ConnectionPool(
            host=host, port=port, db=db, password=password,
            max_connections=50, decode_responses=True,
            socket_timeout=2.0, socket_connect_timeout=1.0,
            health_check_interval=30, retry_on_timeout=True
        )
        self.client = redis.Redis(connection_pool=self.pool)
        self._scripts = {}

    # === 基本操作 ===
    def get(self, key: str) -> Optional[str]:
        return self.client.get(key)

    def set(self, key: str, value: str, ex: int = None) -> bool:
        return self.client.set(key, value, ex=ex)

    def delete(self, *keys: str) -> int:
        return self.client.delete(*keys)

    # === JSON 操作 ===
    def set_json(self, key: str, value: Any, ex: int = None) -> bool:
        return self.client.set(key, json.dumps(value, ensure_ascii=False), ex=ex)

    def get_json(self, key: str) -> Optional[Any]:
        data = self.client.get(key)
        return json.loads(data) if data else None

    # === Pipeline ===
    def pipeline(self, transaction=False):
        return self.client.pipeline(transaction=transaction)

    # === 分布式锁 ===
    def acquire_lock(self, lock_name: str, owner: str, ttl: int = 30) -> bool:
        return bool(self.client.set(f"lock:{lock_name}", owner, nx=True, ex=ttl))

    def release_lock(self, lock_name: str, owner: str) -> bool:
        lua = """
        if redis.call('GET', KEYS[1]) == ARGV[1] then
            return redis.call('DEL', KEYS[1])
        end
        return 0
        """
        return bool(self.client.eval(lua, 1, f"lock:{lock_name}", owner))

    # === 限流器 ===
    def is_rate_limited(self, key: str, limit: int, window: int) -> bool:
        lua = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
        local count = redis.call('ZCARD', key)

        if count < limit then
            redis.call('ZADD', key, now, now .. math.random())
            redis.call('EXPIRE', key, window)
            return 0
        end
        return 1
        """
        return bool(self.client.eval(lua, 1, key, limit, window, int(time.time())))

    # === 缓存装饰器 ===
    def cached(self, key_prefix: str, ttl: int = 300):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = f"{key_prefix}:{hashlib.md5(str(args).encode()).hexdigest()}"
                result = self.get_json(cache_key)
                if result is not None:
                    return result
                result = func(*args, **kwargs)
                self.set_json(cache_key, result, ex=ttl)
                return result
            return wrapper
        return decorator

    # === 健康检查 ===
    def health_check(self) -> Dict[str, Any]:
        try:
            start = time.time()
            self.client.ping()
            latency = time.time() - start
            info = self.client.info("memory")
            return {
                "status": "healthy",
                "latency_ms": round(latency * 1000, 2),
                "used_memory_mb": round(info["used_memory"] / 1024 / 1024, 2),
                "connected_clients": self.client.info("clients")["connected_clients"],
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    def close(self):
        self.pool.disconnect()


# 使用示例
toolkit = RedisToolkit()

# 基本操作
toolkit.set("greeting", "hello", ex=60)
print(toolkit.get("greeting"))

# JSON 操作
toolkit.set_json("user:1", {"name": "张三", "age": 30})
user = toolkit.get_json("user:1")

# 分布式锁
if toolkit.acquire_lock("mylock", "process-1", ttl=30):
    try:
        # 执行受保护的操作
        pass
    finally:
        toolkit.release_lock("mylock", "process-1")

# 限流
if not toolkit.is_rate_limited("api:user:123", limit=100, window=60):
    # 允许请求
    pass

# 缓存装饰器
@toolkit.cached("user_info", ttl=300)
def get_user_info(user_id):
    # 耗时操作
    return {"id": user_id, "name": "Alice"}

# 健康检查
health = toolkit.health_check()
print(health)

toolkit.close()

12.9 注意事项

⚠️ 不要在循环中创建连接 始终使用连接池复用连接。

⚠️ 注意序列化开销 JSON 序列化可能成为瓶颈。对于高频操作,考虑 MessagePack 或直接存储二进制。

⚠️ 设置合理的超时 没有超时的连接可能永远阻塞。始终设置 socket_timeout

⚠️ 监控连接池状态 定期检查连接池使用率,避免连接耗尽。

⚠️ 处理连接断开 实现重连逻辑和重试策略,应对网络波动。


12.10 扩展阅读

资源 说明
Redis 最佳实践 官方设计模式
redis-py 文档 Python 客户端
Jedis 文档 Java 客户端
go-redis 文档 Go 客户端
Redis 安全指南 官方安全文档
Redis 内存优化 内存优化技巧

上一章:代理实现 | 返回:目录