强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

SQLite 完全指南 / 19 - 最佳实践

19 - 最佳实践:Schema 设计、并发策略与何时不用 SQLite

19.1 Schema 设计最佳实践

19.1.1 主键设计

-- ✅ 最佳:INTEGER PRIMARY KEY(等价于 rowid,性能最优)
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL
);

-- ⚠️ 可以但有额外开销:AUTOINCREMENT
CREATE TABLE users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL
);
-- AUTOINCREMENT 使用 sqlite_sequence 表记录最大 ID
-- 比不带 AUTOINCREMENT 稍慢

-- ❌ 不推荐:使用其他类型做主键
CREATE TABLE users (
    id TEXT PRIMARY KEY,  -- UUID 做主键性能差
    name TEXT NOT NULL
);

-- ✅ 如果必须用 UUID:作为额外列 + INTEGER 主键
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    uid TEXT NOT NULL UNIQUE DEFAULT (lower(hex(randomblob(16)))),
    name TEXT NOT NULL
);
CREATE INDEX idx_users_uid ON users(uid);

主键选择对比

主键类型插入性能查询性能索引大小适用场景
INTEGER PRIMARY KEY⭐⭐⭐ 最快⭐⭐⭐ 最快最小默认首选
INTEGER PRIMARY KEY AUTOINCREMENT⭐⭐ 较快⭐⭐⭐需要严格递增
UUID (TEXT)⭐ 慢⭐⭐分布式系统
UUID + INTEGER⭐⭐⭐⭐⭐需要 UUID 但在意性能

19.1.2 外键设计

-- ✅ 开启外键约束
PRAGMA foreign_keys = ON;

CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL,
    amount REAL NOT NULL,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);

-- ✅ 外键列创建索引(加速 JOIN 和约束检查)
CREATE INDEX idx_orders_user_id ON orders(user_id);

-- ON DELETE/ON UPDATE 选项:
-- CASCADE      — 级联删除/更新
-- SET NULL     — 设为 NULL
-- SET DEFAULT  — 设为默认值
-- RESTRICT     — 阻止操作(默认行为)
-- NO ACTION    — 类似 RESTRICT

19.1.3 时间字段设计

-- 方案 1:TEXT ISO 8601(推荐,可读性好)
CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);

-- 方案 2:INTEGER Unix 时间戳(紧凑,计算快)
CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);

-- 方案 3:同时存储两者(灵活查询)
CREATE TABLE events (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
    created_ts INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);

19.1.4 枚举值设计

-- ✅ TEXT + CHECK 约束(推荐)
CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK(status IN ('pending', 'paid', 'shipped', 'completed', 'cancelled'))
);

-- ✅ 外键引用枚举表(更灵活)
CREATE TABLE order_statuses (
    code TEXT PRIMARY KEY,
    label TEXT NOT NULL,
    sort_order INTEGER NOT NULL
);
INSERT INTO order_statuses VALUES
    ('pending', '待处理', 1),
    ('paid', '已付款', 2),
    ('shipped', '已发货', 3),
    ('completed', '已完成', 4),
    ('cancelled', '已取消', 5);

CREATE TABLE orders (
    id INTEGER PRIMARY KEY,
    status TEXT NOT NULL DEFAULT 'pending',
    FOREIGN KEY (status) REFERENCES order_statuses(code)
);

19.1.5 金额存储

-- ✅ INTEGER 存储分为单位(推荐)
CREATE TABLE transactions (
    id INTEGER PRIMARY KEY,
    amount_cents INTEGER NOT NULL,  -- 19.99 元存储为 1999
    currency TEXT DEFAULT 'CNY'
);
-- 查询时:SELECT amount_cents / 100.0 AS amount FROM transactions;

-- ❌ REAL 浮点数(精度问题)
CREATE TABLE transactions (
    id INTEGER PRIMARY KEY,
    amount REAL NOT NULL  -- 0.1 + 0.2 ≠ 0.3
);

19.1.6 软删除设计

-- 软删除:标记删除而非物理删除
CREATE TABLE records (
    id INTEGER PRIMARY KEY,
    data TEXT NOT NULL,
    is_deleted INTEGER NOT NULL DEFAULT 0,
    deleted_at TEXT
);

