强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Memcached 传输协议精讲 / 第07章 二进制协议命令

第07章 二进制协议命令

本章详细讲解每个 Memcached 操作在二进制协议中的具体帧格式。


7.1 GET / GETQ / GETK / GETKQ

四种获取命令的区别

命令 Opcode 说明
GET 0x00 标准获取
GETQ 0x09 静默获取(key 不存在时不返回响应)
GETK 0x0C 获取 + 返回 key
GETKQ 0x0D 静默获取 + 返回 key

GET 请求帧

Header (24 bytes):
  Magic:      0x80
  Opcode:     0x00
  Key Length: len(key)
  Extras:     0
  Body Length: len(key)

Body:
  Key:        <key bytes>

GET 响应帧

Header (24 bytes):
  Magic:      0x81
  Opcode:     0x00
  Key Length:  0 (GET) 或 len(key) (GETK)
  Extras:     4 (flags)
  Status:     0x0000 (成功) 或 0x0001 (未找到)
  Body Length: 4 + len(key) + len(value)
  CAS:        <cas_unique>

Body:
  Extras:     <flags: uint32>
  Key:        <key bytes> (GETK/GETKQ)
  Value:      <value bytes>

Python 实现

import struct
import socket

def binary_get(sock, key: str, getk: bool = False):
    """二进制 GET 命令"""
    key_bytes = key.encode()
    opcode = 0x0C if getk else 0x00

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,           # magic
        opcode,         # opcode
        len(key_bytes), # key length
        0,              # extras length
        0x00,           # data type
        0,              # reserved
        len(key_bytes), # body length
        1,              # opaque
        0               # cas
    )

    sock.sendall(header + key_bytes)

    # 读取响应头
    resp_header = _recv_exact(sock, 24)
    (_, opcode, key_len, extras_len, _, status, body_len, opaque, cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # 读取响应体
    body = _recv_exact(sock, body_len)

    if status == 0x0001:  # Key Not Found
        return None

    flags = struct.unpack(">I", body[:extras_len])[0]
    value = body[extras_len + key_len:]

    return {
        'value': value,
        'flags': flags,
        'cas': cas,
        'opaque': opaque
    }

def _recv_exact(sock, n: int) -> bytes:
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise ConnectionError("Connection closed")
        data += chunk
    return data

# GETQ — 静默获取
# 当 key 不存在时,不返回任何响应
# 适用于管道化请求:发送多个 GETQ,只有存在的 key 才返回响应

def binary_get_multi(sock, keys: list[str]) -> dict:
    """使用 GETKQ 批量获取"""
    for i, key in enumerate(keys):
        key_bytes = key.encode()
        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x0D, len(key_bytes), 0, 0x00, 0, len(key_bytes), i, 0
        )
        sock.sendall(header + key_bytes)

    # 发送 NOOP 作为结束标记
    noop_header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, len(keys), 0
    )
    sock.sendall(noop_header)

    # 读取响应直到 NOOP 响应
    result = {}
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, opcode, key_len, extras_len, _, status, body_len, opaque, cas) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        if opcode == 0x0A:  # NOOP
            break

        body = _recv_exact(sock, body_len)
        if status == 0x0000:
            flags = struct.unpack(">I", body[:extras_len])[0]
            key = body[extras_len:extras_len + key_len].decode()
            value = body[extras_len + key_len:]
            result[key] = {'value': value, 'flags': flags, 'cas': cas}

    return result

7.2 SET / ADD / REPLACE

Extras 字段

存储命令的 Extras 固定为 8 字节:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         Flags (uint32)                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       Expiration (uint32)                     |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

SET 请求帧

Header:
  Magic:      0x80
  Opcode:     0x01
  Key Length: len(key)
  Extras:     8
  Body Length: 8 + len(key) + len(value)
  CAS:        0 (新写入) 或 cas_unique (CAS更新)

Body:
  Extras:     <flags: uint32> <exptime: uint32>
  Key:        <key bytes>
  Value:      <value bytes>

SET 响应帧

Header:
  Magic:      0x81
  Opcode:     0x01
  Status:     0x0000 (成功)
  CAS:        <new cas_unique>

Body:         (空)

完整实现

