强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Python 编程教程 / 21 - 安全编程

第 21 章:安全编程

了解常见安全漏洞,编写安全的 Python 代码。


21.1 安全概述

OWASP Top 10

排名 漏洞类型 Python 防范措施
A01 访问控制失效 权限验证、RBAC
A02 加密失败 使用标准库加密
A03 注入 参数化查询、输入验证
A04 不安全设计 安全设计评审
A05 安全配置错误 最小权限原则
A06 脆弱过时组件 定期更新依赖
A07 身份认证失败 MFA、安全密码存储
A08 软件和数据完整性失败 签名验证
A09 日志和监控不足 完善日志记录
A10 SSRF URL 白名单验证

21.2 哈希与加密

21.2.1 密码哈希

import hashlib
import secrets

# ❌ 不要使用 MD5/SHA1 存储密码
# hashlib.md5(b"password").hexdigest()  # 不安全!

# ✅ 使用 bcrypt 或 argon2
# pip install bcrypt
import bcrypt

# 哈希密码
password = "my_secure_password"
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
print(hashed)  # b'$2b$12$...'

# 验证密码
is_valid = bcrypt.checkpw(password.encode(), hashed)
print(is_valid)  # True

21.2.2 哈希校验

import hashlib

def file_hash(filepath: str, algorithm: str = "sha256") -> str:
    """计算文件哈希值。"""
    h = hashlib.new(algorithm)
    with open(filepath, "rb") as f:
        while chunk := f.read(8192):
            h.update(chunk)
    return h.hexdigest()

# 使用
print(file_hash("data.zip"))

21.2.3 对称加密

from cryptography.fernet import Fernet

# 生成密钥
key = Fernet.generate_key()
cipher = Fernet(key)

# 加密
message = b"Secret message"
encrypted = cipher.encrypt(message)
print(encrypted)

# 解密
decrypted = cipher.decrypt(encrypted)
print(decrypted)  # b'Secret message'

21.2.4 令牌生成

import secrets
import string

# 安全随机令牌
token = secrets.token_urlsafe(32)
print(token)  # URL 安全的随机字符串

# 安全随机整数
random_int = secrets.randbelow(100)

# 安全选择
alphabet = string.ascii_letters + string.digits
password = "".join(secrets.choice(alphabet) for _ in range(16))

# 安全比较(防止计时攻击)
secrets.compare_digest("token1", "token1")  # True

21.3 输入验证

21.3.1 Pydantic 验证

from pydantic import BaseModel, Field, field_validator
import re

