强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

MySQL 传输协议精讲 / 08 - 预处理语句详解

第 08 章:预处理语句详解

8.1 预处理语句概述

**预处理语句(Prepared Statement)**是 MySQL 4.1 引入的特性,允许将 SQL 语句的结构与参数分离。它使用二进制协议进行参数绑定和结果传输。

与直接查询的对比

特性直接查询 (COM_QUERY)预处理语句
SQL 传参字符串拼接参数占位符 ?
解析次数每次都解析只解析一次
执行计划缓存不缓存服务端缓存
SQL 注入风险需要转义天然免疫
数据类型文本传输二进制传输
网络开销较大较小(重复执行时)
服务端资源不需要额外资源需要维护语句句柄

8.2 预处理语句生命周期

┌─────────────┐
│  PREPARE    │  COM_STMT_PREPARE → 服务器解析SQL、分配句柄
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  BIND+EXEC  │  COM_STMT_EXECUTE → 绑定参数、执行查询
└──────┬──────┘
       │ 可重复执行多次
       ▼
┌─────────────┐
│  FETCH      │  COM_STMT_FETCH → 游标模式下获取更多行
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  CLOSE      │  COM_STMT_CLOSE → 释放服务器资源
└─────────────┘

完整生命周期

Client                                    Server
  │                                          │
  │ ──── COM_STMT_PREPARE (seq=0) ────────→ │  ① 预处理
  │      "SELECT * FROM users WHERE id = ?"  │
  │                                          │
  │ ←─── Prepare OK (seq=1) ────────────── │
  │      statement_id = 1                    │
  │      num_columns = 5                     │
  │      num_params = 1                      │
  │                                          │
  │ ←─── Param Definition (seq=2) ──────── │  参数元数据
  │ ←─── EOF (seq=3) ────────────────────── │
  │ ←─── Column Definition × 5 (seq=4-8) ─ │  列元数据
  │ ←─── EOF (seq=9) ────────────────────── │
  │                                          │
  │ ──── COM_STMT_EXECUTE (seq=0) ────────→ │  ② 执行
  │      statement_id = 1                    │
  │      params = [42]                       │
  │                                          │
  │ ←─── Binary Result Set ─────────────── │
  │                                          │
  │ ──── COM_STMT_EXECUTE (seq=0) ────────→ │  ③ 再次执行
  │      params = [100]                      │
  │                                          │
  │ ←─── Binary Result Set ─────────────── │
  │                                          │
  │ ──── COM_STMT_CLOSE (seq=0) ──────────→ │  ④ 关闭
  │                                          │

8.3 COM_STMT_PREPARE 详解

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x16 (COM_STMT_PREPARE)
1          变长      SQL 语句 (含 ? 占位符)

响应格式

Prepare OK 包:

字节偏移   大小      字段                    说明
──────────────────────────────────────────────────────
0          1 字节    0x00                    OK 标识
1          4 字节    statement_id            语句句柄 ID
5          2 字节    num_columns             列数
7          2 字节    num_params              参数数
9          1 字节    0x00                    填充
10         2 字节    warnings                警告数

后续包:

如果 num_params > 0:
  Param Definition × num_params
  EOF

如果 num_columns > 0:
  Column Definition × num_columns
  EOF

Python 实现

"""
mysql_prepared_statement.py
预处理语句的完整实现
"""
import socket
import struct


def encode_packet(payload: bytes, seq: int) -> bytes:
    return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload


def read_packet(sock: socket.socket) -> tuple:
    """从 socket 读取一个数据包"""
    header = b''
    while len(header) < 4:
        header += sock.recv(4 - len(header))

    pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
    seq = header[3]

    payload = b''
    while len(payload) < pkt_len:
        payload += sock.recv(pkt_len - len(payload))

    return payload, seq


class PreparedStatement:
    """预处理语句"""
    def __init__(self, statement_id, num_columns, num_params, warnings):
        self.statement_id = statement_id
        self.num_columns = num_columns
        self.num_params = num_params
        self.warnings = warnings
        self.param_definitions = []
        self.column_definitions = []