def binary_set(sock, key: str, value: bytes, flags: int = 0,
               exptime: int = 0, cas: int = 0,
               opcode: int = 0x01) -> dict:
    """
    二进制 SET/ADD/REPLACE 命令
    opcode: 0x01=SET, 0x02=ADD, 0x03=REPLACE
    """
    key_bytes = key.encode()
    extras = struct.pack(">II", flags, exptime)
    body_length = len(extras) + len(key_bytes) + len(value)

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,           # magic
        opcode,         # opcode
        len(key_bytes), # key length
        len(extras),    # extras length
        0x00,           # data type
        0,              # reserved
        body_length,    # body length
        0,              # opaque
        cas             # cas
    )

    sock.sendall(header + extras + key_bytes + value)

    # 读取响应
    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, _, _, new_cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    return {
        'success': status == 0x0000,
        'status': status,
        'cas': new_cas
    }

操作码对比

操作 Opcode Key 不存在 Key 已存在
SET 0x01 创建 覆盖
ADD 0x02 创建 0x0005 Not Stored
REPLACE 0x03 0x0001 Not Found 覆盖

7.3 DELETE

DELETE 请求帧

Header:
  Magic:      0x80
  Opcode:     0x04
  Key Length: len(key)
  Extras:     0
  Body Length: len(key)

Body:
  Key:        <key bytes>

DELETE 响应帧

Header:
  Magic:      0x81
  Opcode:     0x04
  Status:     0x0000 (成功) 或 0x0001 (未找到)

Body:         (空)

实现

def binary_delete(sock, key: str) -> bool:
    key_bytes = key.encode()
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x04, len(key_bytes), 0, 0x00, 0, len(key_bytes), 0, 0
    )
    sock.sendall(header + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, _, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    return status == 0x0000

7.4 INCREMENT / DECREMENT

Extras 字段

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Delta (uint64, 8 bytes)                     |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                 Initial Value (uint64, 8 bytes)                |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    Expiration (uint32)                         |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  • Delta: 递增/递减的步长
  • Initial Value: key 不存在时的初始值
  • Expiration: 初始创建时的过期时间

响应

Body:
  <result: uint64>  — 递增/递减后的值

实现

def binary_incr_decr(sock, key: str, delta: int, initial: int = 0,
                     exptime: int = 0, incr: bool = True) -> int | None:
    key_bytes = key.encode()
    extras = struct.pack(">QQI", delta, initial, exptime)
    opcode = 0x05 if incr else 0x06

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), len(extras), 0x00, 0,
        len(extras) + len(key_bytes), 0, 0
    )

    sock.sendall(header + extras + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    if status == 0x0001:  # Not Found
        return None

    body = _recv_exact(sock, body_len)
    result = struct.unpack(">Q", body)[0]
    return result

7.5 APPEND / PREPEND

APPEND 请求帧

Header:
  Magic:      0x80
  Opcode:     0x0E (Append) 或 0x0F (Prepend)
  Key Length: len(key)
  Extras:     0
  Body Length: len(key) + len(value)

Body:
  Key:        <key bytes>
  Value:      <value bytes>  — 要追加/前插的数据

实现

def binary_append(sock, key: str, value: bytes, prepend: bool = False) -> bool:
    key_bytes = key.encode()
    opcode = 0x0F if prepend else 0x0E

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), 0, 0x00, 0,
        len(key_bytes) + len(value), 0, 0
    )

    sock.sendall(header + key_bytes + value)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, _, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    return status == 0x0000

7.6 VERSION

VERSION 请求帧

Header:
  Magic:      0x80
  Opcode:     0x0B
  Key Length: 0
  Extras:     0
  Body Length: 0

Body:         (空)

VERSION 响应帧

Header:
  Magic:      0x81
  Opcode:     0x0B
  Status:     0x0000
  Body Length: len(version_string)

Body:
  Value:      <version string bytes>

7.7 FLUSH

FLUSH 请求帧

Header:
  Magic:      0x80
  Opcode:     0x08
  Key Length: 0
  Extras:     0 或 4
  Body Length: 0 或 4

Body:
  Extras:     <delay: uint32>  (可选,延迟秒数)

实现

def binary_flush(sock, delay: int = 0) -> bool:
    if delay > 0:
        extras = struct.pack(">I", delay)
        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x08, 0, 4, 0x00, 0, 4, 0, 0
        )
        sock.sendall(header + extras)
    else:
        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x08, 0, 0, 0x00, 0, 0, 0, 0
        )
        sock.sendall(header)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, _, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    return status == 0x0000