class UserInput(BaseModel):
    username: str = Field(..., min_length=3, max_length=20, pattern=r"^[a-zA-Z0-9_]+$")
    email: str
    age: int = Field(..., ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        pattern = r"^[\w.-]+@[\w.-]+\.\w+$"
        if not re.match(pattern, v):
            raise ValueError("邮箱格式不正确")
        return v.lower()

# 使用
try:
    user = UserInput(username="alice", email="alice@example.com", age=30)
    print(user)
except Exception as e:
    print(f"验证失败: {e}")

21.3.2 防止路径遍历

from pathlib import Path

def safe_read_file(base_dir: str, filename: str) -> str:
    """安全读取文件(防止路径遍历攻击)。"""
    base = Path(base_dir).resolve()
    target = (base / filename).resolve()

    # 确保目标在基础目录内
    if not str(target).startswith(str(base)):
        raise ValueError("非法路径")

    return target.read_text(encoding="utf-8")

# ❌ 攻击者可能传入 "../../../etc/passwd"
# ✅ 上述代码会拒绝这种请求

21.4 SQL 注入防护

# ❌ 危险:字符串拼接
query = f"SELECT * FROM users WHERE name = '{user_input}'"

# ✅ 安全:参数化查询
cursor.execute("SELECT * FROM users WHERE name = ?", (user_input,))

# ✅ SQLAlchemy ORM
session.query(User).filter(User.name == user_input).first()

# ✅ Pydantic + FastAPI
@app.get("/users/{user_id}")
async def get_user(user_id: int):  # FastAPI 自动验证类型
    ...

21.5 常见漏洞防范

21.5.1 防止命令注入

import subprocess

# ❌ 危险
user_input = "file.txt; rm -rf /"
subprocess.run(f"cat {user_input}", shell=True)  # 命令注入!

# ✅ 安全
subprocess.run(["cat", user_input], check=True)  # 不使用 shell

# ✅ 使用 shlex 转义
import shlex
subprocess.run(f"cat {shlex.quote(user_input)}", shell=True)

21.5.2 防止反序列化攻击

import json

# ❌ 危险:pickle 可执行任意代码
# import pickle
# pickle.loads(untrusted_data)  # 可能执行恶意代码!

# ✅ 安全:使用 json
data = json.loads(trusted_json_string)

# ✅ YAML 使用 safe_load
import yaml
data = yaml.safe_load(trusted_yaml_string)  # 不使用 yaml.load()

21.5.3 环境变量管理

import os
from pathlib import Path

# ❌ 硬编码密钥
API_KEY = "sk-1234567890abcdef"  # 不要这样做!

# ✅ 从环境变量读取
API_KEY = os.environ.get("API_KEY")
if not API_KEY:
    raise ValueError("请设置 API_KEY 环境变量")

# ✅ 使用 python-dotenv
from dotenv import load_dotenv
load_dotenv()  # 从 .env 文件加载
API_KEY = os.environ.get("API_KEY")
# .env 文件
API_KEY=sk-1234567890
DATABASE_URL=postgresql://user:pass@localhost/db
SECRET_KEY=your-secret-key-here
# .gitignore
.env
*.pem
*.key

21.6 HTTPS 和安全请求

import httpx
import ssl
import certifi

# ✅ 使用 HTTPS
async def secure_request(url: str):
    async with httpx.AsyncClient(verify=certifi.where()) as response:
        return await response.get(url)

# ✅ 验证 SSL 证书(默认开启)
# ❌ 禁用 SSL 验证(仅测试环境)
# httpx.get(url, verify=False)  # 危险!

21.7 依赖安全

# 检查已知漏洞
$ pip install safety
$ safety check

# 使用 pip-audit(推荐)
$ pip install pip-audit
$ pip-audit

# 使用 bandit 进行代码安全扫描
$ pip install bandit
$ bandit -r src/

# 使用 snyk
$ snyk test

21.8 安全编码清单

类别 检查项
输入验证 所有外部输入都经过验证
输出编码 HTML/URL/SQL 输出正确编码
认证 密码使用 bcrypt/argon2 哈希
会话 使用安全的会话管理
授权 每个操作都验证权限
加密 使用 HTTPS,敏感数据加密
日志 不记录敏感信息
依赖 定期更新,扫描漏洞
密钥 使用环境变量,不硬编码
错误 不向用户暴露内部错误

21.9 注意事项

🔴 注意

  • 永远不要信任用户输入
  • 永远不要在代码中硬编码密码/密钥
  • 永远不要使用 pickle 反序列化不受信任的数据
  • 永远不要禁用 SSL 证书验证(生产环境)

💡 提示

  • 使用 secrets 模块而非 random 生成安全随机数
  • 使用 bcryptargon2 哈希密码
  • 使用 bandit 扫描代码安全问题
  • 使用 pip-audit 检查依赖漏洞

📌 业务场景

import secrets
import bcrypt
from pydantic import BaseModel, Field

class AuthService:
    def hash_password(self, password: str) -> str:
        return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()

    def verify_password(self, password: str, hashed: str) -> bool:
        return bcrypt.checkpw(password.encode(), hashed.encode())

    def generate_token(self) -> str:
        return secrets.token_urlsafe(32)

    def generate_reset_token(self, user_id: int) -> str:
        """生成密码重置令牌(实际应配合 Redis 设置过期时间)。"""
        token = secrets.token_urlsafe(32)
        # 存储 token -> user_id 映射,设置 15 分钟过期
        return token

21.10 扩展阅读