def send_prepare(sock: socket.socket, sql: str) -> PreparedStatement:
    """发送 COM_STMT_PREPARE 命令"""
    payload = b'\x16' + sql.encode('utf-8')
    sock.send(encode_packet(payload, 0))

    # 读取 Prepare OK
    response, seq = read_packet(sock)

    if response[0] == 0xFF:
        # ERR
        error_code = struct.unpack('<H', response[1:3])[0]
        message = response[9:].decode('utf-8', errors='replace')
        raise Exception(f"Prepare 失败: [{error_code}] {message}")

    statement_id = struct.unpack('<I', response[1:5])[0]
    num_columns = struct.unpack('<H', response[5:7])[0]
    num_params = struct.unpack('<H', response[7:9])[0]
    warnings = struct.unpack('<H', response[10:12])[0]

    stmt = PreparedStatement(statement_id, num_columns, num_params, warnings)
    print(f"[PREPARE] statement_id={statement_id}, columns={num_columns}, params={num_params}")

    # 读取参数定义
    if num_params > 0:
        for i in range(num_params):
            param_data, _ = read_packet(sock)
            # 解析参数定义(简化版)
            stmt.param_definitions.append(parse_param_def(param_data))
        # EOF
        eof, _ = read_packet(sock)

    # 读取列定义
    if num_columns > 0:
        for i in range(num_columns):
            col_data, _ = read_packet(sock)
            stmt.column_definitions.append(parse_col_def(col_data))
        # EOF
        eof, _ = read_packet(sock)

    return stmt


def parse_param_def(data: bytes) -> dict:
    """解析参数定义"""
    # Param Definition 格式与 Column Definition 相同
    # 简化解析
    return {
        'type': data[-5] if len(data) >= 5 else 0,
        'flags': struct.unpack('<H', data[-4:-2])[0] if len(data) >= 5 else 0,
    }


def parse_col_def(data: bytes) -> dict:
    """解析列定义(简化版)"""
    return {'raw': data[:30].hex()}


