强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 04 - 数据包格式

第 04 章:数据包格式

4.1 MySQL 数据包基础

MySQL 协议中所有通信都以**数据包(Packet)为单位。每个数据包由一个 4 字节的包头和一个可变长度的有效载荷(Payload)**组成。

基本结构

┌──────────────────── 4 字节包头 ────────────────────┐
│  payload_length (3 字节)  │  sequence_id (1 字节)   │
├─────────────────────────────────────────────────────┤
│                                                     │
│              payload (0 ~ 16MB - 1 字节)            │
│                                                     │
└─────────────────────────────────────────────────────┘
字段大小说明
payload_length3 字节有效载荷长度(小端序),不包括包头的 4 字节
sequence_id1 字节序列号,用于匹配请求和响应
payload0 ~ 2^24 - 1 字节有效载荷数据

4.2 包头详解

payload_length(3 字节,小端序)

有效载荷的长度,以**小端序(Little-Endian)**存储,最大值为 2^24 - 1 = 16,777,215 字节(约 16 MB)。

import struct

# 编码长度
length = 1024
encoded = struct.pack('<I', length)[:3]  # 3 字节小端序
print(encoded.hex())  # 000400

# 解码长度
payload_length = struct.unpack('<I', encoded + b'\x00')[0]
print(payload_length)  # 1024

sequence_id(1 字节)

序列号用于匹配请求和响应,以及检测数据包乱序

序列号规则

场景起始序列号说明
新命令请求0客户端发送命令时,序列号从 0 开始
服务器响应1服务器第一个响应包的序列号为 1
多包响应递增每个后续包序列号加 1
大请求分包递增客户端发送大请求时,分包序列号递增
示例:一个 SELECT 查询的序列号

Client → Server:  COM_QUERY "SELECT * FROM users"  (seq=0)
Server → Client:  Column Count                      (seq=1)
Server → Client:  Column Definition 1               (seq=2)
Server → Client:  Column Definition 2               (seq=3)
Server → Client:  EOF                                (seq=4)
Server → Client:  Row Data 1                         (seq=5)
Server → Client:  Row Data 2                         (seq=6)
Server → Client:  EOF                                (seq=7)

序列号回绕

序列号是 1 字节无符号整数,范围 0-255。当超过 255 时回绕到 0:

def next_sequence(current):
    return (current + 1) & 0xFF  # 0xFF = 255

注意:正常情况下,单次请求-响应的序列号不会超过 255(每个数据包最大 16 MB,需要 255 个包才能达到 ~4 GB)。但对于超大结果集,理论上序列号可能回绕。


4.3 最大包长与分包

单包最大大小

单个 MySQL 数据包的有效载荷最大为 2^24 - 1 = 16,777,215 字节(16 MB - 1 字节)。加上 4 字节包头,整个数据包最大为 16 MB + 3 字节。

max_allowed_packet

MySQL 通过 max_allowed_packet 参数控制单个数据包的最大大小:

-- 查看当前设置
SHOW VARIABLES LIKE 'max_allowed_packet';
-- 默认值: 67108864 (64 MB)

-- 设置新的值
SET GLOBAL max_allowed_packet = 134217728;  -- 128 MB

注意max_allowed_packet 限制的是有效载荷的总大小,而不是单个网络包的大小。当数据超过单包限制时,MySQL 会自动分包。

分包机制

当数据超过单个包的最大大小(16 MB - 1)时,发送方将数据分成多个包发送。分包规则:

  • 前 N-1 个包:有效载荷长度 恰好为 0xFFFFFF(16,777,215 字节)
  • 最后一个包:有效载荷长度 小于 0xFFFFFF

接收方判断分包结束的规则:当收到一个长度小于 0xFFFFFF 的包时,表示消息结束。

分包示例:发送 40 MB 数据