-- 查询时过滤
SELECT * FROM records WHERE is_deleted = 0;

-- 删除时标记
UPDATE records SET is_deleted = 1, deleted_at = datetime('now') WHERE id = 1;

-- 部分索引:只为未删除的数据创建索引
CREATE INDEX idx_records_active ON records(data) WHERE is_deleted = 0;

-- 定期清理已删除数据
DELETE FROM records WHERE is_deleted = 1 AND deleted_at < datetime('now', '-30 days');

19.1.7 多租户设计

-- 方案 1:共享表 + tenant_id(推荐小规模)
CREATE TABLE products (
    id INTEGER PRIMARY KEY,
    tenant_id TEXT NOT NULL,
    name TEXT NOT NULL,
    price INTEGER NOT NULL
);
CREATE INDEX idx_products_tenant ON products(tenant_id);

-- 所有查询都带 tenant_id
SELECT * FROM products WHERE tenant_id = ? AND name LIKE ?;

-- 方案 2:每租户独立数据库(大规模隔离)
ATTACH DATABASE 'tenant_abc.db' AS tenant;
SELECT * FROM tenant.products;

19.2 并发策略

19.2.1 读写并发

WAL 模式下的并发模型:
┌───────────────────────────────────────┐
│           多个读连接(并发)           │
│   Reader A ←→ Reader B ←→ Reader C    │
│              ↓ WAL ↓                  │
│         ┌──────────────┐              │
│         │  写连接(单个)│              │
│         │  Writer       │              │
│         └──────────────┘              │
└───────────────────────────────────────┘
# Python 读写分离示例
import sqlite3
import threading

class SQLiteManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self._write_lock = threading.Lock()

    def _get_read_conn(self):
        """获取只读连接(可多线程并发)"""
        conn = sqlite3.connect(f'file:{self.db_path}?mode=ro', uri=True)
        conn.execute('PRAGMA journal_mode = WAL')
        conn.row_factory = sqlite3.Row
        return conn

    def _get_write_conn(self):
        """获取写连接(需串行化)"""
        conn = sqlite3.connect(self.db_path)
        conn.execute('PRAGMA journal_mode = WAL')
        conn.execute('PRAGMA foreign_keys = ON')
        conn.execute('PRAGMA busy_timeout = 5000')
        conn.row_factory = sqlite3.Row
        return conn

    def read(self, query, params=()):
        with self._get_read_conn() as conn:
            return conn.execute(query, params).fetchall()

    def write(self, query, params=()):
        with self._write_lock:
            with self._get_write_conn() as conn:
                result = conn.execute(query, params)
                conn.commit()
                return result

19.2.2 处理 SQLITE_BUSY

import sqlite3
import time

def execute_with_retry(conn, query, params=(), max_retries=5):
    """带重试的 SQL 执行"""
    for attempt in range(max_retries):
        try:
            return conn.execute(query, params)
        except sqlite3.OperationalError as e:
            if 'database is locked' in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (attempt + 1))  # 递增等待
                continue
            raise

# 设置 busy_timeout(更简单的方案)
conn = sqlite3.connect('mydb.db', timeout=10)  # 10 秒超时
conn.execute('PRAGMA busy_timeout = 10000')

19.2.3 高并发写入策略

import sqlite3
from queue import Queue
from threading import Thread

class WriteWorker:
    """单线程写入 worker(避免锁竞争)"""
    def __init__(self, db_path):
        self.db_path = db_path
        self.queue = Queue()
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.execute('PRAGMA journal_mode = WAL')
        self._start_worker()

    def _start_worker(self):
        def worker():
            while True:
                task = self.queue.get()
                if task is None:
                    break
                query, params, callback = task
                try:
                    result = self.conn.execute(query, params)
                    self.conn.commit()
                    if callback:
                        callback(result)
                except Exception as e:
                    if callback:
                        callback(e)
                finally:
                    self.queue.task_done()

        self.thread = Thread(target=worker, daemon=True)
        self.thread.start()

    def execute_async(self, query, params=(), callback=None):
        """异步执行写操作"""
        self.queue.put((query, params, callback))

    def close(self):
        self.queue.put(None)
        self.thread.join()
        self.conn.close()