def send_execute(sock: socket.socket, stmt: PreparedStatement,
                 params: list = None) -> bytes:
    """发送 COM_STMT_EXECUTE 命令"""
    payload = bytearray()

    # 命令类型
    payload.append(0x17)

    # statement_id
    payload.extend(struct.pack('<I', stmt.statement_id))

    # flags (no cursor)
    payload.append(0x00)

    # iteration_count
    payload.extend(struct.pack('<I', 1))

    num_params = stmt.num_params
    if num_params > 0 and params:
        # null_bitmap
        bitmap_size = (num_params + 7) // 8
        null_bitmap = bytearray(bitmap_size)
        for i, param in enumerate(params):
            if param is None:
                null_bitmap[i // 8] |= (1 << (i % 8))
        payload.extend(null_bitmap)

        # new_params_bound_flag
        payload.append(0x01)

        # 参数类型和值
        for param in params:
            if param is None:
                payload.append(0x06)  # MYSQL_TYPE_NULL
                payload.append(0x00)
            elif isinstance(param, int):
                payload.append(0x08)  # MYSQL_TYPE_LONGLONG
                payload.append(0x00)
            elif isinstance(param, str):
                payload.append(0xFD)  # MYSQL_TYPE_VAR_STRING
                payload.append(0x00)
            elif isinstance(param, float):
                payload.append(0x05)  # MYSQL_TYPE_DOUBLE
                payload.append(0x00)
            else:
                payload.append(0xFD)
                payload.append(0x00)

        # 参数值
        for param in params:
            if param is None:
                pass
            elif isinstance(param, int):
                payload.extend(struct.pack('<q', param))
            elif isinstance(param, str):
                encoded = param.encode('utf-8')
                payload.extend(encode_length(len(encoded)))
                payload.extend(encoded)
            elif isinstance(param, float):
                payload.extend(struct.pack('<d', param))
    else:
        payload.extend(b'\x00')  # 空 bitmap
        payload.append(0x00)     # 无新参数

    sock.send(encode_packet(bytes(payload), 0))
    response, seq = read_packet(sock)
    return response


def send_close(sock: socket.socket, statement_id: int):
    """发送 COM_STMT_CLOSE 命令"""
    payload = b'\x19' + struct.pack('<I', statement_id)
    sock.send(encode_packet(payload, 0))
    # COM_STMT_CLOSE 不返回响应


def encode_length(length: int) -> bytes:
    if length < 0xFB:
        return struct.pack('B', length)
    elif length < 0x10000:
        return b'\xFC' + struct.pack('<H', length)
    elif length < 0x1000000:
        return b'\xFD' + struct.pack('<I', length)[:3]
    else:
        return b'\xFE' + struct.pack('<Q', length)

8.4 参数元数据

COM_STMT_PREPARE 的响应中包含参数元数据,描述每个 ? 占位符的类型信息。

参数定义格式

参数定义与列定义格式相同(Column Definition 41),但信息较简:

字段说明
catalog固定 “def”
schema
table_alias
table_name
column_alias
column_name参数名(如 “@P1”)
column_type参数类型

注意:MySQL 服务器在 PREPARE 阶段推断参数类型,但实际类型在 EXECUTE 阶段由绑定的值决定。因此,参数类型可能不精确。


8.5 列元数据

如果预处理的 SQL 是 SELECT 等返回结果集的语句,PREPARE 响应中还包含列元数据,格式与文本结果集的列定义完全相同。


8.6 游标支持

MySQL 5.0 引入了预处理语句的**游标(Cursor)**支持,允许分批获取结果集。

启用游标

COM_STMT_EXECUTE 时设置 flags 字段:

名称说明
0x00CURSOR_TYPE_NO_CURSOR不使用游标
0x01CURSOR_TYPE_READ_ONLY只读游标
0x02CURSOR_TYPE_FOR_UPDATEFOR UPDATE 游标
0x04CURSOR_TYPE_SCROLLABLE可滚动游标

游标使用流程

Client                                    Server
  │                                          │
  │ ──── COM_STMT_PREPARE ────────────────→ │
  │ ←─── Prepare OK ────────────────────── │
  │                                          │
  │ ──── COM_STMT_EXECUTE (flags=0x01) ───→ │  启用游标
  │                                          │
  │ ←─── Binary Result Set (部分行) ────── │
  │ ←─── OK (SERVER_STATUS_CURSOR_EXISTS)  │
  │                                          │
  │ ──── COM_STMT_FETCH (num_rows=100) ───→ │  获取下一批
  │                                          │
  │ ←─── Binary Result Set (100 行) ────── │
  │                                          │
  │ ──── COM_STMT_FETCH (num_rows=100) ───→ │  获取下一批
  │                                          │
  │ ←─── Binary Result Set (剩余行) ────── │
  │ ←─── EOF (游标关闭) ──────────────── │
  │                                          │
  │ ──── COM_STMT_CLOSE ─────────────────→ │
  │                                          │
def send_fetch(sock: socket.socket, statement_id: int, num_rows: int) -> bytes:
    """发送 COM_STMT_FETCH 命令"""
    payload = b'\x1C' + struct.pack('<I', statement_id) + struct.pack('<I', num_rows)
    sock.send(encode_packet(payload, 0))
    response, seq = read_packet(sock)
    return response

游标的优势

  • 内存效率:不需要一次加载全部结果集
  • 流式处理:适合处理大表
  • 精确控制:客户端决定每次获取多少行

8.7 预处理语句的服务器端实现

在服务器端,预处理语句的执行分为两个阶段:

预处理阶段

  1. SQL 解析(Parse):语法检查、生成解析树
  2. 语义分析:表/列是否存在、权限检查
  3. 预优化:部分查询优化(不依赖参数值)
  4. 分配语句句柄(Statement Handle)

执行阶段

  1. 参数绑定:将实际值绑定到占位符
  2. 完整优化:根据参数值完成查询优化
  3. 执行:执行查询计划
  4. 返回结果

8.8 COM_STMT_SEND_LONG_DATA

当参数值非常大时(如 BLOB),可以使用 COM_STMT_SEND_LONG_DATA 分批发送。

命令格式

字节偏移   大小      字段
──────────────────────────────
0          1 字节    0x18 (COM_STMT_SEND_LONG_DATA)
1          4 字节    statement_id
5          2 字节    param_index (参数索引, 从 0 开始)
7          变长      数据

使用场景

def send_long_data(sock, statement_id, param_index, data, chunk_size=65535):
    """分批发送大数据参数"""
    offset = 0
    while offset < len(data):
        chunk = data[offset:offset + chunk_size]
        payload = (b'\x18' +
                   struct.pack('<I', statement_id) +
                   struct.pack('<H', param_index) +
                   chunk)
        sock.send(encode_packet(payload, 0))
        offset += chunk_size

典型场景

  • 上传 BLOB 数据(图片、文件)
  • 发送超长 TEXT 值
  • 流式写入 LOB 数据

8.9 COM_STMT_RESET

重置预处理语句的状态,清除已发送的长数据:

def send_stmt_reset(sock, statement_id):
    """重置预处理语句"""
    payload = b'\x1A' + struct.pack('<I', statement_id)
    sock.send(encode_packet(payload, 0))
    response, _ = read_packet(sock)
    return response  # OK 或 ERR

8.10 完整示例:预处理语句 CRUD

"""
mysql_prepared_crud.py
使用预处理语句执行 CRUD 操作
"""


def prepared_crud_example(sock):
    """预处理语句 CRUD 示例"""

    # === INSERT ===
    stmt = send_prepare(sock,
        "INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
    print(f"INSERT: stmt_id={stmt.statement_id}")

    # 批量插入
    users = [
        ("Alice", 28, "alice@example.com"),
        ("Bob", 35, "bob@example.com"),
        ("Charlie", 22, "charlie@example.com"),
    ]

    for name, age, email in users:
        response = send_execute(sock, stmt, [name, age, email])
        print(f"  插入 {name}: OK")

    send_close(sock, stmt.statement_id)

    # === SELECT ===
    stmt = send_prepare(sock,
        "SELECT id, name, email FROM users WHERE age > ?")
    print(f"\nSELECT: stmt_id={stmt.statement_id}")

    response = send_execute(sock, stmt, [25])
    print(f"  查询结果: {response[:50].hex()}")

    send_close(sock, stmt.statement_id)

    # === UPDATE ===
    stmt = send_prepare(sock,
        "UPDATE users SET email = ? WHERE id = ?")
    print(f"\nUPDATE: stmt_id={stmt.statement_id}")

    response = send_execute(sock, stmt, ["newalice@example.com", 1])
    print(f"  更新结果: OK")

    send_close(sock, stmt.statement_id)

    # === DELETE ===
    stmt = send_prepare(sock,
        "DELETE FROM users WHERE id = ?")
    print(f"\nDELETE: stmt_id={stmt.statement_id}")

    response = send_execute(sock, stmt, [3])
    print(f"  删除结果: OK")

    send_close(sock, stmt.statement_id)

8.11 使用游标的大结果集处理

def fetch_large_resultset(sock, sql, params=None, batch_size=1000):
    """使用游标分批获取大结果集"""
    # 预处理
    stmt = send_prepare(sock, sql)
    print(f"预处理完成: {stmt.num_columns} 列, {stmt.num_params} 参数")

    # 启用只读游标执行
    # flags = 0x01 (CURSOR_TYPE_READ_ONLY)
    # 需要修改 send_execute 来支持 flags 参数
    response = send_execute_with_cursor(sock, stmt, params, cursor_type=0x01)

    all_rows = []
    total_fetched = 0

    while True:
        # 检查是否有 SERVER_STATUS_CURSOR_EXISTS (0x0040)
        status = struct.unpack('<H', response[-5:-3])[0] if len(response) >= 5 else 0

        if status & 0x0040:
            # 游标打开中,继续获取
            rows = parse_binary_rows(response, stmt.column_definitions)
            all_rows.extend(rows)
            total_fetched += len(rows)
            print(f"  已获取 {total_fetched} 行...")

            # 获取下一批
            response = send_fetch(sock, stmt.statement_id, batch_size)
        else:
            # 游标关闭(EOF 或 最后一批)
            if len(response) > 5:
                rows = parse_binary_rows(response, stmt.column_definitions)
                all_rows.extend(rows)
                total_fetched += len(rows)
            break

    print(f"总共获取 {total_fetched} 行")

    # 关闭语句
    send_close(sock, stmt.statement_id)

    return all_rows


def send_execute_with_cursor(sock, stmt, params, cursor_type=0):
    """支持游标的 COM_STMT_EXECUTE"""
    # 类似 send_execute,但 flags 参数使用 cursor_type
    pass  # 实现略,参考 send_execute

8.12 注意事项

重要提醒

  1. 语句句柄限制:每个预处理语句占用服务器资源。max_prepared_stmt_count 参数限制总数(默认 16382)。

  2. 必须关闭:使用完毕后必须发送 COM_STMT_CLOSE,否则会泄漏服务器资源。

  3. 参数类型推断:MySQL 在 PREPARE 阶段推断的参数类型可能不准确,客户端应根据实际情况选择正确的绑定类型。

  4. 游标内存:启用游标后,服务器为每次 FETCH 维护中间状态,这也会消耗内存。

  5. 重连后失效:连接断开后,所有预处理语句句柄失效,需要重新 PREPARE。

  6. SQL 注入:虽然预处理语句本身免疫注入,但动态构造 SQL 模板(如动态表名)仍需谨慎。


8.13 业务场景

场景一:ORM 的查询执行

大多数 ORM 框架默认使用预处理语句:

# SQLAlchemy 示例
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE id = :id")
result = conn.execute(stmt, {"id": 42})

场景二:批量导入

def batch_insert(conn, table, data, batch_size=1000):
    """使用预处理语句批量插入"""
    placeholders = ", ".join(["?"] * len(data[0]))
    sql = f"INSERT INTO {table} VALUES ({placeholders})"
    stmt = conn.prepare(sql)

    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        for row in batch:
            stmt.execute(row)
        conn.commit()

    stmt.close()

场景三:存储过程调用

存储过程的 OUT 参数通过预处理语句的二进制协议返回:

-- 存储过程
CREATE PROCEDURE get_user_count(OUT count INT)
BEGIN
    SELECT COUNT(*) INTO count FROM users;
END;

8.14 扩展阅读


上一章07 - 二进制协议 下一章09 - 结果集详解 —— 深入理解结果集的完整结构和流式读取。