强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Memcached 传输协议精讲 / 第03章 核心命令总览

第03章 核心命令总览

本章系统梳理 Memcached 文本协议中的所有核心命令,建立全局视角。


3.1 命令全景图

Memcached 文本协议定义了约 20+ 个命令,按功能分为 7 大类:

Memcached 命令
├── 存储类(Storage)
│   ├── set        — 无条件写入
│   ├── add        — 不存在时写入
│   ├── replace    — 已存在时替换
│   ├── append     — 追加到末尾
│   ├── prepend    — 插入到开头
│   └── cas        — CAS 乐观锁写入
├── 检索类(Retrieval)
│   ├── get        — 获取一个或多个 key
│   └── gets       — 获取 + CAS 唯一标识
├── 删除类(Deletion)
│   └── delete     — 删除单个 key
├── 原子操作类(Atomic)
│   ├── incr       — 递增
│   └── decr       — 递减
├── 统计类(Statistics)
│   ├── stats              — 全局统计
│   ├── stats items        — 按 slab class 统计
│   ├── stats slabs        — Slab 分配器统计
│   ├── stats sizes        — 按大小分布统计
│   └── stats cachedump    — Slab 内容转储
├── 管理类(Management)
│   ├── flush_all  — 清除所有缓存
│   ├── version    — 版本查询
│   ├── verbosity  — 日志级别
│   └── quit       — 关闭连接
└── 协议发现类(Protocol)
    └── lru_crawler metadump — 元数据转储

3.2 存储命令概览

命令对比表

命令Key 已存在Key 不存在典型场景
set✅ 覆盖✅ 创建通用写入
add❌ NOT_STORED✅ 创建初始化 / 分布式锁
replace✅ 覆盖❌ NOT_FOUND仅更新已存在数据
append✅ 追加❌ NOT_FOUND日志追加
prepend✅ 前插❌ NOT_FOUND优先级队列
cas✅ 仅版本匹配时❌ NOT_FOUND乐观锁更新

通用语法

<command> <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n

语义差异详解

set vs add vs replace

# 假设 key "test" 不存在
set test 0 0 5
hello
STORED              # set 成功

add test 0 0 5
world
NOT_STORED          # add 失败,key 已存在

replace test 0 0 5
world
STORED              # replace 成功,key 已存在

# 假设 key "newkey" 不存在
replace newkey 0 0 5
hello
NOT_FOUND           # replace 失败,key 不存在

CAS 命令详解

CAS(Compare-And-Swap)是实现乐观并发控制的关键:

# 1. 使用 gets 获取当前值及其 CAS 唯一标识
gets counter
VALUE counter 0 1 123456
5
END

# 2. 尝试用 CAS 更新(仅当 cas_unique 匹配时生效)
cas counter 0 0 1 123456
6
STORED              # 成功:版本匹配

# 3. 再次尝试(版本已变化)
cas counter 0 0 1 123456
7
EXISTS              # 失败:版本已变化

CAS 工作流程

┌──────────┐    gets    ┌──────────────┐
│  Client  │──────────▶│  Memcached   │
│          │◀──────────│  (v=123456)  │
└──────────┘  VALUE +   └──────────────┘
     │        cas_unique
     │ 处理业务逻辑
     ▼
┌──────────┐   cas v=123456   ┌──────────────┐
│  Client  │─────────────────▶│  Memcached   │
│          │◀─────────────────│  STORED      │
└──────────┘   响应            └──────────────┘

3.3 检索命令概览

命令CAS 支持批量获取说明
get标准获取
gets获取 + CAS 标识

get 命令

get <key> [key2 key3 ...]\r\n

响应格式

VALUE <key> <flags> <length>\r\n
<data>\r\n
...
END\r\n

gets 命令

gets <key> [key2 key3 ...]\r\n

响应格式

VALUE <key> <flags> <length> <cas_unique>\r\n
<data>\r\n
...
END\r\n

批量获取示例

get user:1 user:2 user:3
VALUE user:1 0 11
{"id":1,"a"}
VALUE user:2 0 11
{"id":2,"b"}
END
# user:3 不存在,被静默忽略

3.4 删除命令

delete 命令

delete <key> [noreply]\r\n
响应含义
DELETED删除成功
NOT_FOUNDkey 不存在

