强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 07 - 二进制协议

第 07 章:二进制协议

7.1 二进制协议概述

二进制协议(Binary Protocol)用于预处理语句(Prepared Statement)的参数绑定和结果集传输。与文本协议不同,二进制协议使用原生二进制格式表示数据值,无需字符串转换。

文本协议 vs 二进制协议

特性文本协议二进制协议
使用命令COM_QUERYCOM_STMT_EXECUTE
值格式字符串原生二进制
NULL 标识0xFB 长度编码null_bitmap 位图
参数传递SQL 字符串拼接占位符 + 绑定值
SQL 注入需要转义天然免疫
类型转换服务端转文本客户端按类型解析
网络传输较大(字符串)较小(二进制)
CPU 开销较高较低

7.2 二进制参数类型

MySQL 二进制协议中的参数类型 ID 与 Column Definition 中的类型 ID 相同:

参数类型编码

类型 ID名称字节大小Python 对应
0x00DECIMAL变长Decimal
0x01TINYINT1int
0x02SMALLINT2int
0x03INT4int
0x04FLOAT4float
0x05DOUBLE8float
0x07TIMESTAMP变长datetime
0x08BIGINT8int
0x0ADATE变长date
0x0BTIME变长time
0x0CDATETIME变长datetime
0x0FVARCHAR变长str
0x10BIT变长bytes
0xF5JSON变长str/dict
0xF6NEWDECIMAL变长Decimal
0xFCBLOB变长bytes
0xFDVAR_STRING变长str
0xFESTRING变长str/bytes

7.3 参数绑定格式

COM_STMT_EXECUTE 中的参数编码

1. 命令头 (10 字节)
   - 0x17 (COM_STMT_EXECUTE)
   - statement_id (4 字节)
   - flags (1 字节)
   - iteration_count (4 字节, 固定为 1)

2. null_bitmap (变长)
   - 每个参数占 1 bit
   - 大小 = (num_params + 7) / 8
   - bit = 1 表示该参数为 NULL

3. new_params_bound_flag (1 字节)
   - 0x00: 不发送新参数类型(使用上次的)
   - 0x01: 发送新参数类型

4. 参数类型数组 (仅当 new_params_bound_flag = 0x01)
   - 每个参数 2 字节: [type_id (1字节)] [unsigned_flag (1字节)]

5. 参数值数组
   - 每个参数按其类型编码

null_bitmap 的计算

