MySQL 传输协议精讲 / 03 - 认证机制
第 03 章:认证机制
3.1 MySQL 认证架构
MySQL 使用可插拔认证架构(Pluggable Authentication),允许不同的用户使用不同的认证方式。认证插件在握手阶段由服务器指定,客户端根据指定的插件计算认证响应。
认证流程总览
Client Server
│ │
│ ←── HandshakeV10 ────────────────────── │
│ auth_plugin: "caching_sha2_password" │
│ scramble: [32 字节随机数] │
│ │
│ ──── HandshakeResponse41 ──────────────→ │
│ auth_data: [加密后的认证数据] │
│ │
│ ┌─────── 认证缓存命中? ────────────┐ │
│ │ 是 → OK_Packet │ │
│ │ 否 → AuthMoreData + RSA 公钥交换 │ │
│ └───────────────────────────────────┘ │
│ │
│ ←── OK_Packet 或 ERR_Packet ────────── │
支持的认证插件
| 插件名称 | MySQL 版本 | 默认版本 | 安全性 |
|---|---|---|---|
mysql_native_password | 4.1+ | 5.7 及之前 | 中等(SHA1) |
sha256_password | 5.6+ | - | 高(SHA256 + RSA) |
caching_sha2_password | 8.0+ | 8.0 | 高(SHA256 + 缓存 + RSA) |
auth_socket | 5.5+ | - | 高(OS 用户验证) |
authentication_ldap_simple | 5.5+ | - | 取决于 LDAP |
authentication_windows | 5.5+ | - | 高(Windows 域认证) |
mysql_clear_password | 5.5+ | - | 低(明文,需 TLS) |
-- 查看当前支持的认证插件
SELECT PLUGIN_NAME, PLUGIN_STATUS
FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_TYPE = 'AUTHENTICATION';
-- 查看用户的认证方式
SELECT user, host, plugin FROM mysql.user;
3.2 mysql_native_password
这是 MySQL 最经典的认证插件,从 4.1 版本开始引入。
认证算法
客户端计算:
hash1 = SHA1(password)
hash2 = SHA1(scramble + SHA1(hash1))
response = hash1 XOR hash2
服务端验证:
stored_hash = SHA1(SHA1(password)) -- 存储在 mysql.user 表中
expected = SHA1(scramble + stored_hash)
验证: SHA1(response) == expected
详细步骤
- 服务器存储(
mysql.user表的authentication_string字段):
存储值 = HEX(SHA1(SHA1("password")))
示例: 对于密码 "root"
SHA1("root") = dc76e9f0c0006e8f919e0c51efc6f2030ffd32ec
SHA1(SHA1("root")) = 66cd9b2dfba670265aa8760ca0972aba44173971
存储值 = "66CD9B2DFBA670265AA8760CA0972ABA44173971"
- 认证过程:
"""
mysql_native_password 完整认证实现
"""
import hashlib
import struct
def sha1_binary(data: bytes) -> bytes:
"""计算 SHA1 哈希,返回 20 字节二进制"""
return hashlib.sha1(data).digest()
def sha1_hex(data: bytes) -> str:
"""计算 SHA1 哈希,返回 40 字符十六进制"""
return hashlib.sha1(data).hexdigest().upper()
def mysql_native_password_auth(password: str, scramble: bytes) -> bytes:
"""
mysql_native_password 客户端认证计算
输入:
password - 明文密码
scramble - 服务器发送的 20 字节随机数 (取前 20 字节)
输出:
20 字节的认证响应
"""
if not password:
return b''
# Step 1: SHA1(password)
stage1 = sha1_binary(password.encode('utf-8'))
# Step 2: SHA1(SHA1(password))
stage2 = sha1_binary(stage1)
# Step 3: SHA1(scramble + SHA1(SHA1(password)))
stage3 = sha1_binary(scramble + stage2)
# Step 4: XOR(SHA1(password), SHA1(scramble + SHA1(SHA1(password))))
result = bytes(a ^ b for a, b in zip(stage1, stage3))
return result # 20 字节
def mysql_native_password_verify(password: str, scramble: bytes, response: bytes) -> bool:
"""
服务器端验证
输入:
password - 明文密码 (用于演示; 实际服务器存储的是 hash)
scramble - 发送给客户端的随机数
response - 客户端返回的认证响应
"""
# 服务器端的验证逻辑 (使用存储的 hash)
stored_hash = sha1_binary(sha1_binary(password.encode('utf-8')))
# 预期的客户端响应
expected_stage3 = sha1_binary(scramble + stored_hash)
expected_stage1 = sha1_binary(password.encode('utf-8'))
expected_response = bytes(a ^ b for a, b in zip(expected_stage1, expected_stage3))
# 验证方式: SHA1(response) 应该等于 SHA1(expected_response)
# 实际上服务器更高效地比较: SHA1(response) == SHA1(scramble + stored_hash)
return sha1_binary(response) == expected_stage3
# 完整演示
def demo():
password = "MySecurePass123!"
scramble = bytes.fromhex("4a5b6c7d8e9fa0b1c2d3e4f5a6b7c8d9e0f1a2b3")
print(f"密码: {password}")
print(f"Scramble: {scramble.hex()}")
print()
# 客户端计算
response = mysql_native_password_auth(password, scramble)
print(f"认证响应: {response.hex()}")
# 服务器验证
is_valid = mysql_native_password_verify(password, scramble, response)
print(f"验证结果: {'✓ 通过' if is_valid else '✗ 失败'}")
# 错误密码测试
wrong_response = mysql_native_password_auth("WrongPassword", scramble)
is_valid_wrong = mysql_native_password_verify("WrongPassword", scramble, wrong_response)
print(f"错误密码验证: {'✓ 通过' if is_valid_wrong else '✗ 失败'}")
# 交叉验证(错误密码的响应不应该通过正确密码的验证)
is_cross = mysql_native_password_verify(password, scramble, wrong_response)
print(f"交叉验证: {'✓ 通过' if is_cross else '✗ 失败(符合预期)'}")
if __name__ == '__main__':
demo()
安全性分析
| 方面 | 评估 |
|---|---|
| 密码传输 | 不传输明文,使用挑战-应答 |
| 哈希强度 | SHA1,已被认为不够安全 |
| 中间人攻击 | 不防中间人(无双向认证) |
| 彩虹表攻击 | 存储的是 double-SHA1,略有防护 |
| 暴力破解 | 如果数据库被拖库,SHA1 容易被暴力破解 |
3.3 caching_sha2_password
MySQL 8.0 的默认认证插件,在安全性与性能之间取得了良好平衡。
认证算法
客户端计算:
hash1 = SHA256(password)
hash2 = SHA256(SHA256(hash1))
response = SHA256(hash1) XOR SHA256(scramble + hash2)
服务端验证:
stored_hash = SHA256(SHA256(password)) -- 存储在 mysql.user 表
验证: 对比计算结果
两种认证模式
caching_sha2_password 有两种工作模式,取决于服务器是否缓存了认证结果:
模式一:快速认证(Fast Auth)— 缓存命中
Client → Server: 认证响应 (SHA256 挑战-应答)
Server → Client: OK_Packet (缓存命中, 1 个 RTT)
模式二:完整认证(Full Auth)— 缓存未命中
Client → Server: 认证响应 (SHA256 挑战-应答)
Server → Client: AuthMoreData + 快速认证失败标志
Client → Server: 请求 RSA 公钥 (或使用本地公钥)
Server → Client: RSA 公钥
Client → Server: RSA(password XOR SHA256(hash), 公钥加密)
Server → Client: OK_Packet 或 ERR_Packet
Python 完整实现
"""
caching_sha2_password 完整认证实现
"""
import hashlib
import os
import struct
try:
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False
print("[!] 请安装 cryptography: pip install cryptography")
def sha256_binary(data: bytes) -> bytes:
return hashlib.sha256(data).digest()
def xor_bytes(a: bytes, b: bytes) -> bytes:
"""两个等长字节序列的异或"""
return bytes(x ^ y for x, y in zip(a, b))
def caching_sha2_password_fast_auth(password: str, scramble: bytes) -> bytes:
"""
caching_sha2_password 快速认证(挑战-应答模式)
这是客户端发送的第一轮认证数据
算法:
hash1 = SHA256(password)
hash2 = SHA256(hash1)
hash3 = SHA256(hash2 + scramble)
response = hash1 XOR hash3
注意: 响应长度为 32 字节 (SHA256 输出长度)
"""
if not password:
return b''
hash1 = sha256_binary(password.encode('utf-8')) # SHA256(password)
hash2 = sha256_binary(hash1) # SHA256(SHA256(password))
hash3 = sha256_binary(hash2 + scramble[:]) # SHA256(SHA256(SHA256(password)) + scramble)
response = xor_bytes(hash1, hash3) # XOR
return response # 32 字节
def caching_sha2_password_full_auth(password: str, rsa_public_key_pem: bytes) -> bytes:
"""
caching_sha2_password 完整认证(RSA 加密模式)
当快速认证失败(缓存未命中)时使用
算法:
xor_key = SHA256(password)
encrypted = RSA_OAEP_Encrypt(xor_key XOR password_bytes, rsa_public_key)
"""
if not HAS_CRYPTO:
raise RuntimeError("需要 cryptography 库")
# XOR 密码与 SHA256(password)
password_bytes = password.encode('utf-8')
xor_key = sha256_binary(password_bytes)
# 对齐到 8 字节边界
padded = password_bytes + b'\x00' * (8 - len(password_bytes) % 8) if len(password_bytes) % 8 != 0 else password_bytes
# XOR 操作(循环使用 xor_key)
xor_result = bytearray(len(padded))
for i in range(len(padded)):
xor_result[i] = padded[i] ^ xor_key[i % len(xor_key)]
# RSA OAEP 加密
public_key = serialization.load_pem_public_key(rsa_public_key_pem)
encrypted = public_key.encrypt(
bytes(xor_result),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return encrypted
def simulate_full_auth_flow():
"""模拟完整的认证流程(含缓存未命中场景)"""
print("=" * 60)
print("模拟 caching_sha2_password 完整认证流程")
print("=" * 60)
password = "MySecurePass123!"
scramble = os.urandom(20) # 20 字节随机数
print(f"\n密码: {password}")
print(f"Scramble: {scramble.hex()}")
# === 第一步:快速认证尝试 ===
print("\n--- 第一步:快速认证 ---")
fast_response = caching_sha2_password_fast_auth(password, scramble)
print(f"快速认证响应: {fast_response.hex()}")
print(f"响应长度: {len(fast_response)} 字节")
# 假设缓存未命中
print("\n[服务器] 缓存未命中, 请求完整认证...")
# === 第二步:完整认证 ===
print("\n--- 第二步:RSA 公钥交换 ---")
if HAS_CRYPTO:
# 生成 RSA 密钥对(演示用)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
public_key = private_key.public_key()
# 导出公钥 PEM
public_key_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
print(f"RSA 公钥 (PEM, {len(public_key_pem)} 字节):")
print(public_key_pem.decode()[:200] + "...")
# 客户端使用公钥加密
encrypted = caching_sha2_password_full_auth(password, public_key_pem)
print(f"加密后的认证数据: {encrypted[:32].hex()}...")
print(f"加密数据长度: {len(encrypted)} 字节")
# 服务端使用私钥解密
decrypted = private_key.decrypt(
encrypted,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
# 提取密码(去掉 padding)
decrypted_password = decrypted.rstrip(b'\x00')
print(f"\n[服务器] 解密得到: {decrypted_password}")
print(f"验证: {'✓ 成功' if decrypted_password == password.encode() else '✗ 失败'}")
else:
print("[!] 跳过 RSA 演示 (需要 cryptography 库)")
def compare_auth_methods():
"""对比两种认证方法"""
print("\n" + "=" * 60)
print("认证方法对比")
print("=" * 60)
password = "TestPassword!"
scramble = os.urandom(20)
# mysql_native_password
import hashlib
h1 = hashlib.sha1(password.encode()).digest()
h2 = hashlib.sha1(h1).digest()
h3 = hashlib.sha1(scramble + h2).digest()
native_response = bytes(a ^ b for a, b in zip(h1, h3))
# caching_sha2_password
sha2_response = caching_sha2_password_fast_auth(password, scramble)
print(f"\n密码: {password}")
print(f"Scramble: {scramble.hex()}")
print(f"\nmysql_native_password:")
print(f" 响应长度: {len(native_response)} 字节")
print(f" 哈希算法: SHA1")
print(f" 响应值: {native_response.hex()}")
print(f"\ncaching_sha2_password:")
print(f" 响应长度: {len(sha2_response)} 字节")
print(f" 哈希算法: SHA256")
print(f" 响应值: {sha2_response.hex()}")
if __name__ == '__main__':
compare_auth_methods()
simulate_full_auth_flow()
认证缓存机制
caching_sha2_password 的缓存存储在内存中,由以下参数控制:
-- 查看缓存大小
SHOW VARIABLES LIKE 'caching_sha2_password%';
-- caching_sha2_password_auto_generate_rsa_keys ON
-- caching_sha2_password_private_key_path private_key.pem
-- caching_sha2_password_public_key_path public_key.pem
-- caching_sha2_password_rsa_key_size 2048
-- 手动刷新缓存(通常在密码变更后)
FLUSH PRIVILEGES;
缓存何时被清除:
- 用户密码被修改
- 用户被删除
FLUSH PRIVILEGES执行- 服务器重启
3.4 sha256_password
MySQL 5.6 引入的 SHA256 认证插件,是 caching_sha2_password 的前身。
与 caching_sha2_password 的区别
| 特性 | sha256_password | caching_sha2_password |
|---|---|---|
| 引入版本 | MySQL 5.6 | MySQL 8.0 |
| 快速认证 | 不支持 | 支持(缓存) |
| 每次连接都需要 RSA | 是 | 仅缓存未命中时 |
| 性能 | 较慢 | 较快 |
| 存储格式 | SHA256(SHA256(password)) + salt | SHA256(SHA256(password)) |
认证流程
Client → Server: 初始认证响应 (可能为空或 "mysql_native_password" 格式)
Server → Client: AuthMoreData + 0x04 (请求公钥) 或 RSA 公钥
Client → Server: RSA 加密的密码
Server → Client: OK / ERR
3.5 auth_socket
auth_socket 插件通过操作系统用户身份进行认证,不使用密码。
工作原理
1. 客户端通过 Unix socket 连接 MySQL
2. MySQL 获取 socket 连接的 UID
3. 将 UID 映射为操作系统用户名
4. 验证该用户名是否匹配 MySQL 用户名
配置示例
-- 创建 socket 认证用户
CREATE USER 'dbadmin'@'localhost' IDENTIFIED WITH auth_socket;
-- 允许 dbadmin 用户以 root 身份连接
CREATE USER 'root'@'localhost' IDENTIFIED WITH auth_socket AS 'root';
# 只有 OS root 用户可以以 root 身份连接 MySQL
sudo mysql -u root
# OS 用户 dbadmin 可以连接
mysql -u dbadmin # 使用 dbadmin 的 OS 身份
适用场景
- 本地系统管理脚本
- 无需暴露密码的自动化运维
- 安全敏感的特权账户管理
3.6 其他认证插件
LDAP 认证
-- 安装 LDAP 认证插件
INSTALL PLUGIN authentication_ldap_simple SONAME 'authentication_ldap_simple.so';
-- 配置 LDAP 服务器
SET GLOBAL authentication_ldap_simple_server_host = 'ldap.example.com';
SET GLOBAL authentication_ldap_simple_server_port = 389;
SET GLOBAL authentication_ldap_simple_bind_base_dn = 'ou=users,dc=example,dc=com';
-- 创建 LDAP 认证用户
CREATE USER 'ldapuser'@'%'
IDENTIFIED WITH authentication_ldap_simple
AS 'uid=ldapuser,ou=users,dc=example,dc=com';
PAM 认证
-- 安装 PAM 认证插件
INSTALL PLUGIN authentication_pam SONAME 'authentication_pam.so';
-- 创建 PAM 认证用户
CREATE USER 'pamuser'@'%'
IDENTIFIED WITH authentication_pam
AS 'mysql'; -- 'mysql' 是 PAM 服务名
3.7 认证协议消息格式
AuthMoreData 包
当服务器需要发送额外的认证数据时使用:
字节偏移 大小 字段
──────────────────────────────
0 1 字节 0xFE (标识 AuthMoreData)
1 变长 认证数据
不同认证插件使用不同的 AuthMoreData 格式:
| 插件 | 数据内容 |
|---|---|
caching_sha2_password | 0x01 (快速认证失败标志) 或 RSA 公钥 |
sha256_password | 0x04 (请求公钥) 或 RSA 公钥 |
mysql_native_password | 不使用 |
RSA 公钥传输
当客户端没有预配置 RSA 公钥时,可以从服务器获取:
def handle_auth_more_data(data, password, scramble):
"""处理 AuthMoreData 响应"""
status = data[0]
if status == 0x01:
# caching_sha2_password: 快速认证失败
# 需要进行 RSA 完整认证
print("[*] 快速认证失败,需要 RSA 公钥")
# 发送公钥请求
request_public_key = b'\x02' # 特殊命令
return request_public_key
elif status == 0x04:
# sha256_password: 服务器要求 RSA 加密
print("[*] 需要 RSA 加密密码")
else:
# 后续字节是 RSA 公钥
public_key_pem = data # 整个数据是公钥
print(f"[*] 收到 RSA 公钥 ({len(public_key_pem)} 字节)")
# 使用公钥加密密码...
3.8 认证方式切换与兼容性
切换认证插件
-- 将用户切换到 mysql_native_password(兼容旧客户端)
ALTER USER 'app_user'@'%'
IDENTIFIED WITH mysql_native_password BY 'password123';
-- 将用户切换到 caching_sha2_password(推荐)
ALTER USER 'app_user'@'%'
IDENTIFIED WITH caching_sha2_password BY 'password123';
-- 修改默认认证插件(全局)
-- 在 my.cnf 中设置:
-- [mysqld]
-- default_authentication_plugin = mysql_native_password
客户端兼容性矩阵
| 客户端驱动 | mysql_native_password | caching_sha2_password | sha256_password |
|---|---|---|---|
| MySQL Connector/Python 8.0+ | ✓ | ✓ | ✓ |
| PyMySQL 1.0+ | ✓ | ✓ | ✓ |
| mysql-connector-java 8.0+ | ✓ | ✓ | ✓ |
| Go go-sql-driver 1.5+ | ✓ | ✓ | ✗ |
| Node.js mysql2 | ✓ | ✓ | ✗ |
| PHP mysqlnd (PHP 7.4+) | ✓ | ✓ | ✓ |
| PHP mysqlnd (PHP < 7.4) | ✓ | ✗ | ✗ |
| 旧版客户端/驱动 | ✓ | ✗ | ✗ |
迁移建议:升级 MySQL 到 8.0 之前,先升级所有客户端驱动以支持
caching_sha2_password,或在服务器端为特定用户保留mysql_native_password。
3.9 安全最佳实践
密码策略配置
-- 安装密码验证组件
INSTALL COMPONENT 'file://component_validate_password';
-- 配置密码策略
SET GLOBAL validate_password.policy = MEDIUM;
SET GLOBAL validate_password.length = 12;
SET GLOBAL validate_password.mixed_case_count = 1;
SET GLOBAL validate_password.number_count = 1;
SET GLOBAL validate_password.special_char_count = 1;
TLS 强制
-- 要求用户使用 TLS 连接
ALTER USER 'app_user'@'%' REQUIRE SSL;
-- 要求 TLS 版本
ALTER USER 'app_user'@'%' REQUIRE X509;
-- 全局 TLS 配置
-- my.cnf:
-- [mysqld]
-- require_secure_transport = ON
-- tls_version = TLSv1.2,TLSv1.3
认证安全清单
| 检查项 | 建议 |
|---|---|
| 认证插件 | 生产环境使用 caching_sha2_password |
| 密码强度 | 启用 validate_password 组件 |
| 密码轮换 | 设置 default_password_lifetime |
| 连接加密 | 强制 SSL/TLS |
| 公钥交换 | 预配置 RSA 公钥(禁用 allowPublicKeyRetrieval) |
| 权限最小化 | 按需授权,避免使用 GRANT ALL |
| 远程 root | 禁止 root 远程登录 |
| 匿名用户 | 删除所有匿名用户 |
-- 安全加固脚本
DELETE FROM mysql.user WHERE User = '';
DELETE FROM mysql.user WHERE User = 'root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
FLUSH PRIVILEGES;
3.10 业务场景
场景一:应用升级后连接失败
问题:MySQL 升级到 8.0 后,应用报错 Authentication plugin 'caching_sha2_password' cannot be loaded。
分析:旧的客户端驱动不支持 caching_sha2_password 插件。
解决方案:
-- 临时方案:将用户切换回旧的认证方式
ALTER USER 'app'@'%' IDENTIFIED WITH mysql_native_password BY 'password';
-- 长期方案:升级客户端驱动
场景二:密码安全性审计
通过查询 mysql.user 表审计所有用户的认证配置:
SELECT
User,
Host,
plugin AS '认证插件',
authentication_string AS '认证数据',
password_expired AS '密码过期',
password_lifetime AS '密码有效期',
account_locked AS '账户锁定',
CASE
WHEN plugin = 'mysql_native_password' THEN '⚠ 建议升级到 caching_sha2_password'
WHEN plugin = 'caching_sha2_password' THEN '✓ 推荐'
WHEN plugin = 'auth_socket' THEN '✓ 本地安全'
ELSE '?'
END AS '安全评估'
FROM mysql.user
WHERE User NOT LIKE 'mysql.%'
ORDER BY User, Host;
场景三:多因素认证
MySQL Enterprise 支持多因素认证(MFA),协议层面通过多次认证交换实现:
Client → Server: HandshakeResponse (第一因素)
Server → Client: AuthMoreData (要求第二因素)
Client → Server: AuthResponse (第二因素)
Server → Client: OK / ERR
3.11 扩展阅读
- MySQL 8.0 Reference: Pluggable Authentication
- MySQL 8.0 Reference: caching_sha2_password
- MySQL Internals: Authentication Method
- OWASP: Password Storage Cheat Sheet
上一章:02 - 握手过程 下一章:04 - 数据包格式 —— 深入理解 MySQL 数据包的字节级结构。