MySQL 传输协议精讲 / 04 - 数据包格式
第 04 章:数据包格式
4.1 MySQL 数据包基础
MySQL 协议中所有通信都以**数据包(Packet)为单位。每个数据包由一个 4 字节的包头和一个可变长度的有效载荷(Payload)**组成。
基本结构
┌──────────────────── 4 字节包头 ────────────────────┐
│ payload_length (3 字节) │ sequence_id (1 字节) │
├─────────────────────────────────────────────────────┤
│ │
│ payload (0 ~ 16MB - 1 字节) │
│ │
└─────────────────────────────────────────────────────┘
| 字段 | 大小 | 说明 |
|---|---|---|
payload_length | 3 字节 | 有效载荷长度(小端序),不包括包头的 4 字节 |
sequence_id | 1 字节 | 序列号,用于匹配请求和响应 |
payload | 0 ~ 2^24 - 1 字节 | 有效载荷数据 |
4.2 包头详解
payload_length(3 字节,小端序)
有效载荷的长度,以**小端序(Little-Endian)**存储,最大值为 2^24 - 1 = 16,777,215 字节(约 16 MB)。
import struct
# 编码长度
length = 1024
encoded = struct.pack('<I', length)[:3] # 3 字节小端序
print(encoded.hex()) # 000400
# 解码长度
payload_length = struct.unpack('<I', encoded + b'\x00')[0]
print(payload_length) # 1024
sequence_id(1 字节)
序列号用于匹配请求和响应,以及检测数据包乱序。
序列号规则
| 场景 | 起始序列号 | 说明 |
|---|---|---|
| 新命令请求 | 0 | 客户端发送命令时,序列号从 0 开始 |
| 服务器响应 | 1 | 服务器第一个响应包的序列号为 1 |
| 多包响应 | 递增 | 每个后续包序列号加 1 |
| 大请求分包 | 递增 | 客户端发送大请求时,分包序列号递增 |
示例:一个 SELECT 查询的序列号
Client → Server: COM_QUERY "SELECT * FROM users" (seq=0)
Server → Client: Column Count (seq=1)
Server → Client: Column Definition 1 (seq=2)
Server → Client: Column Definition 2 (seq=3)
Server → Client: EOF (seq=4)
Server → Client: Row Data 1 (seq=5)
Server → Client: Row Data 2 (seq=6)
Server → Client: EOF (seq=7)
序列号回绕
序列号是 1 字节无符号整数,范围 0-255。当超过 255 时回绕到 0:
def next_sequence(current):
return (current + 1) & 0xFF # 0xFF = 255
注意:正常情况下,单次请求-响应的序列号不会超过 255(每个数据包最大 16 MB,需要 255 个包才能达到 ~4 GB)。但对于超大结果集,理论上序列号可能回绕。
4.3 最大包长与分包
单包最大大小
单个 MySQL 数据包的有效载荷最大为 2^24 - 1 = 16,777,215 字节(16 MB - 1 字节)。加上 4 字节包头,整个数据包最大为 16 MB + 3 字节。
max_allowed_packet
MySQL 通过 max_allowed_packet 参数控制单个数据包的最大大小:
-- 查看当前设置
SHOW VARIABLES LIKE 'max_allowed_packet';
-- 默认值: 67108864 (64 MB)
-- 设置新的值
SET GLOBAL max_allowed_packet = 134217728; -- 128 MB
注意:
max_allowed_packet限制的是有效载荷的总大小,而不是单个网络包的大小。当数据超过单包限制时,MySQL 会自动分包。
分包机制
当数据超过单个包的最大大小(16 MB - 1)时,发送方将数据分成多个包发送。分包规则:
- 前 N-1 个包:有效载荷长度 恰好为
0xFFFFFF(16,777,215 字节) - 最后一个包:有效载荷长度 小于
0xFFFFFF
接收方判断分包结束的规则:当收到一个长度小于 0xFFFFFF 的包时,表示消息结束。
分包示例:发送 40 MB 数据
包 1: length=0xFFFFFF (16,777,215) seq=0 → 继续
包 2: length=0xFFFFFF (16,777,215) seq=1 → 继续
包 3: length=0x0A1B2C (663,340) seq=2 → 结束 (length < 0xFFFFFF)
总数据 = 16,777,215 + 16,777,215 + 663,340 = 34,217,770 字节
Python 分包实现
"""
mysql_packet_splitter.py
MySQL 数据包的分包与合并实现
"""
import struct
MAX_PACKET_PAYLOAD = 0xFFFFFF # 16 MB - 1
def create_packet(payload: bytes, sequence_id: int) -> bytes:
"""
创建一个 MySQL 数据包(不分包)
参数:
payload: 有效载荷数据
sequence_id: 序列号 (0-255)
返回:
完整的数据包字节序列
"""
if len(payload) > MAX_PACKET_PAYLOAD:
raise ValueError(f"Payload 太大 ({len(payload)} > {MAX_PACKET_PAYLOAD}),需要分包")
# 包头: 3 字节长度 + 1 字节序列号
header = struct.pack('<I', len(payload))[:3] + struct.pack('B', sequence_id & 0xFF)
return header + payload
def create_packets_with_splitting(payload: bytes, start_seq: int = 0) -> list:
"""
创建数据包,自动处理分包
参数:
payload: 有效载荷数据
start_seq: 起始序列号
返回:
数据包列表
"""
packets = []
offset = 0
seq = start_seq
while offset < len(payload):
# 计算本包的数据量
chunk_size = min(MAX_PACKET_PAYLOAD, len(payload) - offset)
chunk = payload[offset:offset + chunk_size]
packets.append(create_packet(chunk, seq))
offset += chunk_size
seq = (seq + 1) & 0xFF
# 如果 payload 长度恰好是 MAX_PACKET_PAYLOAD 的倍数,
# 需要额外发送一个空包来标识结束
if len(payload) > 0 and len(payload) % MAX_PACKET_PAYLOAD == 0:
packets.append(create_packet(b'', seq))
return packets
def parse_packets_from_stream(data: bytes, start_offset: int = 0) -> list:
"""
从字节流中解析多个数据包,处理分包合并
参数:
data: 完整的字节流
start_offset: 起始偏移量
返回:
(完整消息, 已消费字节数) 的列表
"""
packets = []
offset = start_offset
while offset + 4 <= len(data):
# 读取包头
payload_length = struct.unpack('<I', data[offset:offset+3] + b'\x00')[0]
sequence_id = data[offset + 3]
# 检查是否有足够的数据
if offset + 4 + payload_length > len(data):
break # 数据不完整
payload = data[offset + 4:offset + 4 + payload_length]
packets.append({
'sequence_id': sequence_id,
'payload_length': payload_length,
'payload': payload,
})
offset += 4 + payload_length
# 如果长度小于最大值,消息结束
if payload_length < MAX_PACKET_PAYLOAD:
break
return packets, offset
def demo():
"""演示分包与合并"""
print("=" * 60)
print("MySQL 数据包分包演示")
print("=" * 60)
# 示例 1:小数据包(不需要分包)
print("\n--- 示例 1:小数据包 ---")
payload = b"SELECT * FROM users WHERE id = 1"
packets = create_packets_with_splitting(payload, start_seq=0)
for i, pkt in enumerate(packets):
pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
seq = pkt[3]
print(f" 包 {i}: len={pkt_len}, seq={seq}, payload={pkt[4:30]}...")
# 示例 2:大数据包(需要分包)
print("\n--- 示例 2:大数据包(模拟)---")
# 创建一个超过 16MB 的数据(这里用小数据模拟)
big_payload = b'A' * (MAX_PACKET_PAYLOAD + 1000)
packets = create_packets_with_splitting(big_payload, start_seq=0)
for i, pkt in enumerate(packets):
pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
seq = pkt[3]
print(f" 包 {i}: len={pkt_len}, seq={seq}")
print(f" 总数据大小: {len(big_payload):,} 字节")
print(f" 分包数量: {len(packets)}")
# 示例 3:恰好 16MB 的数据(需要额外的空包)
print("\n--- 示例 3:恰好 MAX_PACKET_PAYLOAD 的倍数 ---")
exact_payload = b'B' * MAX_PACKET_PAYLOAD
packets = create_packets_with_splitting(exact_payload, start_seq=0)
for i, pkt in enumerate(packets):
pkt_len = struct.unpack('<I', pkt[0:3] + b'\x00')[0]
seq = pkt[3]
print(f" 包 {i}: len={pkt_len}, seq={seq}")
print(f" 注意: 额外的空包用于标识消息结束")
if __name__ == '__main__':
demo()
4.4 长度编码整数(Length-Encoded Integer)
MySQL 协议中大量使用长度编码整数(Length-Encoded Integer,简称 LEI)来表示可变长度的数值。
编码规则
| 第一个字节 | 含义 | 字节数 |
|---|---|---|
| 0x00 - 0xFA | 直接表示 0 - 250 | 1 |
| 0xFC | 后跟 2 字节小端序整数 | 3 |
| 0xFD | 后跟 3 字节小端序整数 | 4 |
| 0xFE | 后跟 8 字节小端序整数 | 9 |
| 0xFF | 保留(标识 error 包头或 NULL) | - |
Python 实现
"""
length_encoded.py
MySQL 长度编码整数的编解码实现
"""
import struct
def read_length_encoded_integer(data: bytes, offset: int = 0) -> tuple:
"""
读取长度编码整数
参数:
data: 字节序列
offset: 起始偏移量
返回:
(值, 新偏移量) 元组
"""
if offset >= len(data):
raise ValueError("偏移量超出数据范围")
first = data[offset]
if first < 0xFB:
# 直接值 (0-250)
return first, offset + 1
elif first == 0xFC:
# 2 字节值
value = struct.unpack('<H', data[offset+1:offset+3])[0]
return value, offset + 3
elif first == 0xFD:
# 3 字节值
value = struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0]
return value, offset + 4
elif first == 0xFE:
# 8 字节值
value = struct.unpack('<Q', data[offset+1:offset+9])[0]
return value, offset + 9
else:
raise ValueError(f"无效的长度编码前缀: 0x{first:02X}")
def write_length_encoded_integer(value: int) -> bytes:
"""
写入长度编码整数
参数:
value: 要编码的整数
返回:
编码后的字节序列
"""
if value < 0xFB:
return struct.pack('B', value)
elif value < 0x10000:
return b'\xFC' + struct.pack('<H', value)
elif value < 0x1000000:
return b'\xFD' + struct.pack('<I', value)[:3]
elif value < 0x10000000000000000:
return b'\xFE' + struct.pack('<Q', value)
else:
raise ValueError(f"值太大: {value}")
def read_length_encoded_string(data: bytes, offset: int = 0) -> tuple:
"""
读取长度编码字符串
格式: [长度编码的长度] + [字符串字节]
返回:
(字符串, 新偏移量) 元组
"""
length, offset = read_length_encoded_integer(data, offset)
string = data[offset:offset + length].decode('utf-8')
return string, offset + length
def write_length_encoded_string(s: str) -> bytes:
"""
写入长度编码字符串
"""
encoded = s.encode('utf-8')
return write_length_encoded_integer(len(encoded)) + encoded
def demo():
"""演示长度编码"""
print("=" * 60)
print("MySQL 长度编码整数演示")
print("=" * 60)
test_values = [0, 1, 42, 250, 251, 1000, 65535, 65536, 1000000, 16777215, 16777216]
for value in test_values:
encoded = write_length_encoded_integer(value)
decoded, _ = read_length_encoded_integer(encoded)
print(f" {value:>10,} → {encoded.hex():<24s} ({len(encoded)} 字节) → {decoded:>10,}")
# 字符串编码
print("\n字符串编码:")
test_strings = ["", "hello", "你好世界", "A" * 1000]
for s in test_strings:
encoded = write_length_encoded_string(s)
decoded, _ = read_length_encoded_string(encoded)
print(f" '{s[:20]}...' → {len(encoded)} 字节 → '{decoded[:20]}...'")
if __name__ == '__main__':
demo()
4.5 长度编码字符串(Length-Encoded String)
除了长度编码整数,MySQL 协议中还广泛使用长度编码字符串:
格式
[长度编码的字节数] + [原始字节数据]
长度编码字符串不以 null 结尾,依靠长度字段来确定边界。这使得它可以包含任意字节数据(包括 null 字节和多字节 UTF-8 字符)。
使用场景
- 认证数据(auth_response)
- 字段值(Result Set Row 中的文本数据)
- 参数值(COM_STMT_SEND_LONG_DATA)
- 连接属性
4.6 固定长度整数
MySQL 协议也使用固定长度的整数,有以下几种字节序:
| 类型 | 大小 | 字节序 | 使用场景 |
|---|---|---|---|
int<1> | 1 字节 | - | 序列号、标志位 |
int<2> | 2 字节 | 小端序 | 能力标志低位、状态标志 |
int<3> | 3 字节 | 小端序 | 包长度 |
int<4> | 4 字节 | 小端序 | 连接 ID、能力标志完整 |
int<6> | 6 字节 | 小端序 | 影响行数(某些场景) |
int<8> | 8 字节 | 小端序 | 长度编码的大整数 |
Python 工具函数
import struct
def read_uint1(data, offset=0):
return data[offset], offset + 1
def read_uint2(data, offset=0):
return struct.unpack('<H', data[offset:offset+2])[0], offset + 2
def read_uint3(data, offset=0):
return struct.unpack('<I', data[offset:offset+3] + b'\x00')[0], offset + 3
def read_uint4(data, offset=0):
return struct.unpack('<I', data[offset:offset+4])[0], offset + 4
def read_uint8(data, offset=0):
return struct.unpack('<Q', data[offset:offset+8])[0], offset + 8
def read_int1(data, offset=0):
return struct.unpack('<b', data[offset:offset+1])[0], offset + 1
def read_int2(data, offset=0):
return struct.unpack('<h', data[offset:offset+2])[0], offset + 2
def read_int4(data, offset=0):
return struct.unpack('<i', data[offset:offset+4])[0], offset + 4
def read_int8(data, offset=0):
return struct.unpack('<q', data[offset:offset+8])[0], offset + 8
def read_float(data, offset=0):
return struct.unpack('<f', data[offset:offset+4])[0], offset + 4
def read_double(data, offset=0):
return struct.unpack('<d', data[offset:offset+8])[0], offset + 8
4.7 协议中的字节序
MySQL 协议统一使用小端序(Little-Endian),这与 x86 架构一致:
十进制 1024 的表示:
小端序: 00 04 (低位字节在前)
大端序: 04 00 (高位字节在前)
十进制 16,777,215 (0xFFFFFF) 的 3 字节表示:
小端序: FF FF FF
大端序: FF FF FF (3 字节的 FF 正好相同)
注意:Java 等使用大端序的语言在实现 MySQL 协议时需要特别注意字节序转换。
4.8 完整数据包解析器
"""
mysql_packet_parser.py
完整的 MySQL 数据包解析器
"""
import struct
from enum import IntEnum
from dataclasses import dataclass
from typing import List, Optional
class PacketType(IntEnum):
"""数据包类型"""
OK = 0x00
EOF = 0xFE
ERR = 0xFF
UNKNOWN = -1
@dataclass
class MySQLPacket:
"""MySQL 数据包"""
sequence_id: int
payload: bytes
packet_type: PacketType = PacketType.UNKNOWN
def __post_init__(self):
if len(self.payload) > 0:
first_byte = self.payload[0]
if first_byte == 0x00 and len(self.payload) >= 7:
self.packet_type = PacketType.OK
elif first_byte == 0xFF:
self.packet_type = PacketType.ERR
elif first_byte == 0xFE and len(self.payload) < 9:
self.packet_type = PacketType.EOF
class PacketReader:
"""从字节流中读取 MySQL 数据包"""
def __init__(self):
self._buffer = b''
self._offset = 0
def feed(self, data: bytes):
"""向缓冲区添加数据"""
self._buffer += data
def read_packet(self) -> Optional[MySQLPacket]:
"""读取一个完整的数据包"""
if self._offset + 4 > len(self._buffer):
return None # 数据不足
# 读取包头
payload_length = struct.unpack(
'<I', self._buffer[self._offset:self._offset+3] + b'\x00'
)[0]
sequence_id = self._buffer[self._offset + 3]
# 检查是否有足够的数据
if self._offset + 4 + payload_length > len(self._buffer):
return None # 数据不完整
# 提取有效载荷
payload = self._buffer[self._offset + 4:self._offset + 4 + payload_length]
self._offset += 4 + payload_length
return MySQLPacket(sequence_id=sequence_id, payload=payload)
def read_all_packets(self) -> List[MySQLPacket]:
"""读取所有可用的完整数据包"""
packets = []
while True:
pkt = self.read_packet()
if pkt is None:
break
packets.append(pkt)
return packets
def remaining(self) -> bytes:
"""返回未消费的数据"""
remaining = self._buffer[self._offset:]
self._buffer = remaining
self._offset = 0
return remaining
class PacketWriter:
"""构建 MySQL 数据包"""
@staticmethod
def write_packet(payload: bytes, sequence_id: int) -> bytes:
"""构建一个数据包"""
header = struct.pack('<I', len(payload))[:3] + struct.pack('B', sequence_id & 0xFF)
return header + payload
@staticmethod
def write_payload(*parts) -> bytes:
"""将多个部分组合为有效载荷"""
return b''.join(parts)
def demo():
"""演示数据包解析器"""
print("=" * 60)
print("MySQL 数据包解析器演示")
print("=" * 60)
# 构造一些数据包
writer = PacketWriter()
# OK 包
ok_payload = b'\x00\x00\x00\x02\x00\x00\x00' # 简化的 OK 包
ok_packet = writer.write_packet(ok_payload, 1)
# ERR 包
err_payload = b'\xFF\x15\x04#28000Access denied'
err_packet = writer.write_packet(err_payload, 1)
# 普通数据包
data_payload = b'\x03SELECT 1'
data_packet = writer.write_packet(data_payload, 0)
# 拼接所有数据包
raw_data = data_packet + ok_packet + err_packet
# 解析
reader = PacketReader()
reader.feed(raw_data)
packets = reader.read_all_packets()
for pkt in packets:
print(f"\n数据包:")
print(f" 序列号: {pkt.sequence_id}")
print(f" 载荷长度: {len(pkt.payload)}")
print(f" 类型: {pkt.packet_type.name}")
print(f" 原始数据: {pkt.payload[:20].hex()}")
if __name__ == '__main__':
demo()
4.9 COMpressed 协议
MySQL 支持在数据包层面进行压缩传输,以减少网络带宽使用。
启用压缩
客户端在 HandshakeResponse 中设置 CLIENT_COMPRESS 能力标志来请求压缩。
压缩数据包格式
┌──────────────── 压缩包头 (7 字节) ────────────────┐
│ compressed_length (3 字节) │
│ compressed_sequence_id (1 字节) │
│ uncompressed_length (3 字节) │
├─────────────────────────────────────────────────────┤
│ │
│ compressed_payload (变长) │
│ │
└─────────────────────────────────────────────────────┘
| 字段 | 大小 | 说明 |
|---|---|---|
compressed_length | 3 字节 | 压缩后的数据长度(不含压缩包头) |
compressed_sequence_id | 1 字节 | 压缩序列号(独立于普通序列号) |
uncompressed_length | 3 字节 | 压缩前的数据长度(0 表示未压缩) |
压缩策略
- 如果压缩后的数据大于等于原始数据的 80%,MySQL 选择不压缩,
uncompressed_length设为 0 - 多个小包可能被合并为一个压缩包
- 一个大包可能被压缩为多个压缩分包
何时使用压缩
| 场景 | 推荐 |
|---|---|
| 局域网、低延迟 | 不推荐(CPU 开销大于带宽节省) |
| 跨数据中心、高延迟 | 推荐(减少传输数据量) |
| 大量文本数据传输 | 推荐(压缩率高) |
| 大量二进制数据传输 | 不推荐(压缩率低) |
| CPU 资源紧张 | 不推荐 |
import zlib
def compress_data(data: bytes) -> bytes:
"""使用 zlib 压缩数据"""
return zlib.compress(data, 6) # 压缩级别 6
def decompress_data(data: bytes) -> bytes:
"""解压缩数据"""
return zlib.decompress(data)
def create_compressed_packet(payload: bytes, seq: int) -> bytes:
"""创建压缩数据包"""
compressed = compress_data(payload)
# 如果压缩没有节省空间,发送原始数据
if len(compressed) >= len(payload):
header = struct.pack('<I', len(payload))[:3] # compressed_length
header += struct.pack('B', seq) # compressed_sequence_id
header += b'\x00\x00\x00' # uncompressed_length = 0 (未压缩)
return header + payload
header = struct.pack('<I', len(compressed))[:3] # compressed_length
header += struct.pack('B', seq) # compressed_sequence_id
header += struct.pack('<I', len(payload))[:3] # uncompressed_length
return header + compressed
4.10 注意事项
重要提醒
包头读取:必须确保读取完整的 4 字节包头后再读取 payload。在非阻塞 I/O 场景下,可能出现只读到部分包头的情况。
序列号验证:客户端应该验证服务器响应的序列号是否符合预期(从 1 开始递增),以检测数据包乱序或丢失。
分包边界:当接收数据时,TCP 的消息边界可能与 MySQL 包边界不一致。必须基于 payload_length 来确定包的边界。
长度编码的 0xFB:值 0xFB 被保留用于标识 NULL 值(在 Result Set Row 中),不应作为整数的编码前缀。
超大包的内存:
max_allowed_packet可以设到 1 GB,接收端需要分配足够大的缓冲区。压缩的陷阱:压缩会引入额外的延迟和 CPU 开销。对于小数据包,压缩可能适得其反。
4.11 业务场景
场景一:代理层的包转发
数据库代理(如 ProxySQL)需要正确解析和转发 MySQL 数据包。关键挑战:
- 完整接收一个 MySQL 消息(可能跨多个包)后才能转发
- 需要修改包头中的某些字段(如序列号)
- 需要检测消息类型(OK/ERR/Result Set)以决定路由策略
场景二:抓包分析工具
编写 MySQL 协议分析工具时,需要:
- 从 TCP 流中正确分割 MySQL 包
- 处理压缩数据包
- 解析长度编码字段
场景三:调试大结果集传输
当应用报告 “Packet too large” 错误时,需要检查:
max_allowed_packet配置- 客户端驱动的缓冲区大小
- 中间代理的包大小限制
-- 调整包大小
SET GLOBAL max_allowed_packet = 256 * 1024 * 1024; -- 256 MB
4.12 扩展阅读
上一章:03 - 认证机制 下一章:05 - 命令与请求 —— 深入了解客户端可以发送的所有命令类型。