包 1: length=0xFFFFFF (16,777,215)  seq=0  → 继续
包 2: length=0xFFFFFF (16,777,215)  seq=1  → 继续
包 3: length=0x0A1B2C (663,340)     seq=2  → 结束 (length < 0xFFFFFF)

总数据 = 16,777,215 + 16,777,215 + 663,340 = 34,217,770 字节

Python 分包实现

"""
mysql_packet_splitter.py
MySQL 数据包的分包与合并实现
"""
import struct

MAX_PACKET_PAYLOAD = 0xFFFFFF  # 16 MB - 1


def create_packet(payload: bytes, sequence_id: int) -> bytes:
    """
    创建一个 MySQL 数据包(不分包)

    参数:
        payload: 有效载荷数据
        sequence_id: 序列号 (0-255)

    返回:
        完整的数据包字节序列
    """
    if len(payload) > MAX_PACKET_PAYLOAD:
        raise ValueError(f"Payload 太大 ({len(payload)} > {MAX_PACKET_PAYLOAD}),需要分包")

    # 包头: 3 字节长度 + 1 字节序列号
    header = struct.pack('<I', len(payload))[:3] + struct.pack('B', sequence_id & 0xFF)
    return header + payload


def create_packets_with_splitting(payload: bytes, start_seq: int = 0) -> list:
    """
    创建数据包,自动处理分包

    参数:
        payload: 有效载荷数据
        start_seq: 起始序列号

    返回:
        数据包列表
    """
    packets = []
    offset = 0
    seq = start_seq

    while offset < len(payload):
        # 计算本包的数据量
        chunk_size = min(MAX_PACKET_PAYLOAD, len(payload) - offset)
        chunk = payload[offset:offset + chunk_size]

        packets.append(create_packet(chunk, seq))

        offset += chunk_size
        seq = (seq + 1) & 0xFF

    # 如果 payload 长度恰好是 MAX_PACKET_PAYLOAD 的倍数,
    # 需要额外发送一个空包来标识结束
    if len(payload) > 0 and len(payload) % MAX_PACKET_PAYLOAD == 0:
        packets.append(create_packet(b'', seq))

    return packets


def parse_packets_from_stream(data: bytes, start_offset: int = 0) -> list:
    """
    从字节流中解析多个数据包,处理分包合并

    参数:
        data: 完整的字节流
        start_offset: 起始偏移量

    返回:
        (完整消息, 已消费字节数) 的列表
    """
    packets = []
    offset = start_offset

    while offset + 4 <= len(data):
        # 读取包头
        payload_length = struct.unpack('<I', data[offset:offset+3] + b'\x00')[0]
        sequence_id = data[offset + 3]

        # 检查是否有足够的数据
        if offset + 4 + payload_length > len(data):
            break  # 数据不完整

        payload = data[offset + 4:offset + 4 + payload_length]

        packets.append({
            'sequence_id': sequence_id,
            'payload_length': payload_length,
            'payload': payload,
        })

        offset += 4 + payload_length

        # 如果长度小于最大值,消息结束
        if payload_length < MAX_PACKET_PAYLOAD:
            break

    return packets, offset


def demo():
    """演示分包与合并"""
    print("=" * 60)
    print("MySQL 数据包分包演示")
    print("=" * 60)

    # 示例 1:小数据包(不需要分包)
    print("\n--- 示例 1:小数据包 ---")
    payload = b"SELECT * FROM users WHERE id = 1"
    packets = create_packets_with_splitting(payload, start_seq=0)
    for i, pkt in enumerate(packets):
        pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
        seq = pkt[3]
        print(f"  包 {i}: len={pkt_len}, seq={seq}, payload={pkt[4:30]}...")

    # 示例 2:大数据包(需要分包)
    print("\n--- 示例 2:大数据包(模拟)---")
    # 创建一个超过 16MB 的数据(这里用小数据模拟)
    big_payload = b'A' * (MAX_PACKET_PAYLOAD + 1000)
    packets = create_packets_with_splitting(big_payload, start_seq=0)
    for i, pkt in enumerate(packets):
        pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
        seq = pkt[3]
        print(f"  包 {i}: len={pkt_len}, seq={seq}")
    print(f"  总数据大小: {len(big_payload):,} 字节")
    print(f"  分包数量: {len(packets)}")

    # 示例 3:恰好 16MB 的数据(需要额外的空包)
    print("\n--- 示例 3:恰好 MAX_PACKET_PAYLOAD 的倍数 ---")
    exact_payload = b'B' * MAX_PACKET_PAYLOAD
    packets = create_packets_with_splitting(exact_payload, start_seq=0)
    for i, pkt in enumerate(packets):
        pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
        seq = pkt[3]
        print(f"  包 {i}: len={pkt_len}, seq={seq}")
    print(f"  注意: 额外的空包用于标识消息结束")