19.3 何时不用 SQLite

19.3.1 不适合的场景

场景原因推荐方案
高并发写入(>100 QPS)写锁粒度为整个数据库PostgreSQL、MySQL
多服务器共享数据无法通过网络访问PostgreSQL、MySQL
需要用户权限管理无内置用户系统PostgreSQL
超大规模数据(>TB)单文件存储限制分布式数据库
复杂分析查询缺少高级优化器ClickHouse、DuckDB
全文搜索(高并发)FTS5 不适合高并发Elasticsearch
实时数据流无流式处理Redis、Kafka

19.3.2 适合的场景

场景理由
嵌入式/移动应用零依赖、零配置
桌面应用本地存储单一文件、便携
单用户 Web 应用中小流量足够
开发/测试环境无需安装数据库服务
数据分析探索直接查询文件
缓存层替代 Redis 作为本地缓存
日志/审计写入频率可控
配置存储结构化配置数据
IoT/边缘计算资源受限环境

19.3.3 决策流程图

需要数据库
    │
    ├── 多服务器共享数据? ── 是 ──→ PostgreSQL/MySQL
    │
    ├── 并发写入 > 100 QPS? ── 是 ──→ PostgreSQL/MySQL
    │
    ├── 数据量 > 100GB? ── 是 ──→ 分布式数据库
    │
    ├── 需要网络访问? ── 是 ──→ PostgreSQL/MySQL
    │
    ├── 需要用户权限管理? ── 是 ──→ PostgreSQL/MySQL
    │
    └── 否 ──→ SQLite ✅

19.4 错误处理

19.4.1 常见错误

错误码含义处理方式
SQLITE_BUSY数据库被锁定重试或增加 busy_timeout
SQLITE_CONSTRAINT约束违反检查数据或使用 ON CONFLICT
SQLITE_CORRUPT数据库损坏从备份恢复
SQLITE_FULL磁盘空间不足清理空间或 VACUUM
SQLITE_READONLY只读数据库检查文件权限
SQLITE_IOERRI/O 错误检查文件系统
import sqlite3

def safe_execute(db_path, query, params=()):
    try:
        conn = sqlite3.connect(db_path, timeout=10)
        conn.execute('PRAGMA busy_timeout = 10000')
        result = conn.execute(query, params)
        conn.commit()
        return result
    except sqlite3.OperationalError as e:
        if 'database is locked' in str(e):
            print('数据库繁忙,请稍后重试')
        elif 'database disk image is malformed' in str(e):
            print('数据库损坏,需要从备份恢复')
        elif 'disk I/O error' in str(e):
            print('磁盘 I/O 错误')
        else:
            raise
    except sqlite3.IntegrityError as e:
        if 'UNIQUE constraint failed' in str(e):
            print('数据已存在')
        elif 'FOREIGN KEY constraint failed' in str(e):
            print('引用的数据不存在')
        else:
            raise
    finally:
        conn.close()

19.5 测试策略

19.5.1 使用内存数据库测试

import sqlite3
import pytest