def build_null_bitmap(params: list, num_params: int) -> bytes:
    """构建 null bitmap"""
    bitmap_size = (num_params + 7) // 8
    bitmap = bytearray(bitmap_size)
    for i, param in enumerate(params):
        if param is None:
            bitmap[i // 8] |= (1 << (i % 8))
    return bytes(bitmap)

各类型参数值的编码

整数类型

import struct

def encode_tinyint(value: int) -> bytes:
    """TINYINT (1 字节)"""
    return struct.pack('<b', value)

def encode_smallint(value: int) -> bytes:
    """SMALLINT (2 字节)"""
    return struct.pack('<h', value)

def encode_int(value: int) -> bytes:
    """INT (4 字节)"""
    return struct.pack('<i', value)

def encode_bigint(value: int) -> bytes:
    """BIGINT (8 字节)"""
    return struct.pack('<q', value)

浮点类型

def encode_float(value: float) -> bytes:
    """FLOAT (4 字节)"""
    return struct.pack('<f', value)

def encode_double(value: float) -> bytes:
    """DOUBLE (8 字节)"""
    return struct.pack('<d', value)

字符串类型

def encode_string(value: str) -> bytes:
    """VARCHAR/TEXT: 长度编码 + 字节数据"""
    encoded = value.encode('utf-8')
    return encode_length(len(encoded)) + encoded

def encode_length(length: int) -> bytes:
    """长度编码"""
    if length < 0xFB:
        return struct.pack('B', length)
    elif length < 0x10000:
        return b'\xFC' + struct.pack('<H', length)
    elif length < 0x1000000:
        return b'\xFD' + struct.pack('<I', length)[:3]
    else:
        return b'\xFE' + struct.pack('<Q', length)

日期时间类型

def encode_date(year: int, month: int, day: int) -> bytes:
    """DATE: 长度 + 年(2) + 月(1) + 日(1)"""
    return struct.pack('BBBB', 4, 0, 0, 0) + struct.pack('<HBB', year, month, day)
    # 实际: length=4, year(2), month(1), day(1) = 总共 5 字节

def encode_datetime(year, month, day, hour=0, minute=0, second=0, microsecond=0) -> bytes:
    """DATETIME/TIMESTAMP"""
    if microsecond:
        length = 11  # 年月日时分秒微秒
        return (struct.pack('B', length) +
                struct.pack('<HBBBBI', year, month, day, hour, minute, second) +
                struct.pack('<I', microsecond))
    elif second:
        length = 7   # 年月日时分秒
        return (struct.pack('B', length) +
                struct.pack('<HBBBBB', year, month, day, hour, minute, second))
    else:
        length = 4   # 年月日
        return (struct.pack('B', length) +
                struct.pack('<HBB', year, month, day))

def encode_time(negative=False, hours=0, minutes=0, seconds=0, microseconds=0) -> bytes:
    """TIME"""
    if microseconds:
        length = 12
        return (struct.pack('B', length) +
                struct.pack('<BiBBBBBI', 1 if negative else 0, 0, 0, 0, hours, minutes, seconds, microseconds))
    elif seconds:
        length = 8
        return (struct.pack('B', length) +
                struct.pack('<BiBBBBB', 1 if negative else 0, 0, 0, 0, hours, minutes, seconds))
    else:
        length = 0
        return struct.pack('B', length)

7.4 完整的参数绑定示例

"""
mysql_binary_params.py
二进制协议参数绑定完整实现
"""
import struct


# 参数类型常量
MYSQL_TYPE_DECIMAL     = 0x00
MYSQL_TYPE_TINY        = 0x01
MYSQL_TYPE_SHORT       = 0x02
MYSQL_TYPE_LONG        = 0x03
MYSQL_TYPE_FLOAT       = 0x04
MYSQL_TYPE_DOUBLE      = 0x05
MYSQL_TYPE_NULL        = 0x06
MYSQL_TYPE_TIMESTAMP   = 0x07
MYSQL_TYPE_LONGLONG    = 0x08
MYSQL_TYPE_DATE        = 0x0A
MYSQL_TYPE_TIME        = 0x0B
MYSQL_TYPE_DATETIME    = 0x0C
MYSQL_TYPE_VARCHAR     = 0x0F
MYSQL_TYPE_BIT        = 0x10
MYSQL_TYPE_JSON        = 0xF5
MYSQL_TYPE_NEWDECIMAL  = 0xF6
MYSQL_TYPE_BLOB        = 0xFC
MYSQL_TYPE_VAR_STRING  = 0xFD
MYSQL_TYPE_STRING      = 0xFE


# Python 类型到 MySQL 类型的映射
PYTHON_TYPE_MAP = {
    type(None): (MYSQL_TYPE_NULL, 0),
    bool:       (MYSQL_TYPE_TINY, 0),
    int:        (MYSQL_TYPE_LONGLONG, 0),
    float:      (MYSQL_TYPE_DOUBLE, 0),
    str:        (MYSQL_TYPE_VAR_STRING, 0),
    bytes:      (MYSQL_TYPE_BLOB, 0),
    bytearray:  (MYSQL_TYPE_BLOB, 0),
}


def infer_param_type(value) -> tuple:
    """推断参数的 MySQL 类型"""
    if value is None:
        return (MYSQL_TYPE_NULL, 0)
    for py_type, mysql_type in PYTHON_TYPE_MAP.items():
        if isinstance(value, py_type):
            return mysql_type
    return (MYSQL_TYPE_VAR_STRING, 0)


def encode_param_value(value) -> bytes:
    """编码参数值为二进制格式"""
    if value is None:
        return b''

    if isinstance(value, bool):
        return struct.pack('<b', 1 if value else 0)

    if isinstance(value, int):
        if -128 <= value <= 127:
            return struct.pack('<b', value)
        elif -32768 <= value <= 32767:
            return struct.pack('<h', value)
        elif -2147483648 <= value <= 2147483647:
            return struct.pack('<i', value)
        else:
            return struct.pack('<q', value)

    if isinstance(value, float):
        return struct.pack('<d', value)

    if isinstance(value, str):
        encoded = value.encode('utf-8')
        return encode_length_coded(len(encoded)) + encoded

    if isinstance(value, (bytes, bytearray)):
        return encode_length_coded(len(value)) + bytes(value)

    raise TypeError(f"不支持的参数类型: {type(value)}")


def encode_length_coded(length: int) -> bytes:
    """长度编码"""
    if length < 0xFB:
        return struct.pack('B', length)
    elif length < 0x10000:
        return b'\xFC' + struct.pack('<H', length)
    elif length < 0x1000000:
        return b'\xFD' + struct.pack('<I', length)[:3]
    else:
        return b'\xFE' + struct.pack('<Q', length)


def build_execute_packet(statement_id: int, params: list, cursor_type: int = 0) -> bytes:
    """
    构建 COM_STMT_EXECUTE 数据包

    参数:
        statement_id: 预处理语句 ID
        params: 参数值列表
        cursor_type: 游标类型 (0=无游标, 1=READ_ONLY, 2=FOR_UPDATE, 4=SCROLLABLE)

    返回:
        完整的数据包(含包头)
    """
    num_params = len(params)

    # === 构建有效载荷 ===
    payload = bytearray()

    # 命令类型
    payload.append(0x17)  # COM_STMT_EXECUTE

    # statement_id
    payload.extend(struct.pack('<I', statement_id))

    # flags (cursor type)
    payload.append(cursor_type & 0xFF)

    # iteration_count (固定为 1)
    payload.extend(struct.pack('<I', 1))

    # null_bitmap
    bitmap_size = (num_params + 7) // 8
    null_bitmap = bytearray(bitmap_size)
    for i, param in enumerate(params):
        if param is None:
            null_bitmap[i // 8] |= (1 << (i % 8))
    payload.extend(null_bitmap)

    # new_params_bound_flag
    if num_params > 0:
        payload.append(0x01)  # 发送新参数类型

        # 参数类型数组
        for param in params:
            type_id, unsigned_flag = infer_param_type(param)
            payload.append(type_id)
            payload.append(unsigned_flag)

        # 参数值数组
        for param in params:
            if param is not None:
                payload.extend(encode_param_value(param))
    else:
        payload.append(0x00)  # 无参数

    # === 添加包头 ===
    packet = struct.pack('<I', len(payload))[:3] + struct.pack('B', 0) + bytes(payload)
    return packet


def demo():
    """演示二进制参数绑定"""
    print("=" * 60)
    print("MySQL 二进制协议参数绑定演示")
    print("=" * 60)

    # 模拟语句: INSERT INTO users (name, age, balance, active) VALUES (?, ?, ?, ?)
    statement_id = 1
    params = ["Alice", 28, 9999.50, True]

    packet = build_execute_packet(statement_id, params)

    print(f"\n语句 ID: {statement_id}")
    print(f"参数: {params}")
    print(f"数据包大小: {len(packet)} 字节")
    print(f"数据包头: {packet[:4].hex()}")
    print(f"命令: 0x{packet[4]:02X} (COM_STMT_EXECUTE)")
    print(f"完整 payload: {packet[4:].hex()}")
    print()

    # NULL 参数
    print("--- 含 NULL 参数 ---")
    params_with_null = ["Bob", None, 100.00, None]
    packet2 = build_execute_packet(statement_id, params_with_null)
    print(f"参数: {params_with_null}")
    print(f"数据包大小: {len(packet2)} 字节")

    # 解析 null_bitmap
    bitmap = packet2[14:14 + 1]  # 4 参数 → 1 字节 bitmap
    print(f"null_bitmap: 0x{bitmap[0]:02X}")
    for i in range(4):
        is_null = (bitmap[0] >> i) & 1
        print(f"  参数 {i} ({params_with_null[i]}): {'NULL' if is_null else '非 NULL'}")


if __name__ == '__main__':
    demo()

7.5 二进制结果集

当使用 COM_STMT_EXECUTE 执行预处理语句时,结果集使用二进制格式返回。

与文本结果集的区别

部分文本结果集二进制结果集
列定义相同格式相同格式
行数据长度编码字符串null_bitmap + 二进制值
NULL 表示0xFBnull_bitmap 中的 bit
整数值字符串 “123”4 字节小端序整数

二进制行数据格式

1. 前导字节 (1 字节)
   固定为 0x00

2. null_bitmap (变长)
   大小 = (column_count + 7 + 2) / 8
   前 2 bit 保留(始终为 0),后续每 bit 对应一列
   bit = 1 表示该列为 NULL

3. 列值数据
   每列按其类型以二进制格式编码
   NULL 列不占用空间(由 bitmap 标识)

Python 二进制结果集解析

"""
mysql_binary_resultset.py
解析二进制结果集
"""
import struct
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from typing import List, Any


# Column type 定义 (同 Column Definition)
TYPE_DECIMAL    = 0x00
TYPE_TINY       = 0x01
TYPE_SHORT      = 0x02
TYPE_LONG       = 0x03
TYPE_FLOAT      = 0x04
TYPE_DOUBLE     = 0x05
TYPE_NULL       = 0x06
TYPE_TIMESTAMP  = 0x07
TYPE_LONGLONG   = 0x08
TYPE_INT24      = 0x09
TYPE_DATE       = 0x0A
TYPE_TIME       = 0x0B
TYPE_DATETIME   = 0x0C
TYPE_YEAR       = 0x0D
TYPE_VARCHAR    = 0x0F
TYPE_BIT        = 0x10
TYPE_JSON       = 0xF5
TYPE_NEWDECIMAL = 0xF6
TYPE_ENUM       = 0xF7
TYPE_SET        = 0xF8
TYPE_TINY_BLOB  = 0xF9
TYPE_MEDIUM_BLOB= 0xFA
TYPE_LONG_BLOB  = 0xFB
TYPE_BLOB       = 0xFC
TYPE_VAR_STRING = 0xFD
TYPE_STRING     = 0xFE
TYPE_GEOMETRY   = 0xFF


def read_length_encoded_bytes(data: bytes, offset: int) -> tuple:
    """读取长度编码的字节序列"""
    length, new_offset = read_length_encoded_int(data, offset)
    if length is None:
        return None, offset + 1
    return data[new_offset:new_offset + length], new_offset + length


def read_length_encoded_int(data: bytes, offset: int) -> tuple:
    first = data[offset]
    if first < 0xFB:
        return first, offset + 1
    elif first == 0xFC:
        return struct.unpack('<H', data[offset+1:offset+3])[0], offset + 3
    elif first == 0xFD:
        return struct.unpack('<I', data[offset+1:offset+4] + b'\x00')[0], offset + 4
    elif first == 0xFE:
        return struct.unpack('<Q', data[offset+1:offset+9])[0], offset + 9


def decode_binary_column(data: bytes, offset: int, column_type: int, flags: int = 0, decimals: int = 0) -> tuple:
    """
    解码二进制结果集中的一列

    返回: (值, 新偏移量)
    """
    if column_type == TYPE_TINY:
        if flags & 0x0020:  # UNSIGNED
            return data[offset], offset + 1
        return struct.unpack('<b', data[offset:offset+1])[0], offset + 1

    elif column_type == TYPE_SHORT:
        if flags & 0x0020:
            return struct.unpack('<H', data[offset:offset+2])[0], offset + 2
        return struct.unpack('<h', data[offset:offset+2])[0], offset + 2

    elif column_type in (TYPE_LONG, TYPE_INT24):
        if flags & 0x0020:
            return struct.unpack('<I', data[offset:offset+4])[0], offset + 4
        return struct.unpack('<i', data[offset:offset+4])[0], offset + 4

    elif column_type == TYPE_LONGLONG:
        if flags & 0x0020:
            return struct.unpack('<Q', data[offset:offset+8])[0], offset + 8
        return struct.unpack('<q', data[offset:offset+8])[0], offset + 8

    elif column_type == TYPE_FLOAT:
        return struct.unpack('<f', data[offset:offset+4])[0], offset + 4

    elif column_type == TYPE_DOUBLE:
        return struct.unpack('<d', data[offset:offset+8])[0], offset + 8

    elif column_type == TYPE_YEAR:
        return struct.unpack('<H', data[offset:offset+2])[0], offset + 2

    elif column_type in (TYPE_DATE, TYPE_DATETIME, TYPE_TIMESTAMP):
        return decode_binary_temporal(data, offset)

    elif column_type == TYPE_TIME:
        return decode_binary_time(data, offset)

    elif column_type == TYPE_NEWDECIMAL:
        return decode_binary_decimal(data, offset, decimals)

    elif column_type in (TYPE_VARCHAR, TYPE_STRING, TYPE_ENUM, TYPE_SET):
        raw, new_offset = read_length_encoded_bytes(data, offset)
        if raw is not None:
            return raw.decode('utf-8'), new_offset
        return None, new_offset

    elif column_type in (TYPE_BLOB, TYPE_TINY_BLOB, TYPE_MEDIUM_BLOB,
                          TYPE_LONG_BLOB, TYPE_JSON, TYPE_BIT, TYPE_GEOMETRY):
        raw, new_offset = read_length_encoded_bytes(data, offset)
        return raw, new_offset

    else:
        # 默认当作长度编码字节
        raw, new_offset = read_length_encoded_bytes(data, offset)
        return raw, new_offset


def decode_binary_temporal(data: bytes, offset: int) -> tuple:
    """解码 DATE/DATETIME/TIMESTAMP 二进制值"""
    length = data[offset]
    offset += 1

    if length == 0:
        return None, offset

    year = struct.unpack('<H', data[offset:offset+2])[0]
    month = data[offset + 2]
    day = data[offset + 3]
    offset += 4

    if length == 4:
        return date(year, month, day), offset

    hour = data[offset]
    minute = data[offset + 1]
    second = data[offset + 2]
    offset += 3

    if length == 7:
        return datetime(year, month, day, hour, minute, second), offset

    microsecond = struct.unpack('<I', data[offset:offset+4])[0]
    offset += 4
    return datetime(year, month, day, hour, minute, second, microsecond), offset


def decode_binary_time(data: bytes, offset: int) -> tuple:
    """解码 TIME 二进制值"""
    length = data[offset]
    offset += 1

    if length == 0:
        return timedelta(0), offset

    is_negative = data[offset]
    offset += 1

    days = struct.unpack('<I', data[offset:offset+4])[0]
    offset += 4
    hours = data[offset]
    minutes = data[offset + 1]
    seconds = data[offset + 2]
    offset += 3

    total_seconds = days * 86400 + hours * 3600 + minutes * 60 + seconds

    if length > 8:
        microseconds = struct.unpack('<I', data[offset:offset+4])[0]
        total_seconds += microseconds / 1000000
        offset += 4

    td = timedelta(seconds=total_seconds)
    if is_negative:
        td = -td

    return td, offset


def decode_binary_decimal(data: bytes, offset: int, decimals: int) -> tuple:
    """解码 NEWDECIMAL 二进制值"""
    raw, new_offset = read_length_encoded_bytes(data, offset)
    if raw is None:
        return None, new_offset
    # 简化处理:直接转字符串
    return Decimal(raw.decode('utf-8')), new_offset


def parse_binary_row(data: bytes, columns: list) -> List[Any]:
    """
    解析二进制结果集中的一行

    参数:
        data: 行数据 payload
        columns: 列定义列表 [(name, type_id, flags, decimals), ...]

    返回:
        值列表
    """
    offset = 0

    # 前导字节 (0x00)
    offset += 1

    # null_bitmap
    num_cols = len(columns)
    bitmap_size = (num_cols + 7 + 2) // 8  # +2 for reserved bits
    null_bitmap = data[offset:offset + bitmap_size]
    offset += bitmap_size

    # 解析每列
    row = []
    for i, (name, type_id, flags, decimals) in enumerate(columns):
        # 检查 null bit (注意: 前 2 bit 保留, 列从 bit 2 开始)
        bit_index = i + 2
        is_null = (null_bitmap[bit_index // 8] >> (bit_index % 8)) & 1

        if is_null:
            row.append(None)
        else:
            value, offset = decode_binary_column(data, offset, type_id, flags, decimals)
            row.append(value)

    return row


def demo():
    """演示二进制结果集解析"""
    print("=" * 60)
    print("MySQL 二进制结果集解析演示")
    print("=" * 60)

    # 模拟列定义
    columns = [
        ("id",      TYPE_LONGLONG,    0x0001 | 0x0002, 0),  # NOT NULL, PRI
        ("name",    TYPE_VAR_STRING,  0,               0),
        ("balance", TYPE_NEWDECIMAL,  0x0020,          2),  # UNSIGNED
        ("active",  TYPE_TINY,        0,               0),
    ]

    # 构造模拟的二进制行数据
    row_data = bytearray()

    # 前导字节
    row_data.append(0x00)

    # null_bitmap: 4 列 → (4+2+7)/8 = 1 字节, 所有非 NULL → 0x00
    row_data.append(0x00)

    # id (BIGINT, 8 字节)
    row_data.extend(struct.pack('<q', 42))

    # name (VAR_STRING, 长度编码)
    name = "Alice".encode('utf-8')
    row_data.append(len(name))
    row_data.extend(name)

    # balance (NEWDECIMAL)
    balance = "12345.67".encode('utf-8')
    row_data.append(len(balance))
    row_data.extend(balance)

    # active (TINYINT)
    row_data.append(1)

    # 解析
    row = parse_binary_row(bytes(row_data), columns)

    print("\n列定义:")
    for name, type_id, flags, dec in columns:
        print(f"  {name}: type=0x{type_id:02X}, flags=0x{flags:04X}")

    print("\n原始数据:")
    print(f"  {bytes(row_data).hex()}")

    print("\n解析结果:")
    for i, (col_def, value) in enumerate(zip(columns, row)):
        name = col_def[0]
        print(f"  {name} = {value!r} (type: {type(value).__name__})")


if __name__ == '__main__':
    demo()

7.6 二进制与文本结果集的类型对照

MySQL 类型文本协议格式二进制协议格式
TINYINT“127”1 字节: 0x7F
SMALLINT“1234”2 字节: 0xD204
INT“123456”4 字节: 0x40E20100
BIGINT“123456789”8 字节
FLOAT“3.14”4 字节 IEEE 754
DOUBLE“3.14159265”8 字节 IEEE 754
DECIMAL“1234.56”长度编码字符串
VARCHAR“hello”长度编码字符串
BLOBhex 或 raw长度编码字节
DATE“2024-01-15”4+1 字节 (length + year/month/day)
DATETIME“2024-01-15 10:30:00”8+1 字节
NULL0xFBnull_bitmap 中的 bit
BIThex 字符串长度编码字节

7.7 注意事项

重要提醒

  1. null_bitmap 的偏移:二进制结果集行中的 null_bitmap 有 2 bit 保留位,列对应的 bit 从第 2 bit 开始(即 bit_index = column_index + 2)。

  2. 有符号/无符号:二进制整数类型需要根据 UNSIGNED_FLAG 标志选择正确的解码方式(有符号 vs 无符号)。

  3. DECIMAL 的精度:二进制 DECIMAL 使用长度编码字符串传输,而非原生二进制浮点,以保证精度。

  4. BLOB/TEXT 大小:大 BLOB 值可能跨越多个数据包(使用 COM_STMT_SEND_LONG_DATA 预先发送)。

  5. 参数类型推断:驱动通常根据 Python/Java 值的类型自动推断 MySQL 参数类型,但显式指定类型更安全。

  6. BIT 类型:BIT 类型以大端序二进制字节传输,需要特殊处理。


7.8 业务场景

场景一:高性能批量插入

使用二进制协议批量插入比文本协议更高效:

# 文本协议 (慢): 每条 INSERT 都要序列化为 SQL 字符串
# INSERT INTO t VALUES (1, 'Alice', 100.00)
# INSERT INTO t VALUES (2, 'Bob', 200.00)
# ...

# 二进制协议 (快): 预处理一次, 绑定执行多次
# PREPARE: INSERT INTO t VALUES (?, ?, ?)
# EXECUTE: [1, 'Alice', 100.00]
# EXECUTE: [2, 'Bob', 200.00]
# ...

场景二:精确的类型映射

当应用需要精确的类型映射时(如金融系统中的 DECIMAL),二进制协议可以避免文本转换的精度损失。

场景三:SQL 注入防护

二进制协议天然免疫 SQL 注入,因为参数值不会被拼接到 SQL 字符串中。


7.9 扩展阅读


上一章06 - 文本协议 下一章08 - 预处理语句详解 —— 深入了解预处理语句的完整生命周期。