Python 编程教程 / 21 - 安全编程
第 21 章:安全编程
了解常见安全漏洞,编写安全的 Python 代码。
21.1 安全概述
OWASP Top 10
| 排名 | 漏洞类型 | Python 防范措施 |
|---|---|---|
| A01 | 访问控制失效 | 权限验证、RBAC |
| A02 | 加密失败 | 使用标准库加密 |
| A03 | 注入 | 参数化查询、输入验证 |
| A04 | 不安全设计 | 安全设计评审 |
| A05 | 安全配置错误 | 最小权限原则 |
| A06 | 脆弱过时组件 | 定期更新依赖 |
| A07 | 身份认证失败 | MFA、安全密码存储 |
| A08 | 软件和数据完整性失败 | 签名验证 |
| A09 | 日志和监控不足 | 完善日志记录 |
| A10 | SSRF | URL 白名单验证 |
21.2 哈希与加密
21.2.1 密码哈希
import hashlib
import secrets
# ❌ 不要使用 MD5/SHA1 存储密码
# hashlib.md5(b"password").hexdigest() # 不安全!
# ✅ 使用 bcrypt 或 argon2
# pip install bcrypt
import bcrypt
# 哈希密码
password = "my_secure_password"
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
print(hashed) # b'$2b$12$...'
# 验证密码
is_valid = bcrypt.checkpw(password.encode(), hashed)
print(is_valid) # True
21.2.2 哈希校验
import hashlib
def file_hash(filepath: str, algorithm: str = "sha256") -> str:
"""计算文件哈希值。"""
h = hashlib.new(algorithm)
with open(filepath, "rb") as f:
while chunk := f.read(8192):
h.update(chunk)
return h.hexdigest()
# 使用
print(file_hash("data.zip"))
21.2.3 对称加密
from cryptography.fernet import Fernet
# 生成密钥
key = Fernet.generate_key()
cipher = Fernet(key)
# 加密
message = b"Secret message"
encrypted = cipher.encrypt(message)
print(encrypted)
# 解密
decrypted = cipher.decrypt(encrypted)
print(decrypted) # b'Secret message'
21.2.4 令牌生成
import secrets
import string
# 安全随机令牌
token = secrets.token_urlsafe(32)
print(token) # URL 安全的随机字符串
# 安全随机整数
random_int = secrets.randbelow(100)
# 安全选择
alphabet = string.ascii_letters + string.digits
password = "".join(secrets.choice(alphabet) for _ in range(16))
# 安全比较(防止计时攻击)
secrets.compare_digest("token1", "token1") # True
21.3 输入验证
21.3.1 Pydantic 验证
from pydantic import BaseModel, Field, field_validator
import re
class UserInput(BaseModel):
username: str = Field(..., min_length=3, max_length=20, pattern=r"^[a-zA-Z0-9_]+$")
email: str
age: int = Field(..., ge=0, le=150)
@field_validator("email")
@classmethod
def validate_email(cls, v: str) -> str:
pattern = r"^[\w.-]+@[\w.-]+\.\w+$"
if not re.match(pattern, v):
raise ValueError("邮箱格式不正确")
return v.lower()
# 使用
try:
user = UserInput(username="alice", email="alice@example.com", age=30)
print(user)
except Exception as e:
print(f"验证失败: {e}")
21.3.2 防止路径遍历
from pathlib import Path
def safe_read_file(base_dir: str, filename: str) -> str:
"""安全读取文件(防止路径遍历攻击)。"""
base = Path(base_dir).resolve()
target = (base / filename).resolve()
# 确保目标在基础目录内
if not str(target).startswith(str(base)):
raise ValueError("非法路径")
return target.read_text(encoding="utf-8")
# ❌ 攻击者可能传入 "../../../etc/passwd"
# ✅ 上述代码会拒绝这种请求
21.4 SQL 注入防护
# ❌ 危险:字符串拼接
query = f"SELECT * FROM users WHERE name = '{user_input}'"
# ✅ 安全:参数化查询
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))
# ✅ SQLAlchemy ORM
session.query(User).filter(User.name == user_input).first()
# ✅ Pydantic + FastAPI
@app.get("/users/{user_id}")
async def get_user(user_id: int): # FastAPI 自动验证类型
...
21.5 常见漏洞防范
21.5.1 防止命令注入
import subprocess
# ❌ 危险
user_input = "file.txt; rm -rf /"
subprocess.run(f"cat {user_input}", shell=True) # 命令注入!
# ✅ 安全
subprocess.run(["cat", user_input], check=True) # 不使用 shell
# ✅ 使用 shlex 转义
import shlex
subprocess.run(f"cat {shlex.quote(user_input)}", shell=True)
21.5.2 防止反序列化攻击
import json
# ❌ 危险:pickle 可执行任意代码
# import pickle
# pickle.loads(untrusted_data) # 可能执行恶意代码!
# ✅ 安全:使用 json
data = json.loads(trusted_json_string)
# ✅ YAML 使用 safe_load
import yaml
data = yaml.safe_load(trusted_yaml_string) # 不使用 yaml.load()
21.5.3 环境变量管理
import os
from pathlib import Path
# ❌ 硬编码密钥
API_KEY = "sk-1234567890abcdef" # 不要这样做!
# ✅ 从环境变量读取
API_KEY = os.environ.get("API_KEY")
if not API_KEY:
raise ValueError("请设置 API_KEY 环境变量")
# ✅ 使用 python-dotenv
from dotenv import load_dotenv
load_dotenv() # 从 .env 文件加载
API_KEY = os.environ.get("API_KEY")
# .env 文件
API_KEY=sk-1234567890
DATABASE_URL=postgresql://user:pass@localhost/db
SECRET_KEY=your-secret-key-here
# .gitignore
.env
*.pem
*.key
21.6 HTTPS 和安全请求
import httpx
import ssl
import certifi
# ✅ 使用 HTTPS
async def secure_request(url: str):
async with httpx.AsyncClient(verify=certifi.where()) as response:
return await response.get(url)
# ✅ 验证 SSL 证书(默认开启)
# ❌ 禁用 SSL 验证(仅测试环境)
# httpx.get(url, verify=False) # 危险!
21.7 依赖安全
# 检查已知漏洞
$ pip install safety
$ safety check
# 使用 pip-audit(推荐)
$ pip install pip-audit
$ pip-audit
# 使用 bandit 进行代码安全扫描
$ pip install bandit
$ bandit -r src/
# 使用 snyk
$ snyk test
21.8 安全编码清单
| 类别 | 检查项 |
|---|---|
| 输入验证 | 所有外部输入都经过验证 |
| 输出编码 | HTML/URL/SQL 输出正确编码 |
| 认证 | 密码使用 bcrypt/argon2 哈希 |
| 会话 | 使用安全的会话管理 |
| 授权 | 每个操作都验证权限 |
| 加密 | 使用 HTTPS,敏感数据加密 |
| 日志 | 不记录敏感信息 |
| 依赖 | 定期更新,扫描漏洞 |
| 密钥 | 使用环境变量,不硬编码 |
| 错误 | 不向用户暴露内部错误 |
21.9 注意事项
🔴 注意:
- 永远不要信任用户输入
- 永远不要在代码中硬编码密码/密钥
- 永远不要使用
pickle反序列化不受信任的数据 - 永远不要禁用 SSL 证书验证(生产环境)
💡 提示:
- 使用
secrets模块而非random生成安全随机数 - 使用
bcrypt或argon2哈希密码 - 使用
bandit扫描代码安全问题 - 使用
pip-audit检查依赖漏洞
📌 业务场景:
import secrets
import bcrypt
from pydantic import BaseModel, Field
class AuthService:
def hash_password(self, password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(self, password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode(), hashed.encode())
def generate_token(self) -> str:
return secrets.token_urlsafe(32)
def generate_reset_token(self, user_id: int) -> str:
"""生成密码重置令牌(实际应配合 Redis 设置过期时间)。"""
token = secrets.token_urlsafe(32)
# 存储 token -> user_id 映射,设置 15 分钟过期
return token