7.8 STAT

STAT 请求帧

Header:
  Magic:      0x80
  Opcode:     0x10
  Key Length: 0 或 len(key)
  Extras:     0
  Body Length: 0 或 len(key)

Body:
  Key:        <stat key>  (可选,指定特定统计项)

STAT 响应

服务端返回多个响应帧,每帧包含一个统计键值对:

# 每个统计项一个响应帧
Header:
  Magic:      0x81
  Opcode:     0x10
  Key Length: len(stat_name)
  Body Length: len(stat_name) + len(stat_value)
  Status:     0x0000

Body:
  Key:        <stat name>
  Value:      <stat value>

最后一个响应帧的 Key Length = 0,Body Length = 0,表示统计结束。

实现

def binary_stats(sock) -> dict[str, str]:
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x10, 0, 0, 0x00, 0, 0, 0, 0
    )
    sock.sendall(header)

    result = {}
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, _, key_len, _, _, status, body_len, _, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        if key_len == 0 and body_len == 0:
            break

        body = _recv_exact(sock, body_len)
        key = body[:key_len].decode()
        value = body[key_len:].decode()
        result[key] = value

    return result

7.9 NOOP

NOOP 帧

Header:
  Magic:      0x80
  Opcode:     0x0A
  Key Length: 0
  Extras:     0
  Body Length: 0

Body:         (空)

用途

  1. 管道化边界标记: 在静默命令(Q 系列)之后发送 NOOP,用 NOOP 响应标记所有静默响应的结束
  2. 心跳检测: 检测连接是否存活
def binary_noop(sock) -> bool:
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, 0, 0
    )
    sock.sendall(header)

    resp_header = _recv_exact(sock, 24)
    (_, opcode, _, _, _, status, _, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    return opcode == 0x0A and status == 0x0000

7.10 Q 系列命令(静默命令)

概述

Q 系列命令(命令名带 Q 后缀)在操作成功时不返回响应,只在失败时返回。

命令 Opcode 成功时响应
SETQ 0x11
ADDQ 0x12
REPLACEQ 0x13
DELETEQ 0x14
INCREMENTQ 0x15
DECREMENTQ 0x16
GETQ 0x09 无(key 不存在时不返回)
GETKQ 0x0D 无(key 不存在时不返回)
APPENDQ 0x19
PREPENDQ 0x1A
FLUSHQ 0x18
QUITQ 0x17

管道化模式

def binary_multi_set(sock, items: list[tuple[str, bytes, int]]):
    """
    使用 SETQ 管道化批量写入
    items: [(key, value, exptime), ...]
    """
    for i, (key, value, exptime) in enumerate(items):
        key_bytes = key.encode()
        extras = struct.pack(">II", 0, exptime)
        body_len = len(extras) + len(key_bytes) + len(value)

        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x11, len(key_bytes), len(extras), 0x00, 0,
            body_len, i, 0
        )
        sock.sendall(header + extras + key_bytes + value)

    # 发送 NOOP 确认所有操作完成
    noop_header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, len(items), 0
    )
    sock.sendall(noop_header)

    # 读取 NOOP 响应(所有 SETQ 成功则只有 NOOP 响应)
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, opcode, _, _, _, _, _, opaque, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)

        if opcode == 0x0A:  # NOOP
            break

        # 如果收到非 NOOP 响应,说明某个 SETQ 失败
        body_len = struct.unpack("!I", resp_header[8:12])[0]
        if body_len > 0:
            _recv_exact(sock, body_len)

    return True

7.11 SASL 认证

SASL 机制列表请求

Header:
  Magic:      0x80
  Opcode:     0x20
  Body Length: 0

SASL 认证请求

Header:
  Magic:      0x80
  Opcode:     0x21
  Key Length: len(mechanism)
  Body Length: len(mechanism) + len(challenge)

Body:
  Key:        "PLAIN" (机制名称)
  Value:      <sasl challenge bytes>

PLAIN 认证示例

def binary_sasl_auth(sock, username: str, password: str) -> bool:
    """SASL PLAIN 认证"""
    # PLAIN 格式: \0username\0password
    challenge = b"\0" + username.encode() + b"\0" + password.encode()
    mechanism = b"PLAIN"

    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x21, len(mechanism), 0, 0x00, 0,
        len(mechanism) + len(challenge), 0, 0
    )
    sock.sendall(header + mechanism + challenge)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    if body_len > 0:
        _recv_exact(sock, body_len)

    return status == 0x0000