注意: delete 不支持 CAS 版本检查,如需 CAS 删除需要使用 Meta 协议或二进制协议。


3.5 原子操作命令

INCR / DECR

incr <key> <delta> [noreply]\r\n
decr <key> <delta> [noreply]\r\n
参数类型说明
keystring目标键(值必须为十进制数字字符串)
deltauint64步长

行为规则

# 初始化计数器
set counter 0 0 2
10
STORED

# 递增
incr counter 3
13

# 递减
decr counter 5
8

# 下限为 0
decr counter 100
0

# 递增溢出(行为未定义)
set big 0 0 20
18446744073709551615
STORED
incr big 1
# 结果不确定,不应依赖此行为

实现计数器完整示例

#!/usr/bin/env python3
"""counter.py — 使用 Memcached 实现原子计数器"""

import socket

class AtomicCounter:
    def __init__(self, sock, key: str, initial: int = 0, ttl: int = 0):
        self.sock = sock
        self.key = key
        self._initialize(initial, ttl)

    def _initialize(self, value: int, ttl: int):
        data = str(value).encode()
        cmd = f"set {self.key} 0 {ttl} {len(data)}\r\n"
        self.sock.sendall(cmd.encode() + data + b"\r\n")
        self.sock.recv(1024)

    def increment(self, delta: int = 1) -> int:
        self.sock.sendall(f"incr {self.key} {delta}\r\n".encode())
        return int(self.sock.recv(1024).decode().strip())

    def decrement(self, delta: int = 1) -> int:
        self.sock.sendall(f"decr {self.key} {delta}\r\n".encode())
        return int(self.sock.recv(1024).decode().strip())

    def get(self) -> int:
        self.sock.sendall(f"get {self.key}\r\n".encode())
        resp = self.sock.recv(4096).decode()
        lines = resp.strip().split("\r\n")
        if lines[0].startswith("VALUE"):
            return int(lines[1])
        return 0

# 使用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 11211))

views = AtomicCounter(sock, "page:home:views", initial=0)
print(f"当前浏览量: {views.get()}")
print(f"增加 1: {views.increment()}")
print(f"增加 10: {views.increment(10)}")
print(f"减少 3: {views.decrement(3)}")
print(f"最终值: {views.get()}")

3.6 统计命令

STATS 命令

stats\r\n

返回大量以 STAT 前缀的键值对,以 END 结束。

关键统计指标

指标类型说明
pidint进程 ID
uptimeint运行时间(秒)
timeint当前 Unix 时间戳
versionstring版本号
curr_itemsint当前缓存中的 item 数量
total_itemsint历史总 item 数
bytesint当前内存使用量(字节)
max_connectionsint最大连接数
curr_connectionsint当前连接数
cmd_getintGET 命令总数
cmd_setintSET 命令总数
get_hitsint缓存命中次数
get_missesint缓存未命中次数
evictionsintLRU 淘汰次数
bytes_readint读取的总字节数
bytes_writtenint写入的总字节数

缓存命中率计算

命中率 = get_hits / (get_hits + get_misses) × 100%
def get_hit_rate(sock) -> float:
    """计算缓存命中率"""
    sock.sendall(b"stats\r\n")
    data = sock.recv(8192).decode()

    hits = misses = 0
    for line in data.split("\r\n"):
        if line.startswith("STAT get_hits"):
            hits = int(line.split()[2])
        elif line.startswith("STAT get_misses"):
            misses = int(line.split()[2])

    total = hits + misses
    return (hits / total * 100) if total > 0 else 0.0

# 使用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 11211))
print(f"缓存命中率: {get_hit_rate(sock):.2f}%")

STATS ITEMS

stats items\r\n

按 Slab Class 返回 item 统计信息:

STAT items:1:number 150
STAT items:1:age 3200
STAT items:1:evicted 0
STAT items:1:evicted_nonzero 0
STAT items:1:outofmemory 0
...
END

STATS SLABS

stats slabs\r\n

返回 Slab 分配器统计:

STAT 1:chunk_size 96
STAT 1:chunks_per_page 10922
STAT 1:total_pages 1
STAT 1:total_chunks 10922
STAT 1:used_chunks 150
STAT 1:free_chunks 10772
STAT 1:free_chunks_end 0
...
STAT active_slabs 1
STAT total_malloced 1048576
END