if __name__ == '__main__':
    demo()

4.4 长度编码整数(Length-Encoded Integer)

MySQL 协议中大量使用长度编码整数(Length-Encoded Integer,简称 LEI)来表示可变长度的数值。

编码规则

第一个字节含义字节数
0x00 - 0xFA直接表示 0 - 2501
0xFC后跟 2 字节小端序整数3
0xFD后跟 3 字节小端序整数4
0xFE后跟 8 字节小端序整数9
0xFF保留(标识 error 包头或 NULL)-

Python 实现

"""
length_encoded.py
MySQL 长度编码整数的编解码实现
"""
import struct


def read_length_encoded_integer(data: bytes, offset: int = 0) -> tuple:
    """
    读取长度编码整数

    参数:
        data: 字节序列
        offset: 起始偏移量

    返回:
        (值, 新偏移量) 元组
    """
    if offset >= len(data):
        raise ValueError("偏移量超出数据范围")

    first = data[offset]

    if first < 0xFB:
        # 直接值 (0-250)
        return first, offset + 1

    elif first == 0xFC:
        # 2 字节值
        value = struct.unpack('<H', data[offset+1:offset+3])[0]
        return value, offset + 3

    elif first == 0xFD:
        # 3 字节值
        value = struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0]
        return value, offset + 4

    elif first == 0xFE:
        # 8 字节值
        value = struct.unpack('<Q', data[offset+1:offset+9])[0]
        return value, offset + 9

    else:
        raise ValueError(f"无效的长度编码前缀: 0x{first:02X}")


def write_length_encoded_integer(value: int) -> bytes:
    """
    写入长度编码整数

    参数:
        value: 要编码的整数

    返回:
        编码后的字节序列
    """
    if value < 0xFB:
        return struct.pack('B', value)
    elif value < 0x10000:
        return b'\xFC' + struct.pack('<H', value)
    elif value < 0x1000000:
        return b'\xFD' + struct.pack('<I', value)[:3]
    elif value < 0x10000000000000000:
        return b'\xFE' + struct.pack('<Q', value)
    else:
        raise ValueError(f"值太大: {value}")


def read_length_encoded_string(data: bytes, offset: int = 0) -> tuple:
    """
    读取长度编码字符串

    格式: [长度编码的长度] + [字符串字节]

    返回:
        (字符串, 新偏移量) 元组
    """
    length, offset = read_length_encoded_integer(data, offset)
    string = data[offset:offset + length].decode('utf-8')
    return string, offset + length


def write_length_encoded_string(s: str) -> bytes:
    """
    写入长度编码字符串
    """
    encoded = s.encode('utf-8')
    return write_length_encoded_integer(len(encoded)) + encoded