7.12 完整二进制客户端

#!/usr/bin/env python3
"""full_binary_client.py — 完整的 Memcached 二进制协议客户端"""

import struct
import socket
from typing import Optional

class FullBinaryMemcachedClient:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))
        self._opaque = 0

    def _next_opaque(self) -> int:
        self._opaque += 1
        return self._opaque

    def _send_recv(self, opcode: int, key: bytes = b"",
                   value: bytes = b"", extras: bytes = b"",
                   cas: int = 0) -> dict:
        body_len = len(extras) + len(key) + len(value)
        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, opcode, len(key), len(extras), 0x00, 0,
            body_len, self._next_opaque(), cas
        )
        self.sock.sendall(header + extras + key + value)

        resp = _recv_exact(self.sock, 24)
        (_, opcode, key_len, extras_len, _, status, body_len, opaque, cas) = \
            struct.unpack("!BBHBBHIIQ", resp)

        body = _recv_exact(self.sock, body_len) if body_len > 0 else b""
        return {
            'opcode': opcode, 'status': status,
            'key': body[extras_len:extras_len + key_len],
            'value': body[extras_len + key_len:],
            'extras': body[:extras_len], 'cas': cas, 'opaque': opaque
        }

    def version(self) -> str:
        r = self._send_recv(0x0B)
        return r['value'].decode()

    def set(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0, cas: int = 0) -> dict:
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x01, key.encode(), value, extras, cas)

    def add(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0) -> dict:
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x02, key.encode(), value, extras)

    def replace(self, key: str, value: bytes, flags: int = 0,
                exptime: int = 0) -> dict:
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x03, key.encode(), value, extras)

    def get(self, key: str) -> Optional[tuple[bytes, int, int]]:
        r = self._send_recv(0x00, key.encode())
        if r['status'] == 0x0001:
            return None
        flags = struct.unpack(">I", r['extras'])[0] if r['extras'] else 0
        return r['value'], flags, r['cas']

    def delete(self, key: str) -> bool:
        r = self._send_recv(0x04, key.encode())
        return r['status'] == 0x0000

    def incr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        extras = struct.pack(">QQI", delta, initial, exptime)
        r = self._send_recv(0x05, key.encode(), extras=extras)
        if r['status'] == 0x0001:
            return None
        return struct.unpack(">Q", r['value'])[0]

    def decr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        extras = struct.pack(">QQI", delta, initial, exptime)
        r = self._send_recv(0x06, key.encode(), extras=extras)
        if r['status'] == 0x0001:
            return None
        return struct.unpack(">Q", r['value'])[0]

    def flush(self, delay: int = 0) -> bool:
        if delay > 0:
            extras = struct.pack(">I", delay)
            r = self._send_recv(0x08, extras=extras)
        else:
            r = self._send_recv(0x08)
        return r['status'] == 0x0000

    def noop(self) -> bool:
        r = self._send_recv(0x0A)
        return r['status'] == 0x0000

    def close(self):
        self._send_recv(0x07)  # QUIT
        self.sock.close()


# 测试
client = FullBinaryMemcachedClient()
print(f"Version: {client.version()}")

client.set("bin:test", b"hello binary", flags=0, exptime=300)
result = client.get("bin:test")
if result:
    print(f"Value: {result[0].decode()}, Flags: {result[1]}, CAS: {result[2]}")

print(f"Incr: {client.incr('bin:counter', 1, initial=100)}")
print(f"Incr: {client.incr('bin:counter', 1)}")

client.delete("bin:test")
client.delete("bin:counter")
client.close()

7.13 注意事项

编号 注意事项 说明
1 Q 系列命令静默 成功时不返回响应,需要 NOOP 标记结束
2 INCR/DECR 初始值 二进制协议支持 key 不存在时设置初始值(文本协议不支持)
3 CAS 为 0 语义 二进制协议中 CAS=0 表示"不做 CAS 检查"
4 Extras 长度固定 每个命令的 Extras 长度是固定的
5 Body 长度验证 extras_len + key_len + value_len == body_len

7.14 扩展阅读


上一章: 第06章 二进制协议基础 下一章: 第08章 Meta 协议 — 探索 Memcached 1.6+ 的 Meta 协议高级特性。