@pytest.fixture
def db():
    """每个测试使用独立的内存数据库"""
    conn = sqlite3.connect(':memory:')
    conn.execute('PRAGMA foreign_keys = ON')
    conn.executescript('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        );
        CREATE TABLE orders (
            id INTEGER PRIMARY KEY,
            user_id INTEGER NOT NULL,
            amount REAL NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users(id)
        );
    ''')
    yield conn
    conn.close()

def test_create_user(db):
    db.execute("INSERT INTO users (name, email) VALUES ('张三', 'zs@example.com')")
    user = db.execute("SELECT * FROM users WHERE email = 'zs@example.com'").fetchone()
    assert user[1] == '张三'

def test_unique_email(db):
    db.execute("INSERT INTO users (name, email) VALUES ('张三', 'zs@example.com')")
    with pytest.raises(sqlite3.IntegrityError):
        db.execute("INSERT INTO users (name, email) VALUES ('李四', 'zs@example.com')")

19.6 代码规范

19.6.1 SQL 命名规范

元素规范示例
表名小写复数,下划线分隔users, order_items
列名小写,下划线分隔created_at, user_id
索引idx_表名_列名idx_users_email
主键idid INTEGER PRIMARY KEY
外键表名_id(单数)user_id, order_id
时间字段动作_atcreated_at, updated_at
布尔字段is_形容词is_active, is_deleted

19.6.2 连接管理规范

# ✅ 使用上下文管理器
with sqlite3.connect('mydb.db') as conn:
    conn.execute(...)

# ✅ 使用 try/finally
conn = sqlite3.connect('mydb.db')
try:
    conn.execute(...)
    conn.commit()
finally:
    conn.close()

# ❌ 不要忘记关闭连接
conn = sqlite3.connect('mydb.db')
conn.execute(...)  # 如果这里抛异常,连接不会关闭
conn.close()

19.6.3 PRAGMA 设置规范

def get_connection(db_path):
    """标准连接初始化"""
    conn = sqlite3.connect(db_path, timeout=10)
    conn.row_factory = sqlite3.Row

    # 标准 PRAGMA 设置
    pragmas = {
        'journal_mode': 'WAL',
        'foreign_keys': 'ON',
        'busy_timeout': '10000',
        'synchronous': 'NORMAL',
        'temp_store': 'MEMORY',
        'cache_size': '-8000',  # 8MB
    }

    for key, value in pragmas.items():
        conn.execute(f'PRAGMA {key} = {value}')

    return conn

19.7 部署检查清单

#检查项说明
1WAL 模式已开启PRAGMA journal_mode = WAL
2外键已开启PRAGMA foreign_keys = ON
3busy_timeout 已设置PRAGMA busy_timeout = 5000
4数据库文件权限正确chmod 600 mydb.db
5备份脚本已配置定时备份 + 验证
6磁盘空间充足VACUUM 需要 2 倍空间
7索引已创建检查热查询的索引
8ANALYZE 已执行PRAGMA analyze
9数据完整性检查PRAGMA integrity_check
10日志模式正确WAL 文件正常

⚠️ 注意事项

  1. 不要在循环中打开关闭连接——连接建立有开销
  2. 不要忘记 PRAGMA foreign_keys = ON——默认是关闭的
  3. 不要将数据库放在 NFS/SMB 上——文件锁不可靠
  4. 不要在事务中执行长时间操作——会阻塞其他写操作
  5. 不要使用 SELECT *——只查询需要的列
  6. 不要忽略 SQLITE_BUSY 错误——需要合理的重试机制

💡 技巧

  1. INTEGER PRIMARY KEY 是最佳主键选择——性能最优
  2. 外键列一定要创建索引——加速 JOIN 和约束检查
  3. 使用 CHECK 约束而非应用层验证——数据完整性更可靠
  4. 使用 WITHOUT ROWID 优化复合主键的小型表
  5. 定期 PRAGMA optimize 保持优化器统计信息最新

📌 业务场景

场景一:项目启动规范

# 项目初始化时的标准设置
def init_database(db_path):
    conn = sqlite3.connect(db_path)
    conn.executescript('''
        PRAGMA journal_mode = WAL;
        PRAGMA foreign_keys = ON;
        PRAGMA busy_timeout = 5000;
        PRAGMA synchronous = NORMAL;
    ''')
    conn.executescript('''
        CREATE TABLE IF NOT EXISTS migrations (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            applied_at TEXT DEFAULT (datetime('now'))
        );
    ''')
    conn.commit()
    return conn

场景二:数据库迁移管理

class Migration:
    def __init__(self, conn):
        self.conn = conn
        self._ensure_migrations_table()

    def _ensure_migrations_table(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version INTEGER PRIMARY KEY,
                applied_at TEXT DEFAULT (datetime('now'))
            )
        ''')

    def applied(self, version):
        row = self.conn.execute(
            'SELECT 1 FROM schema_migrations WHERE version = ?', (version,)
        ).fetchone()
        return row is not None

    def apply(self, version, sql):
        if self.applied(version):
            return
        self.conn.executescript(sql)
        self.conn.execute('INSERT INTO schema_migrations (version) VALUES (?)', (version,))
        self.conn.commit()

🔗 扩展阅读


📖 下一章20 - 实战场景 —— 嵌入式应用、本地缓存、单用户应用、日志