def demo():
    """演示长度编码"""
    print("=" * 60)
    print("MySQL 长度编码整数演示")
    print("=" * 60)

    test_values = [0, 1, 42, 250, 251, 1000, 65535, 65536, 1000000, 16777215, 16777216]

    for value in test_values:
        encoded = write_length_encoded_integer(value)
        decoded, _ = read_length_encoded_integer(encoded)
        print(f"  {value:>10,}{encoded.hex():<24s} ({len(encoded)} 字节) → {decoded:>10,}")

    # 字符串编码
    print("\n字符串编码:")
    test_strings = ["", "hello", "你好世界", "A" * 1000]
    for s in test_strings:
        encoded = write_length_encoded_string(s)
        decoded, _ = read_length_encoded_string(encoded)
        print(f"  '{s[:20]}...' → {len(encoded)} 字节 → '{decoded[:20]}...'")


if __name__ == '__main__':
    demo()

4.5 长度编码字符串(Length-Encoded String)

除了长度编码整数,MySQL 协议中还广泛使用长度编码字符串

格式

[长度编码的字节数] + [原始字节数据]

长度编码字符串不以 null 结尾,依靠长度字段来确定边界。这使得它可以包含任意字节数据(包括 null 字节和多字节 UTF-8 字符)。

使用场景

  • 认证数据(auth_response)
  • 字段值(Result Set Row 中的文本数据)
  • 参数值(COM_STMT_SEND_LONG_DATA)
  • 连接属性

4.6 固定长度整数

MySQL 协议也使用固定长度的整数,有以下几种字节序:

类型大小字节序使用场景
int<1>1 字节-序列号、标志位
int<2>2 字节小端序能力标志低位、状态标志
int<3>3 字节小端序包长度
int<4>4 字节小端序连接 ID、能力标志完整
int<6>6 字节小端序影响行数(某些场景)
int<8>8 字节小端序长度编码的大整数

Python 工具函数

import struct

def read_uint1(data, offset=0):
    return data[offset], offset + 1

def read_uint2(data, offset=0):
    return struct.unpack('<H', data[offset:offset+2])[0], offset + 2

def read_uint3(data, offset=0):
    return struct.unpack('<I', data[offset:offset+3] + b'\x00')[0], offset + 3

def read_uint4(data, offset=0):
    return struct.unpack('<I', data[offset:offset+4])[0], offset + 4

def read_uint8(data, offset=0):
    return struct.unpack('<Q', data[offset:offset+8])[0], offset + 8

def read_int1(data, offset=0):
    return struct.unpack('<b', data[offset:offset+1])[0], offset + 1

def read_int2(data, offset=0):
    return struct.unpack('<h', data[offset:offset+2])[0], offset + 2

def read_int4(data, offset=0):
    return struct.unpack('<i', data[offset:offset+4])[0], offset + 4

def read_int8(data, offset=0):
    return struct.unpack('<q', data[offset:offset+8])[0], offset + 8

def read_float(data, offset=0):
    return struct.unpack('<f', data[offset:offset+4])[0], offset + 4

def read_double(data, offset=0):
    return struct.unpack('<d', data[offset:offset+8])[0], offset + 8

4.7 协议中的字节序

MySQL 协议统一使用小端序(Little-Endian),这与 x86 架构一致:

十进制 1024 的表示:
  小端序: 00 04  (低位字节在前)
  大端序: 04 00  (高位字节在前)

十进制 16,777,215 (0xFFFFFF) 的 3 字节表示:
  小端序: FF FF FF
  大端序: FF FF FF  (3 字节的 FF 正好相同)

注意:Java 等使用大端序的语言在实现 MySQL 协议时需要特别注意字节序转换。


4.8 完整数据包解析器

"""
mysql_packet_parser.py
完整的 MySQL 数据包解析器
"""
import struct
from enum import IntEnum
from dataclasses import dataclass
from typing import List, Optional


class PacketType(IntEnum):
    """数据包类型"""
    OK = 0x00
    EOF = 0xFE
    ERR = 0xFF
    UNKNOWN = -1


