Memcached 传输协议精讲 / 第02章 文本协议详解
第02章 文本协议详解
文本协议是 Memcached 最基础也最常用的通信方式,掌握它是理解一切后续内容的前提。
2.1 协议基础规则
行终止符
Memcached 文本协议使用 \r\n(CRLF)作为行终止符,与 HTTP 协议一致。
命令行\r\n
数据行\r\n
注意: 这是最常见的客户端实现错误来源。使用
\n而非\r\n会导致协议解析失败。
字符编码
| 部分 | 编码 |
|---|---|
| 命令行 | ASCII |
| Key | ASCII,最长 250 字节 |
| 数据体(Data) | 原始字节流,不强制要求编码 |
数据体(data block)可以是任意二进制数据,长度由命令行中的 <length> 字段精确指定。
通用命令行格式
大多数命令遵循以下模式:
<command> <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
| 字段 | 类型 | 说明 |
|---|---|---|
command | string | 命令名称 |
key | string | 缓存键,最长 250 字节 |
flags | uint16 | 客户端自定义标志位 |
exptime | int | 过期时间(秒),0=永不过期 |
length | int | 数据块的字节长度 |
noreply | string | 可选,设置后服务端不返回响应 |
2.2 存储命令语法
SET 命令
set <key> <flags> <exptime> <length> [noreply]\r\n
<data block>\r\n
完整示例:
# 存储一个用户对象(flags=0, TTL=3600秒, 数据长度=13字节)
set user:1001 0 3600 13
{"name":"Bob"}
服务端响应:
STORED\r\n # 成功
NOT_STORED\r\n # 存储失败(如条件命令不满足)
ERROR\r\n # 未知命令
CLIENT_ERROR ...\r\n # 客户端错误
SERVER_ERROR ...\r\n # 服务端错误
其他存储命令对比
| 命令 | 语法 | 语义 |
|---|---|---|
set | set <key> <flags> <exptime> <length> | 无条件存储(存在则覆盖) |
add | add <key> <flags> <exptime> <length> | 仅当 key 不存在时存储 |
replace | replace <key> <flags> <exptime> <length> | 仅当 key 已存在时存储 |
append | append <key> <flags> <exptime> <length> | 在已有值的末尾追加数据 |
prepend | prepend <key> <flags> <exptime> <length> | 在已有值的开头插入数据 |
cas | cas <key> <flags> <exptime> <length> <cas_unique> | CAS 乐观锁写入 |
2.3 检索命令语法
GET 命令
get <key>*\r\n
支持一个或多个 key(空格分隔),返回所有存在的 key 及其值。
单 key 示例:
get user:1001\r\n
多 key 示例:
get user:1001 user:1002 session:abc\r\n
响应格式:
VALUE <key> <flags> <length> [cas <cas_unique>]\r\n
<data block>\r\n
...
END\r\n
注意:
cas字段仅在使用gets命令时返回。
GETS 命令
gets <key>*\r\n
与 get 相同,但每个返回值附带 cas_unique 标识符,用于后续 CAS 操作。
响应示例:
VALUE user:1001 0 13 12345
{"name":"Bob"}
END
2.4 删除命令语法
DELETE 命令
delete <key> [noreply]\r\n
响应:
DELETED\r\n # 成功删除
NOT_FOUND\r\n # key 不存在
示例
echo "delete user:1001" | nc 127.0.0.1 11211
# DELETED
2.5 递增递减命令语法
INCR / DECR 命令
incr <key> <value> [noreply]\r\n
decr <key> <value> [noreply]\r\n
| 参数 | 说明 |
|---|---|
key | 目标键(值必须为十进制数字字符串) |
value | 递增/递减的步长(无符号 64 位整数) |
响应:
<number>\r\n # 操作后的新值
NOT_FOUND\r\n # key 不存在
重要规则:
- 若 key 不存在,返回
NOT_FOUND,不会自动创建 - 值溢出时行为未定义(不回绕到 0)
- DECR 结果下限为 0,不会变为负数
示例:
# 设置计数器
set view_count 0 0 1
0
STORED
# 递增
incr view_count 1
1
incr view_count 10
11
# 递减
decr view_count 5
6
2.6 其他管理命令
VERSION
version\r\n
响应:
VERSION 1.6.22\r\n
FLUSH_ALL
flush_all [delay]\r\n
| 参数 | 说明 |
|---|---|
| 无参数 | 立即清除所有缓存 |
delay | 延迟 N 秒后清除 |
响应:
OK\r\n
注意:
flush_all是全局操作,在生产环境中慎用!建议使用前加delay参数给予缓冲时间。
QUIT
quit\r\n
关闭当前连接,无响应。
VERBOSITY
verbosity <level> [noreply]\r\n
设置日志详细级别(0-4),级别越高输出越多。
2.7 Noreply 模式
在命令末尾添加 noreply 参数可指示服务端不返回响应,适用于对可靠性要求不高的写操作。
# 正常模式
set key1 0 0 5
hello
STORED
# noreply 模式(无响应)
set key2 0 0 5 noreply
hello
支持 noreply 的命令
| 命令 | 支持 noreply |
|---|---|
| set | ✅ |
| add | ✅ |
| replace | ✅ |
| append | ✅ |
| prepend | ✅ |
| cas | ✅ |
| delete | ✅ |
| incr | ✅ |
| decr | ✅ |
| flush_all | ✅ |
| get / gets | ❌ |
使用场景
import socket
def batch_set_noreply(sock, items: dict, ttl: int = 300):
"""批量写入,使用 noreply 提升吞吐量"""
for key, value in items.items():
data = value.encode() if isinstance(value, str) else value
cmd = f"set {key} 0 {ttl} {len(data)} noreply\r\n"
sock.sendall(cmd.encode() + data + b"\r\n")
# 不需要等待响应,直接返回
# 使用
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 11211))
batch_set_noreply(sock, {
"cache:a": "value_a",
"cache:b": "value_b",
"cache:c": "value_c",
})
注意: 使用 noreply 时无法得知操作是否成功。在需要确认的场景下,不要使用 noreply。
2.8 响应格式详解
存储命令响应
| 响应 | 含义 |
|---|---|
STORED | 存储成功 |
NOT_STORED | 存储失败(add/replace 条件不满足,或数据长度为 0) |
EXISTS | CAS 操作失败:key 已被修改 |
NOT_FOUND | CAS 操作失败:key 不存在 |
检索命令响应
VALUE <key> <flags> <bytes> [cas <cas_unique>]\r\n
<data block>\r\n
VALUE <key2> <flags2> <bytes2> [cas <cas_unique2>]\r\n
<data block2>\r\n
END\r\n
- 每个存在的 key 返回一个
VALUE块 - 不存在的 key 被静默忽略
- 最后以
END结束
错误响应
| 响应类型 | 格式 | 说明 |
|---|---|---|
| 通用错误 | ERROR\r\n | 未知命令 |
| 客户端错误 | CLIENT_ERROR <message>\r\n | 请求格式错误 |
| 服务端错误 | SERVER_ERROR <message>\r\n | 服务端内部错误 |
常见错误消息:
| 错误消息 | 原因 |
|---|---|
CLIENT_ERROR bad command line format | 命令行格式错误 |
CLIENT_ERROR bad data chunk | 数据块长度与声明不一致 |
CLIENT_ERROR bad command line format: invalid exptime | TTL 不是有效数字 |
CLIENT_ERROR bad command line format: invalid flags | flags 不是有效数字 |
SERVER_ERROR out of memory | 内存不足 |
SERVER_ERROR temporary failure | 临时性错误 |
2.9 完整协议解析器实现
以下是一个简化但可用的文本协议解析器:
#!/usr/bin/env python3
"""memcached_text_parser.py — Memcached 文本协议解析器"""
from dataclasses import dataclass
from typing import List, Optional, Union
from enum import Enum
import socket
class ResponseType(Enum):
STORED = "STORED"
NOT_STORED = "NOT_STORED"
EXISTS = "EXISTS"
NOT_FOUND = "NOT_FOUND"
DELETED = "DELETED"
ERROR = "ERROR"
VALUE = "VALUE"
END = "END"
@dataclass
class ValueResponse:
key: str
flags: int
cas_unique: Optional[int]
data: bytes
@dataclass
class Response:
type: ResponseType
values: List[ValueResponse]
message: str = ""
class MemcachedTextParser:
"""Memcached 文本协议解析器"""
def __init__(self):
self.buffer = b""
def feed(self, data: bytes):
self.buffer += data
def parse_response(self, command_type: str = "get") -> Optional[Response]:
"""尝试从 buffer 中解析完整响应"""
lines = self.buffer.split(b"\r\n")
if command_type in ("get", "gets"):
return self._parse_value_response(lines)
elif command_type in ("set", "add", "replace", "append", "prepend", "cas"):
return self._parse_store_response(lines)
elif command_type == "delete":
return self._parse_simple_response(lines)
elif command_type in ("incr", "decr"):
return self._parse_counter_response(lines)
return None
def _parse_value_response(self, lines: list) -> Optional[Response]:
"""解析 VALUE 响应"""
if b"END" not in lines:
return None # 数据不完整
values = []
i = 0
while i < len(lines):
line = lines[i]
if line == b"END":
return Response(type=ResponseType.END, values=values)
if line.startswith(b"VALUE "):
parts = line.decode().split()
key = parts[1]
flags = int(parts[2])
length = int(parts[3])
cas = int(parts[5]) if len(parts) > 5 and parts[4] == "cas" else None
i += 1
if i < len(lines):
data = lines[i]
values.append(ValueResponse(
key=key, flags=flags,
cas_unique=cas, data=data
))
i += 1
return None # 数据不完整
def _parse_store_response(self, lines: list) -> Optional[Response]:
"""解析存储命令响应"""
if len(lines) < 2:
return None
first_line = lines[0]
if first_line == b"STORED":
return Response(type=ResponseType.STORED, values=[])
elif first_line == b"NOT_STORED":
return Response(type=ResponseType.NOT_STORED, values=[])
elif first_line == b"EXISTS":
return Response(type=ResponseType.EXISTS, values=[])
elif first_line == b"NOT_FOUND":
return Response(type=ResponseType.NOT_FOUND, values=[])
elif first_line.startswith(b"CLIENT_ERROR"):
return Response(type=ResponseType.ERROR,
values=[], message=first_line.decode())
elif first_line.startswith(b"SERVER_ERROR"):
return Response(type=ResponseType.ERROR,
values=[], message=first_line.decode())
elif first_line == b"ERROR":
return Response(type=ResponseType.ERROR, values=[])
return None
def _parse_simple_response(self, lines: list) -> Optional[Response]:
"""解析简单响应"""
if len(lines) < 2:
return None
line = lines[0]
if line == b"DELETED":
return Response(type=ResponseType.DELETED, values=[])
elif line == b"NOT_FOUND":
return Response(type=ResponseType.NOT_FOUND, values=[])
return None
def _parse_counter_response(self, lines: list) -> Optional[Response]:
"""解析计数器响应"""
if len(lines) < 2:
return None
line = lines[0]
if line == b"NOT_FOUND":
return Response(type=ResponseType.NOT_FOUND, values=[])
try:
value = int(line)
return Response(type=ResponseType.STORED,
values=[], message=str(value))
except ValueError:
return None
# 测试
parser = MemcachedTextParser()
parser.feed(b"VALUE user:1001 0 13\r\n")
parser.feed(b'{"name":"Bob"}\r\n')
parser.feed(b"END\r\n")
result = parser.parse_response("get")
if result and result.values:
print(f"Key: {result.values[0].key}")
print(f"Data: {result.values[0].data.decode()}")
2.10 业务场景
场景一:Session 会话管理
import json
import uuid
import socket
class SessionStore:
def __init__(self, host='127.0.0.1', port=11211):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def create_session(self, user_data: dict, ttl: int = 1800) -> str:
session_id = str(uuid.uuid4())
data = json.dumps(user_data).encode()
self.sock.sendall(
f"set session:{session_id} 0 {ttl} {len(data)}\r\n".encode()
+ data + b"\r\n"
)
self.sock.recv(1024)
return session_id
def get_session(self, session_id: str) -> dict:
self.sock.sendall(f"get session:{session_id}\r\n".encode())
resp = self.sock.recv(65536).decode()
lines = resp.strip().split("\r\n")
if lines[0].startswith("VALUE"):
return json.loads(lines[1])
return None
def destroy_session(self, session_id: str) -> bool:
self.sock.sendall(f"delete session:{session_id}\r\n".encode())
return b"DELETED" in self.sock.recv(1024)
# 使用
store = SessionStore()
sid = store.create_session({"user_id": 1001, "role": "admin"})
print(f"Session ID: {sid}")
print(f"Session Data: {store.get_session(sid)}")
场景二:商品库存扣减(原子操作)
def decrement_stock(sock, product_id: str, quantity: int) -> bool:
"""
使用 DECR 原子扣减库存。
注意:此方案要求库存值在缓存中初始化。
"""
key = f"stock:{product_id}"
sock.sendall(f"decr {key} {quantity}\r\n".encode())
resp = sock.recv(1024).decode().strip()
if resp == "NOT_FOUND":
return False # 库存键不存在
remaining = int(resp)
if remaining < 0:
# 库存不足,回滚
sock.sendall(f"incr {key} {quantity}\r\n".encode())
sock.recv(1024)
return False
return True # 扣减成功
2.11 注意事项
| 编号 | 注意事项 | 说明 |
|---|---|---|
| 1 | CRLF 行终止 | 必须使用 \r\n,不要用 \n |
| 2 | 数据长度精确匹配 | <length> 必须与数据块的字节数完全一致 |
| 3 | Key 不能包含空白 | Key 中不能有空格、\r、\n |
| 4 | noreply 不可靠 | 服务端仍可能因错误关闭连接 |
| 5 | 管道化有序性 | 响应顺序与请求顺序一致 |
| 6 | 编码问题 | 数据体是字节流,不要混用不同编码 |
| 7 | 最大 Value 大小 | 默认 1MB,可通过 -I 参数调整 |
2.12 扩展阅读
上一章: 第01章 Memcached 协议概述 下一章: 第03章 核心命令总览 — 系统梳理所有 Memcached 文本协议命令。