STATS CACHEDUMP

stats cachedump <slab_id> <limit>\r\n

转储指定 Slab Class 中的 item 列表(仅用于调试):

stats cachedump 1 100
ITEM user:1001 [13 b; 1620000000 s]
ITEM session:abc [50 b; 1620003600 s]
END

注意: stats cachedump 在某些版本中已被移除或禁用,不建议在生产环境中依赖此命令。

STATS SIZES

stats sizes\r\n

返回按 item 大小分布的统计:

STAT 32 5
STAT 64 120
STAT 96 300
STAT 128 50
...
END

注意: 需要启动时添加 --enable-stats-sizes 编译选项才可用。


3.7 管理命令

FLUSH_ALL

# 立即清除
flush_all

# 延迟 10 秒清除
flush_all 10

VERSION

version
# VERSION 1.6.22

VERBOSITY

verbosity 2
# OK
级别说明
0无日志输出
1基本信息
2详细信息
3调试级别
4极其详细

QUIT

quit

关闭当前 TCP 连接。


3.8 完整命令速查表

命令语法响应noreply
setset <key> <flags> <exptime> <length>STORED / NOT_STORED
addadd <key> <flags> <exptime> <length>STORED / NOT_STORED
replacereplace <key> <flags> <exptime> <length>STORED / NOT_STORED / NOT_FOUND
appendappend <key> <flags> <exptime> <length>STORED / NOT_STORED
prependprepend <key> <flags> <exptime> <length>STORED / NOT_STORED
cascas <key> <flags> <exptime> <length> <cas_unique>STORED / EXISTS / NOT_FOUND
getget <key> [key...]VALUE … END
getsgets <key> [key...]VALUE … END (含 cas)
deletedelete <key>DELETED / NOT_FOUND
incrincr <key> <delta>数字 / NOT_FOUND
decrdecr <key> <delta>数字 / NOT_FOUND
statsstats [subcommand]STAT … END
flush_allflush_all [delay]OK
versionversionVERSION …
verbosityverbosity <level>OK
quitquit(无响应)

3.9 综合实验

构建简易分布式锁

import socket
import time
import uuid

class MemcachedLock:
    """基于 Memcached add 命令的分布式锁"""

    def __init__(self, sock, lock_key: str, ttl: int = 30):
        self.sock = sock
        self.lock_key = f"lock:{lock_key}"
        self.token = str(uuid.uuid4()).encode()
        self.ttl = ttl
        self.acquired = False

    def acquire(self, timeout: int = 10) -> bool:
        deadline = time.time() + timeout
        while time.time() < deadline:
            cmd = f"add {self.lock_key} 0 {self.ttl} {len(self.token)}\r\n"
            self.sock.sendall(cmd.encode() + self.token + b"\r\n")
            resp = self.sock.recv(1024).decode().strip()
            if resp == "STORED":
                self.acquired = True
                return True
            time.sleep(0.1)  # 等待后重试
        return False

    def release(self) -> bool:
        if not self.acquired:
            return False
        cmd = f"delete {self.lock_key}\r\n"
        self.sock.sendall(cmd.encode())
        resp = self.sock.recv(1024).decode().strip()
        self.acquired = False
        return resp == "DELETED"

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError("Failed to acquire lock")
        return self

    def __exit__(self, *args):
        self.release()

# 使用示例
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 11211))

with MemcachedLock(sock, "order:12345"):
    print("已获取锁,执行临界区操作...")
    time.sleep(1)
    print("操作完成,释放锁")

3.10 注意事项

  1. CAS 唯一标识溢出: CAS unique 是 64 位无符号整数,实际应用中几乎不会溢出
  2. incr/decr 不创建 key: 不存在的 key 返回 NOT_FOUND,不会自动初始化为 0
  3. append/prepend 不检查数据格式: 仅做字节级拼接,不解析内容
  4. stats 命令有性能开销: 高频调用 stats 可能影响服务性能
  5. flush_all 是全局操作: 会清除所有数据,生产环境务必谨慎

3.11 扩展阅读


上一章: 第02章 文本协议详解 下一章: 第04章 存储命令深入 — 深入剖析每个存储命令的内部机制。