@dataclass
class MySQLPacket:
    """MySQL 数据包"""
    sequence_id: int
    payload: bytes
    packet_type: PacketType = PacketType.UNKNOWN

    def __post_init__(self):
        if len(self.payload) > 0:
            first_byte = self.payload[0]
            if first_byte == 0x00 and len(self.payload) >= 7:
                self.packet_type = PacketType.OK
            elif first_byte == 0xFF:
                self.packet_type = PacketType.ERR
            elif first_byte == 0xFE and len(self.payload) < 9:
                self.packet_type = PacketType.EOF


class PacketReader:
    """从字节流中读取 MySQL 数据包"""

    def __init__(self):
        self._buffer = b''
        self._offset = 0

    def feed(self, data: bytes):
        """向缓冲区添加数据"""
        self._buffer += data

    def read_packet(self) -> Optional[MySQLPacket]:
        """读取一个完整的数据包"""
        if self._offset + 4 > len(self._buffer):
            return None  # 数据不足

        # 读取包头
        payload_length = struct.unpack(
            '<I', self._buffer[self._offset:self._offset+3] + b'\x00'
        )[0]
        sequence_id = self._buffer[self._offset + 3]

        # 检查是否有足够的数据
        if self._offset + 4 + payload_length > len(self._buffer):
            return None  # 数据不完整

        # 提取有效载荷
        payload = self._buffer[self._offset + 4:self._offset + 4 + payload_length]
        self._offset += 4 + payload_length

        return MySQLPacket(sequence_id=sequence_id, payload=payload)

    def read_all_packets(self) -> List[MySQLPacket]:
        """读取所有可用的完整数据包"""
        packets = []
        while True:
            pkt = self.read_packet()
            if pkt is None:
                break
            packets.append(pkt)
        return packets

    def remaining(self) -> bytes:
        """返回未消费的数据"""
        remaining = self._buffer[self._offset:]
        self._buffer = remaining
        self._offset = 0
        return remaining


class PacketWriter:
    """构建 MySQL 数据包"""

    @staticmethod
    def write_packet(payload: bytes, sequence_id: int) -> bytes:
        """构建一个数据包"""
        header = struct.pack('<I', len(payload))[:3] + struct.pack('B', sequence_id & 0xFF)
        return header + payload

    @staticmethod
    def write_payload(*parts) -> bytes:
        """将多个部分组合为有效载荷"""
        return b''.join(parts)


def demo():
    """演示数据包解析器"""
    print("=" * 60)
    print("MySQL 数据包解析器演示")
    print("=" * 60)

    # 构造一些数据包
    writer = PacketWriter()

    # OK 包
    ok_payload = b'\x00\x00\x00\x02\x00\x00\x00'  # 简化的 OK 包
    ok_packet = writer.write_packet(ok_payload, 1)

    # ERR 包
    err_payload = b'\xFF\x15\x04#28000Access denied'
    err_packet = writer.write_packet(err_payload, 1)

    # 普通数据包
    data_payload = b'\x03SELECT 1'
    data_packet = writer.write_packet(data_payload, 0)

    # 拼接所有数据包
    raw_data = data_packet + ok_packet + err_packet

    # 解析
    reader = PacketReader()
    reader.feed(raw_data)
    packets = reader.read_all_packets()

    for pkt in packets:
        print(f"\n数据包:")
        print(f"  序列号: {pkt.sequence_id}")
        print(f"  载荷长度: {len(pkt.payload)}")
        print(f"  类型: {pkt.packet_type.name}")
        print(f"  原始数据: {pkt.payload[:20].hex()}")


if __name__ == '__main__':
    demo()

4.9 COMpressed 协议

MySQL 支持在数据包层面进行压缩传输,以减少网络带宽使用。

启用压缩

客户端在 HandshakeResponse 中设置 CLIENT_COMPRESS 能力标志来请求压缩。

压缩数据包格式

