MySQL 传输协议精讲 / 08 - 预处理语句详解
第 08 章:预处理语句详解
8.1 预处理语句概述
**预处理语句(Prepared Statement)**是 MySQL 4.1 引入的特性,允许将 SQL 语句的结构与参数分离。它使用二进制协议进行参数绑定和结果传输。
与直接查询的对比
| 特性 | 直接查询 (COM_QUERY) | 预处理语句 |
|---|---|---|
| SQL 传参 | 字符串拼接 | 参数占位符 ? |
| 解析次数 | 每次都解析 | 只解析一次 |
| 执行计划缓存 | 不缓存 | 服务端缓存 |
| SQL 注入风险 | 需要转义 | 天然免疫 |
| 数据类型 | 文本传输 | 二进制传输 |
| 网络开销 | 较大 | 较小(重复执行时) |
| 服务端资源 | 不需要额外资源 | 需要维护语句句柄 |
8.2 预处理语句生命周期
┌─────────────┐
│ PREPARE │ COM_STMT_PREPARE → 服务器解析SQL、分配句柄
└──────┬──────┘
│
▼
┌─────────────┐
│ BIND+EXEC │ COM_STMT_EXECUTE → 绑定参数、执行查询
└──────┬──────┘
│ 可重复执行多次
▼
┌─────────────┐
│ FETCH │ COM_STMT_FETCH → 游标模式下获取更多行
└──────┬──────┘
│
▼
┌─────────────┐
│ CLOSE │ COM_STMT_CLOSE → 释放服务器资源
└─────────────┘
完整生命周期
Client Server
│ │
│ ──── COM_STMT_PREPARE (seq=0) ────────→ │ ① 预处理
│ "SELECT * FROM users WHERE id = ?" │
│ │
│ ←─── Prepare OK (seq=1) ────────────── │
│ statement_id = 1 │
│ num_columns = 5 │
│ num_params = 1 │
│ │
│ ←─── Param Definition (seq=2) ──────── │ 参数元数据
│ ←─── EOF (seq=3) ────────────────────── │
│ ←─── Column Definition × 5 (seq=4-8) ─ │ 列元数据
│ ←─── EOF (seq=9) ────────────────────── │
│ │
│ ──── COM_STMT_EXECUTE (seq=0) ────────→ │ ② 执行
│ statement_id = 1 │
│ params = [42] │
│ │
│ ←─── Binary Result Set ─────────────── │
│ │
│ ──── COM_STMT_EXECUTE (seq=0) ────────→ │ ③ 再次执行
│ params = [100] │
│ │
│ ←─── Binary Result Set ─────────────── │
│ │
│ ──── COM_STMT_CLOSE (seq=0) ──────────→ │ ④ 关闭
│ │
8.3 COM_STMT_PREPARE 详解
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x16 (COM_STMT_PREPARE)
1 变长 SQL 语句 (含 ? 占位符)
响应格式
Prepare OK 包:
字节偏移 大小 字段 说明
──────────────────────────────────────────────────────
0 1 字节 0x00 OK 标识
1 4 字节 statement_id 语句句柄 ID
5 2 字节 num_columns 列数
7 2 字节 num_params 参数数
9 1 字节 0x00 填充
10 2 字节 warnings 警告数
后续包:
如果 num_params > 0:
Param Definition × num_params
EOF
如果 num_columns > 0:
Column Definition × num_columns
EOF
Python 实现
"""
mysql_prepared_statement.py
预处理语句的完整实现
"""
import socket
import struct
def encode_packet(payload: bytes, seq: int) -> bytes:
return struct.pack('<I', len(payload))[:3] + struct.pack('B', seq) + payload
def read_packet(sock: socket.socket) -> tuple:
"""从 socket 读取一个数据包"""
header = b''
while len(header) < 4:
header += sock.recv(4 - len(header))
pkt_len = struct.unpack('<I', header[0:3] + b'\x00')[0]
seq = header[3]
payload = b''
while len(payload) < pkt_len:
payload += sock.recv(pkt_len - len(payload))
return payload, seq
class PreparedStatement:
"""预处理语句"""
def __init__(self, statement_id, num_columns, num_params, warnings):
self.statement_id = statement_id
self.num_columns = num_columns
self.num_params = num_params
self.warnings = warnings
self.param_definitions = []
self.column_definitions = []
def send_prepare(sock: socket.socket, sql: str) -> PreparedStatement:
"""发送 COM_STMT_PREPARE 命令"""
payload = b'\x16' + sql.encode('utf-8')
sock.send(encode_packet(payload, 0))
# 读取 Prepare OK
response, seq = read_packet(sock)
if response[0] == 0xFF:
# ERR
error_code = struct.unpack('<H', response[1:3])[0]
message = response[9:].decode('utf-8', errors='replace')
raise Exception(f"Prepare 失败: [{error_code}] {message}")
statement_id = struct.unpack('<I', response[1:5])[0]
num_columns = struct.unpack('<H', response[5:7])[0]
num_params = struct.unpack('<H', response[7:9])[0]
warnings = struct.unpack('<H', response[10:12])[0]
stmt = PreparedStatement(statement_id, num_columns, num_params, warnings)
print(f"[PREPARE] statement_id={statement_id}, columns={num_columns}, params={num_params}")
# 读取参数定义
if num_params > 0:
for i in range(num_params):
param_data, _ = read_packet(sock)
# 解析参数定义(简化版)
stmt.param_definitions.append(parse_param_def(param_data))
# EOF
eof, _ = read_packet(sock)
# 读取列定义
if num_columns > 0:
for i in range(num_columns):
col_data, _ = read_packet(sock)
stmt.column_definitions.append(parse_col_def(col_data))
# EOF
eof, _ = read_packet(sock)
return stmt
def parse_param_def(data: bytes) -> dict:
"""解析参数定义"""
# Param Definition 格式与 Column Definition 相同
# 简化解析
return {
'type': data[-5] if len(data) >= 5 else 0,
'flags': struct.unpack('<H', data[-4:-2])[0] if len(data) >= 5 else 0,
}
def parse_col_def(data: bytes) -> dict:
"""解析列定义(简化版)"""
return {'raw': data[:30].hex()}
def send_execute(sock: socket.socket, stmt: PreparedStatement,
params: list = None) -> bytes:
"""发送 COM_STMT_EXECUTE 命令"""
payload = bytearray()
# 命令类型
payload.append(0x17)
# statement_id
payload.extend(struct.pack('<I', stmt.statement_id))
# flags (no cursor)
payload.append(0x00)
# iteration_count
payload.extend(struct.pack('<I', 1))
num_params = stmt.num_params
if num_params > 0 and params:
# null_bitmap
bitmap_size = (num_params + 7) // 8
null_bitmap = bytearray(bitmap_size)
for i, param in enumerate(params):
if param is None:
null_bitmap[i // 8] |= (1 << (i % 8))
payload.extend(null_bitmap)
# new_params_bound_flag
payload.append(0x01)
# 参数类型和值
for param in params:
if param is None:
payload.append(0x06) # MYSQL_TYPE_NULL
payload.append(0x00)
elif isinstance(param, int):
payload.append(0x08) # MYSQL_TYPE_LONGLONG
payload.append(0x00)
elif isinstance(param, str):
payload.append(0xFD) # MYSQL_TYPE_VAR_STRING
payload.append(0x00)
elif isinstance(param, float):
payload.append(0x05) # MYSQL_TYPE_DOUBLE
payload.append(0x00)
else:
payload.append(0xFD)
payload.append(0x00)
# 参数值
for param in params:
if param is None:
pass
elif isinstance(param, int):
payload.extend(struct.pack('<q', param))
elif isinstance(param, str):
encoded = param.encode('utf-8')
payload.extend(encode_length(len(encoded)))
payload.extend(encoded)
elif isinstance(param, float):
payload.extend(struct.pack('<d', param))
else:
payload.extend(b'\x00') # 空 bitmap
payload.append(0x00) # 无新参数
sock.send(encode_packet(bytes(payload), 0))
response, seq = read_packet(sock)
return response
def send_close(sock: socket.socket, statement_id: int):
"""发送 COM_STMT_CLOSE 命令"""
payload = b'\x19' + struct.pack('<I', statement_id)
sock.send(encode_packet(payload, 0))
# COM_STMT_CLOSE 不返回响应
def encode_length(length: int) -> bytes:
if length < 0xFB:
return struct.pack('B', length)
elif length < 0x10000:
return b'\xFC' + struct.pack('<H', length)
elif length < 0x1000000:
return b'\xFD' + struct.pack('<I', length)[:3]
else:
return b'\xFE' + struct.pack('<Q', length)
8.4 参数元数据
COM_STMT_PREPARE 的响应中包含参数元数据,描述每个 ? 占位符的类型信息。
参数定义格式
参数定义与列定义格式相同(Column Definition 41),但信息较简:
| 字段 | 说明 |
|---|---|
| catalog | 固定 “def” |
| schema | 空 |
| table_alias | 空 |
| table_name | 空 |
| column_alias | 空 |
| column_name | 参数名(如 “@P1”) |
| column_type | 参数类型 |
注意:MySQL 服务器在 PREPARE 阶段推断参数类型,但实际类型在 EXECUTE 阶段由绑定的值决定。因此,参数类型可能不精确。
8.5 列元数据
如果预处理的 SQL 是 SELECT 等返回结果集的语句,PREPARE 响应中还包含列元数据,格式与文本结果集的列定义完全相同。
8.6 游标支持
MySQL 5.0 引入了预处理语句的**游标(Cursor)**支持,允许分批获取结果集。
启用游标
在 COM_STMT_EXECUTE 时设置 flags 字段:
| 值 | 名称 | 说明 |
|---|---|---|
| 0x00 | CURSOR_TYPE_NO_CURSOR | 不使用游标 |
| 0x01 | CURSOR_TYPE_READ_ONLY | 只读游标 |
| 0x02 | CURSOR_TYPE_FOR_UPDATE | FOR UPDATE 游标 |
| 0x04 | CURSOR_TYPE_SCROLLABLE | 可滚动游标 |
游标使用流程
Client Server
│ │
│ ──── COM_STMT_PREPARE ────────────────→ │
│ ←─── Prepare OK ────────────────────── │
│ │
│ ──── COM_STMT_EXECUTE (flags=0x01) ───→ │ 启用游标
│ │
│ ←─── Binary Result Set (部分行) ────── │
│ ←─── OK (SERVER_STATUS_CURSOR_EXISTS) │
│ │
│ ──── COM_STMT_FETCH (num_rows=100) ───→ │ 获取下一批
│ │
│ ←─── Binary Result Set (100 行) ────── │
│ │
│ ──── COM_STMT_FETCH (num_rows=100) ───→ │ 获取下一批
│ │
│ ←─── Binary Result Set (剩余行) ────── │
│ ←─── EOF (游标关闭) ──────────────── │
│ │
│ ──── COM_STMT_CLOSE ─────────────────→ │
│ │
def send_fetch(sock: socket.socket, statement_id: int, num_rows: int) -> bytes:
"""发送 COM_STMT_FETCH 命令"""
payload = b'\x1C' + struct.pack('<I', statement_id) + struct.pack('<I', num_rows)
sock.send(encode_packet(payload, 0))
response, seq = read_packet(sock)
return response
游标的优势
- 内存效率:不需要一次加载全部结果集
- 流式处理:适合处理大表
- 精确控制:客户端决定每次获取多少行
8.7 预处理语句的服务器端实现
在服务器端,预处理语句的执行分为两个阶段:
预处理阶段
- SQL 解析(Parse):语法检查、生成解析树
- 语义分析:表/列是否存在、权限检查
- 预优化:部分查询优化(不依赖参数值)
- 分配语句句柄(Statement Handle)
执行阶段
- 参数绑定:将实际值绑定到占位符
- 完整优化:根据参数值完成查询优化
- 执行:执行查询计划
- 返回结果
8.8 COM_STMT_SEND_LONG_DATA
当参数值非常大时(如 BLOB),可以使用 COM_STMT_SEND_LONG_DATA 分批发送。
命令格式
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0x18 (COM_STMT_SEND_LONG_DATA)
1 4 字节 statement_id
5 2 字节 param_index (参数索引, 从 0 开始)
7 变长 数据
使用场景
def send_long_data(sock, statement_id, param_index, data, chunk_size=65535):
"""分批发送大数据参数"""
offset = 0
while offset < len(data):
chunk = data[offset:offset + chunk_size]
payload = (b'\x18' +
struct.pack('<I', statement_id) +
struct.pack('<H', param_index) +
chunk)
sock.send(encode_packet(payload, 0))
offset += chunk_size
典型场景
- 上传 BLOB 数据(图片、文件)
- 发送超长 TEXT 值
- 流式写入 LOB 数据
8.9 COM_STMT_RESET
重置预处理语句的状态,清除已发送的长数据:
def send_stmt_reset(sock, statement_id):
"""重置预处理语句"""
payload = b'\x1A' + struct.pack('<I', statement_id)
sock.send(encode_packet(payload, 0))
response, _ = read_packet(sock)
return response # OK 或 ERR
8.10 完整示例:预处理语句 CRUD
"""
mysql_prepared_crud.py
使用预处理语句执行 CRUD 操作
"""
def prepared_crud_example(sock):
"""预处理语句 CRUD 示例"""
# === INSERT ===
stmt = send_prepare(sock,
"INSERT INTO users (name, age, email) VALUES (?, ?, ?)")
print(f"INSERT: stmt_id={stmt.statement_id}")
# 批量插入
users = [
("Alice", 28, "alice@example.com"),
("Bob", 35, "bob@example.com"),
("Charlie", 22, "charlie@example.com"),
]
for name, age, email in users:
response = send_execute(sock, stmt, [name, age, email])
print(f" 插入 {name}: OK")
send_close(sock, stmt.statement_id)
# === SELECT ===
stmt = send_prepare(sock,
"SELECT id, name, email FROM users WHERE age > ?")
print(f"\nSELECT: stmt_id={stmt.statement_id}")
response = send_execute(sock, stmt, [25])
print(f" 查询结果: {response[:50].hex()}")
send_close(sock, stmt.statement_id)
# === UPDATE ===
stmt = send_prepare(sock,
"UPDATE users SET email = ? WHERE id = ?")
print(f"\nUPDATE: stmt_id={stmt.statement_id}")
response = send_execute(sock, stmt, ["newalice@example.com", 1])
print(f" 更新结果: OK")
send_close(sock, stmt.statement_id)
# === DELETE ===
stmt = send_prepare(sock,
"DELETE FROM users WHERE id = ?")
print(f"\nDELETE: stmt_id={stmt.statement_id}")
response = send_execute(sock, stmt, [3])
print(f" 删除结果: OK")
send_close(sock, stmt.statement_id)
8.11 使用游标的大结果集处理
def fetch_large_resultset(sock, sql, params=None, batch_size=1000):
"""使用游标分批获取大结果集"""
# 预处理
stmt = send_prepare(sock, sql)
print(f"预处理完成: {stmt.num_columns} 列, {stmt.num_params} 参数")
# 启用只读游标执行
# flags = 0x01 (CURSOR_TYPE_READ_ONLY)
# 需要修改 send_execute 来支持 flags 参数
response = send_execute_with_cursor(sock, stmt, params, cursor_type=0x01)
all_rows = []
total_fetched = 0
while True:
# 检查是否有 SERVER_STATUS_CURSOR_EXISTS (0x0040)
status = struct.unpack('<H', response[-5:-3])[0] if len(response) >= 5 else 0
if status & 0x0040:
# 游标打开中,继续获取
rows = parse_binary_rows(response, stmt.column_definitions)
all_rows.extend(rows)
total_fetched += len(rows)
print(f" 已获取 {total_fetched} 行...")
# 获取下一批
response = send_fetch(sock, stmt.statement_id, batch_size)
else:
# 游标关闭(EOF 或 最后一批)
if len(response) > 5:
rows = parse_binary_rows(response, stmt.column_definitions)
all_rows.extend(rows)
total_fetched += len(rows)
break
print(f"总共获取 {total_fetched} 行")
# 关闭语句
send_close(sock, stmt.statement_id)
return all_rows
def send_execute_with_cursor(sock, stmt, params, cursor_type=0):
"""支持游标的 COM_STMT_EXECUTE"""
# 类似 send_execute,但 flags 参数使用 cursor_type
pass # 实现略,参考 send_execute
8.12 注意事项
重要提醒
语句句柄限制:每个预处理语句占用服务器资源。
max_prepared_stmt_count参数限制总数(默认 16382)。必须关闭:使用完毕后必须发送
COM_STMT_CLOSE,否则会泄漏服务器资源。参数类型推断:MySQL 在 PREPARE 阶段推断的参数类型可能不准确,客户端应根据实际情况选择正确的绑定类型。
游标内存:启用游标后,服务器为每次 FETCH 维护中间状态,这也会消耗内存。
重连后失效:连接断开后,所有预处理语句句柄失效,需要重新 PREPARE。
SQL 注入:虽然预处理语句本身免疫注入,但动态构造 SQL 模板(如动态表名)仍需谨慎。
8.13 业务场景
场景一:ORM 的查询执行
大多数 ORM 框架默认使用预处理语句:
# SQLAlchemy 示例
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE id = :id")
result = conn.execute(stmt, {"id": 42})
场景二:批量导入
def batch_insert(conn, table, data, batch_size=1000):
"""使用预处理语句批量插入"""
placeholders = ", ".join(["?"] * len(data[0]))
sql = f"INSERT INTO {table} VALUES ({placeholders})"
stmt = conn.prepare(sql)
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
for row in batch:
stmt.execute(row)
conn.commit()
stmt.close()
场景三:存储过程调用
存储过程的 OUT 参数通过预处理语句的二进制协议返回:
-- 存储过程
CREATE PROCEDURE get_user_count(OUT count INT)
BEGIN
SELECT COUNT(*) INTO count FROM users;
END;
8.14 扩展阅读
上一章:07 - 二进制协议 下一章:09 - 结果集详解 —— 深入理解结果集的完整结构和流式读取。