强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Python 编程教程 / 17 - 数据库

第 17 章:数据库

掌握 SQLite、SQLAlchemy ORM、数据库迁移和异步数据库操作。


17.1 SQLite(标准库)

17.1.1 基本操作

import sqlite3

# 连接数据库(文件不存在会自动创建)
conn = sqlite3.connect("app.db")
conn.row_factory = sqlite3.Row  # 返回字典式行对象

# 创建表
conn.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
conn.commit()

# 插入
conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Alice", "alice@example.com"))
conn.commit()

# 查询
rows = conn.execute("SELECT * FROM users").fetchall()
for row in rows:
    print(dict(row))

# 参数化查询(防 SQL 注入)
user = conn.execute("SELECT * FROM users WHERE email = ?", ("alice@example.com",)).fetchone()
print(user["name"])

# 关闭
conn.close()

17.1.2 上下文管理器

import sqlite3

with sqlite3.connect("app.db") as conn:
    conn.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("Bob", "bob@example.com"))
    # 自动提交(如果无异常)或回滚(如果有异常)

17.2 SQLAlchemy ORM

17.2.1 安装与配置

$ pip install sqlalchemy

17.2.2 声明式模型

from datetime import datetime
from sqlalchemy import create_engine, String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    created_at: Mapped[datetime] = mapped_column(default=func.now())

    # 关系
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

    def __repr__(self) -> str:
        return f"User(id={self.id}, name={self.name!r})"

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str]
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")

17.2.3 CRUD 操作

# 创建引擎
engine = create_engine("sqlite:///app.db", echo=True)

# 创建表
Base.metadata.create_all(engine)

# 使用 Session
with Session(engine) as session:
    # Create
    user = User(name="Alice", email="alice@example.com")
    session.add(user)
    session.commit()

    # Read
    user = session.query(User).filter_by(name="Alice").first()
    users = session.query(User).filter(User.name.like("A%")).all()

    # Update
    user.name = "Alice Updated"
    session.commit()

    # Delete
    session.delete(user)
    session.commit()

17.2.4 查询构建

from sqlalchemy import select, and_, or_

with Session(engine) as session:
    # 基本查询
    stmt = select(User).where(User.name == "Alice")
    users = session.scalars(stmt).all()

    # 条件组合
    stmt = select(User).where(
        and_(
            User.name.like("A%"),
            or_(User.email.like("%@gmail.com"), User.email.like("%@outlook.com")),
        )
    )

    # 排序和分页
    stmt = select(User).order_by(User.created_at.desc()).offset(10).limit(10)

    # 聚合
    stmt = select(func.count(User.id))
    count = session.scalar(stmt)

    # 关联查询
    stmt = select(User, Post).join(Post, User.id == Post.user_id)
    results = session.execute(stmt).all()

17.3 连接池

from sqlalchemy import create_engine

# 配置连接池
engine = create_engine(
    "postgresql://user:password@localhost:5432/mydb",
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 最大溢出连接
    pool_timeout=30,        # 获取连接超时
    pool_recycle=3600,      # 连接回收时间
    pool_pre_ping=True,     # 使用前检查连接
)

17.4 数据库迁移(Alembic)

# 安装
$ pip install alembic

# 初始化
$ alembic init alembic

# 创建迁移
$ alembic revision --autogenerate -m "add users table"

# 执行迁移
$ alembic upgrade head

# 回滚
$ alembic downgrade -1

# 查看历史
$ alembic history

alembic.ini 关键配置:

[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///app.db

17.5 异步 ORM(SQLAlchemy 2.0+)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
import asyncio

async_engine = create_async_engine("sqlite+aiosqlite:///app.db")
async_session = async_sessionmaker(async_engine, class_=AsyncSession)

async def main():
    async with async_session() as session:
        user = User(name="Alice", email="alice@example.com")
        session.add(user)
        await session.commit()

        # 查询
        result = await session.execute(select(User).where(User.name == "Alice"))
        user = result.scalar_one()
        print(user)

asyncio.run(main())

17.6 数据库选型

数据库 类型 适用场景 Python 驱动
SQLite 嵌入式 本地开发、小型应用 sqlite3 (标准库)
PostgreSQL 关系型 生产环境首选 psycopg2, asyncpg
MySQL 关系型 Web 应用 pymysql, aiomysql
MongoDB 文档型 灵活 schema pymongo, motor
Redis 键值型 缓存、会话 redis

17.7 注意事项

🔴 注意

  • 永远使用参数化查询,防止 SQL 注入
  • 使用连接池管理数据库连接
  • 生产环境使用 PostgreSQL 而非 SQLite
  • 数据库迁移脚本要纳入版本控制

💡 提示

  • 使用 SQLAlchemy 2.0+ 的新式声明模型
  • 使用 Alembic 管理数据库迁移
  • 使用 async_session 支持异步操作
  • echo=True 仅在开发时启用,生产环境关闭

📌 业务场景

from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session

class UserRepository:
    def __init__(self, engine):
        self.engine = engine

    def get_by_id(self, user_id: int) -> User | None:
        with Session(self.engine) as session:
            return session.get(User, user_id)

    def get_by_email(self, email: str) -> User | None:
        with Session(self.engine) as session:
            return session.scalars(
                select(User).where(User.email == email)
            ).first()

    def create(self, name: str, email: str) -> User:
        with Session(self.engine) as session:
            user = User(name=name, email=email)
            session.add(user)
            session.commit()
            session.refresh(user)
            return user

17.8 扩展阅读