┌──────────────── 压缩包头 (7 字节) ────────────────┐
│  compressed_length (3 字节)                         │
│  compressed_sequence_id (1 字节)                    │
│  uncompressed_length (3 字节)                       │
├─────────────────────────────────────────────────────┤
│                                                     │
│           compressed_payload (变长)                  │
│                                                     │
└─────────────────────────────────────────────────────┘
字段大小说明
compressed_length3 字节压缩后的数据长度(不含压缩包头)
compressed_sequence_id1 字节压缩序列号(独立于普通序列号)
uncompressed_length3 字节压缩前的数据长度(0 表示未压缩)

压缩策略

  • 如果压缩后的数据大于等于原始数据的 80%,MySQL 选择不压缩uncompressed_length 设为 0
  • 多个小包可能被合并为一个压缩包
  • 一个大包可能被压缩为多个压缩分包

何时使用压缩

场景推荐
局域网、低延迟不推荐(CPU 开销大于带宽节省)
跨数据中心、高延迟推荐(减少传输数据量)
大量文本数据传输推荐(压缩率高)
大量二进制数据传输不推荐(压缩率低)
CPU 资源紧张不推荐
import zlib

def compress_data(data: bytes) -> bytes:
    """使用 zlib 压缩数据"""
    return zlib.compress(data, 6)  # 压缩级别 6

def decompress_data(data: bytes) -> bytes:
    """解压缩数据"""
    return zlib.decompress(data)

def create_compressed_packet(payload: bytes, seq: int) -> bytes:
    """创建压缩数据包"""
    compressed = compress_data(payload)

    # 如果压缩没有节省空间,发送原始数据
    if len(compressed) >= len(payload):
        header = struct.pack('<I', len(payload))[:3]  # compressed_length
        header += struct.pack('B', seq)               # compressed_sequence_id
        header += b'\x00\x00\x00'                     # uncompressed_length = 0 (未压缩)
        return header + payload

    header = struct.pack('<I', len(compressed))[:3]    # compressed_length
    header += struct.pack('B', seq)                     # compressed_sequence_id
    header += struct.pack('<I', len(payload))[:3]      # uncompressed_length
    return header + compressed

4.10 注意事项

重要提醒

  1. 包头读取:必须确保读取完整的 4 字节包头后再读取 payload。在非阻塞 I/O 场景下,可能出现只读到部分包头的情况。

  2. 序列号验证:客户端应该验证服务器响应的序列号是否符合预期(从 1 开始递增),以检测数据包乱序或丢失。

  3. 分包边界:当接收数据时,TCP 的消息边界可能与 MySQL 包边界不一致。必须基于 payload_length 来确定包的边界。

  4. 长度编码的 0xFB:值 0xFB 被保留用于标识 NULL 值(在 Result Set Row 中),不应作为整数的编码前缀。

  5. 超大包的内存max_allowed_packet 可以设到 1 GB,接收端需要分配足够大的缓冲区。

  6. 压缩的陷阱:压缩会引入额外的延迟和 CPU 开销。对于小数据包,压缩可能适得其反。


4.11 业务场景

场景一:代理层的包转发

数据库代理(如 ProxySQL)需要正确解析和转发 MySQL 数据包。关键挑战:

  • 完整接收一个 MySQL 消息(可能跨多个包)后才能转发
  • 需要修改包头中的某些字段(如序列号)
  • 需要检测消息类型(OK/ERR/Result Set)以决定路由策略

场景二:抓包分析工具

编写 MySQL 协议分析工具时,需要:

  • 从 TCP 流中正确分割 MySQL 包
  • 处理压缩数据包
  • 解析长度编码字段

场景三:调试大结果集传输

当应用报告 “Packet too large” 错误时,需要检查:

  • max_allowed_packet 配置
  • 客户端驱动的缓冲区大小
  • 中间代理的包大小限制
-- 调整包大小
SET GLOBAL max_allowed_packet = 256 * 1024 * 1024;  -- 256 MB

4.12 扩展阅读


上一章03 - 认证机制 下一章05 - 命令与请求 —— 深入了解客户端可以发送的所有命令类型。