强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

SQLite 完全指南 / 20 - 实战场景

20 - 实战场景:嵌入式应用、本地缓存、单用户应用与日志

20.1 嵌入式应用

20.1.1 IoT 传感器数据采集

-- IoT 设备传感器数据存储
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;

CREATE TABLE sensor_readings (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    device_id TEXT NOT NULL,
    sensor_type TEXT NOT NULL CHECK(sensor_type IN ('temperature', 'humidity', 'pressure', 'light')),
    value REAL NOT NULL,
    unit TEXT NOT NULL,
    recorded_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);

-- 按设备和时间查询的索引
CREATE INDEX idx_sensor_device_time ON sensor_readings(device_id, recorded_at);

-- 按传感器类型和时间查询的索引
CREATE INDEX idx_sensor_type_time ON sensor_readings(sensor_type, recorded_at);

-- 批量插入传感器数据
BEGIN;
INSERT INTO sensor_readings (device_id, sensor_type, value, unit) VALUES
    ('device_001', 'temperature', 25.6, '°C'),
    ('device_001', 'humidity', 62.3, '%'),
    ('device_001', 'pressure', 1013.25, 'hPa'),
    ('device_002', 'temperature', 23.1, '°C'),
    ('device_002', 'humidity', 55.8, '%');
COMMIT;

-- 查询最近 24 小时的温度数据
SELECT device_id, value, recorded_at
FROM sensor_readings
WHERE sensor_type = 'temperature'
AND recorded_at > datetime('now', '-24 hours')
ORDER BY recorded_at DESC;

-- 按设备统计平均值
SELECT device_id, sensor_type,
       ROUND(AVG(value), 2) AS avg_value,
       MIN(value) AS min_value,
       MAX(value) AS max_value,
       COUNT(*) AS reading_count
FROM sensor_readings
WHERE recorded_at > datetime('now', '-7 days')
GROUP BY device_id, sensor_type;

-- 按小时聚合
SELECT
    strftime('%Y-%m-%d %H:00', recorded_at) AS hour,
    sensor_type,
    ROUND(AVG(value), 2) AS avg_value
FROM sensor_readings
WHERE device_id = 'device_001'
AND recorded_at > datetime('now', '-24 hours')
GROUP BY hour, sensor_type
ORDER BY hour;

20.1.2 数据过期清理

-- 创建数据保留策略:保留最近 30 天的数据
CREATE TRIGGER cleanup_old_readings AFTER INSERT ON sensor_readings
BEGIN
    DELETE FROM sensor_readings
    WHERE recorded_at < datetime('now', '-30 days')
    AND id IN (SELECT id FROM sensor_readings ORDER BY id LIMIT 100);
END;

-- 或定期清理脚本
DELETE FROM sensor_readings WHERE recorded_at < datetime('now', '-30 days');
VACUUM;

20.1.3 设备配置管理

CREATE TABLE devices (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    firmware_version TEXT,
    config TEXT DEFAULT '{}',  -- JSON 配置
    last_seen TEXT,
    is_active INTEGER DEFAULT 1
);

-- 更新设备配置
UPDATE devices
SET config = json_set(config, '$.sample_rate', 5000, '$.threshold', 30.0)
WHERE id = 'device_001';

-- 查询设备配置
SELECT id, json_extract(config, '$.sample_rate') AS sample_rate,
       json_extract(config, '$.threshold') AS threshold
FROM devices
WHERE is_active = 1;

20.2 本地缓存系统

20.2.1 通用缓存方案

-- 本地缓存表
CREATE TABLE cache (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    content_type TEXT DEFAULT 'application/json',
    etag TEXT,
    expires_at TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT (datetime('now')),
    accessed_at TEXT NOT NULL DEFAULT (datetime('now')),
    access_count INTEGER DEFAULT 0
);

-- 缓存索引
CREATE INDEX idx_cache_expires ON cache(expires_at);

-- 设置缓存
INSERT OR REPLACE INTO cache (key, value, content_type, etag, expires_at)
VALUES (
    'user:profile:42',
    '{"name":"张三","email":"zs@example.com"}',
    'application/json',
    '"abc123"',
    datetime('now', '+1 hour')
);

-- 获取缓存
SELECT value, etag FROM cache
WHERE key = 'user:profile:42'
AND expires_at > datetime('now');

-- 更新访问信息
UPDATE cache SET
    accessed_at = datetime('now'),
    access_count = access_count + 1
WHERE key = 'user:profile:42';

-- 清理过期缓存
DELETE FROM cache WHERE expires_at < datetime('now');

-- LRU 淘汰(空间不足时删除最久未访问的)
DELETE FROM cache WHERE key IN (
    SELECT key FROM cache ORDER BY accessed_at ASC LIMIT 100
);

20.2.2 HTTP 响应缓存

import sqlite3
import json
import hashlib
from datetime import datetime, timedelta

class SQLiteCache:
    def __init__(self, db_path='cache.db'):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.conn.executescript('''
            PRAGMA journal_mode = WAL;
            CREATE TABLE IF NOT EXISTS http_cache (
                url_hash TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                method TEXT NOT NULL DEFAULT 'GET',
                status_code INTEGER,
                headers TEXT,
                body TEXT,
                etag TEXT,
                expires_at TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            );
            CREATE INDEX IF NOT EXISTS idx_cache_expires ON http_cache(expires_at);
        ''')

    def _hash_url(self, url):
        return hashlib.sha256(url.encode()).hexdigest()

    def get(self, url):
        url_hash = self._hash_url(url)
        row = self.conn.execute(
            '''SELECT status_code, headers, body, etag FROM http_cache
               WHERE url_hash = ? AND (expires_at IS NULL OR expires_at > datetime('now'))''',
            (url_hash,)
        ).fetchone()
        if row:
            return {
                'status_code': row[0],
                'headers': json.loads(row[1]) if row[1] else {},
                'body': row[2],
                'etag': row[3]
            }
        return None

    def put(self, url, status_code, headers, body, ttl_seconds=3600):
        url_hash = self._hash_url(url)
        expires_at = (datetime.now() + timedelta(seconds=ttl_seconds)).isoformat()
        etag = headers.get('etag')
        self.conn.execute(
            '''INSERT OR REPLACE INTO http_cache
               (url_hash, url, status_code, headers, body, etag, expires_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)''',
            (url_hash, url, status_code, json.dumps(headers), body, etag, expires_at)
        )
        self.conn.commit()

    def invalidate(self, url):
        url_hash = self._hash_url(url)
        self.conn.execute('DELETE FROM http_cache WHERE url_hash = ?', (url_hash,))
        self.conn.commit()

    def clear_expired(self):
        self.conn.execute("DELETE FROM http_cache WHERE expires_at < datetime('now')")
        self.conn.commit()

# 使用示例
cache = SQLiteCache()
cached = cache.get('https://api.example.com/users/42')
if cached:
    print('缓存命中:', cached['body'])
else:
    # 发起 HTTP 请求...
    cache.put('https://api.example.com/users/42', 200, {'etag': '"v1"'}, '{"name":"张三"}')

20.2.3 会话缓存

CREATE TABLE sessions (
    token TEXT PRIMARY KEY,
    user_id INTEGER NOT NULL,
    data TEXT DEFAULT '{}',
    ip_address TEXT,
    user_agent TEXT,
    created_at TEXT DEFAULT (datetime('now')),
    expires_at TEXT NOT NULL
);

CREATE INDEX idx_sessions_user ON sessions(user_id);
CREATE INDEX idx_sessions_expires ON sessions(expires_at);

-- 创建会话
INSERT INTO sessions (token, user_id, data, ip_address, expires_at)
VALUES ('abc123...', 42, '{"role":"admin"}', '192.168.1.1', datetime('now', '+24 hours'));

-- 验证会话
SELECT user_id, data FROM sessions
WHERE token = 'abc123...'
AND expires_at > datetime('now');

-- 清理过期会话
DELETE FROM sessions WHERE expires_at < datetime('now');

20.3 单用户桌面应用

20.3.1 笔记应用

PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

-- 笔记本
CREATE TABLE notebooks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    description TEXT,
    color TEXT DEFAULT '#1976D2',
    sort_order INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now', 'localtime')),
    updated_at TEXT DEFAULT (datetime('now', 'localtime'))
);

-- 标签
CREATE TABLE tags (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    color TEXT DEFAULT '#757575'
);

-- 笔记
CREATE TABLE notes (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    notebook_id INTEGER NOT NULL,
    title TEXT NOT NULL,
    content TEXT DEFAULT '',
    is_pinned INTEGER DEFAULT 0,
    is_trashed INTEGER DEFAULT 0,
    created_at TEXT DEFAULT (datetime('now', 'localtime')),
    updated_at TEXT DEFAULT (datetime('now', 'localtime')),
    FOREIGN KEY (notebook_id) REFERENCES notebooks(id) ON DELETE CASCADE
);

-- 笔记-标签关联
CREATE TABLE note_tags (
    note_id INTEGER NOT NULL,
    tag_id INTEGER NOT NULL,
    PRIMARY KEY (note_id, tag_id),
    FOREIGN KEY (note_id) REFERENCES notes(id) ON DELETE CASCADE,
    FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);

-- 笔记 FTS 索引
CREATE VIRTUAL TABLE notes_fts USING fts5(
    title, content,
    content='notes', content_rowid='id'
);

CREATE TRIGGER notes_ai AFTER INSERT ON notes BEGIN
    INSERT INTO notes_fts(rowid, title, content) VALUES (new.id, new.title, new.content);
END;
CREATE TRIGGER notes_ad AFTER DELETE ON notes BEGIN
    INSERT INTO notes_fts(notes_fts, rowid, title, content) VALUES ('delete', old.id, old.title, old.content);
END;
CREATE TRIGGER notes_au AFTER UPDATE ON notes BEGIN
    INSERT INTO notes_fts(notes_fts, rowid, title, content) VALUES ('delete', old.id, old.title, old.content);
    INSERT INTO notes_fts(rowid, title, content) VALUES (new.id, new.title, new.content);
END;

-- 插入示例数据
INSERT INTO notebooks (name) VALUES ('工作'), ('学习'), ('生活');
INSERT INTO tags (name, color) VALUES ('重要', '#F44336'), ('待办', '#FF9800'), ('参考', '#4CAF50');

INSERT INTO notes (notebook_id, title, content) VALUES
    (1, '周报模板', '## 本周工作总结\n\n## 下周计划\n\n## 需要协调的问题\n'),
    (1, '会议记录 2026-05-09', '参会人员:张三、李四\n议题:Q2 目标复盘\n'),
    (2, 'SQLite 学习笔记', '## WAL 模式\n- 读写并发\n- 需要 checkpoint\n'),
    (3, '购物清单', '- 牛奶\n- 面包\n- 鸡蛋\n');

-- 全文搜索笔记
SELECT n.id, n.title,
       highlight(notes_fts, 1, '<b>', '</b>') AS content_highlight,
       nb.name AS notebook
FROM notes_fts
JOIN notes n ON notes_fts.rowid = n.id
JOIN notebooks nb ON n.notebook_id = nb.id
WHERE notes_fts MATCH 'SQLite'
AND n.is_trashed = 0
ORDER BY rank;

-- 按标签查询
SELECT n.title, GROUP_CONCAT(t.name) AS tags
FROM notes n
JOIN note_tags nt ON n.id = nt.note_id
JOIN tags t ON nt.tag_id = t.id
WHERE n.is_trashed = 0
GROUP BY n.id;

-- 统计
SELECT
    nb.name AS notebook,
    COUNT(n.id) AS note_count,
    MAX(n.updated_at) AS last_updated
FROM notebooks nb
LEFT JOIN notes n ON nb.id = n.notebook_id AND n.is_trashed = 0
GROUP BY nb.id;

20.3.2 财务记账

CREATE TABLE accounts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    type TEXT NOT NULL CHECK(type IN ('cash', 'bank', 'credit', 'alipay', 'wechat')),
    balance_cents INTEGER NOT NULL DEFAULT 0,
    currency TEXT DEFAULT 'CNY',
    icon TEXT,
    is_active INTEGER DEFAULT 1
);

CREATE TABLE categories (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    type TEXT NOT NULL CHECK(type IN ('income', 'expense')),
    icon TEXT,
    parent_id INTEGER,
    FOREIGN KEY (parent_id) REFERENCES categories(id)
);

CREATE TABLE transactions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    account_id INTEGER NOT NULL,
    category_id INTEGER NOT NULL,
    type TEXT NOT NULL CHECK(type IN ('income', 'expense', 'transfer')),
    amount_cents INTEGER NOT NULL,
    description TEXT,
    transaction_date TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now', 'localtime')),
    FOREIGN KEY (account_id) REFERENCES accounts(id),
    FOREIGN KEY (category_id) REFERENCES categories(id)
);

-- 转账记录
CREATE TABLE transfers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    from_transaction_id INTEGER NOT NULL,
    to_transaction_id INTEGER NOT NULL,
    FOREIGN KEY (from_transaction_id) REFERENCES transactions(id),
    FOREIGN KEY (to_transaction_id) REFERENCES transactions(id)
);

-- 索引
CREATE INDEX idx_txn_date ON transactions(transaction_date);
CREATE INDEX idx_txn_account ON transactions(account_id, transaction_date);
CREATE INDEX idx_txn_category ON transactions(category_id);

-- 月度统计
SELECT
    strftime('%Y-%m', transaction_date) AS month,
    type,
    SUM(amount_cents) / 100.0 AS total
FROM transactions
WHERE transaction_date >= '2026-01-01'
GROUP BY month, type
ORDER BY month;

-- 分类统计
SELECT
    c.name AS category,
    SUM(t.amount_cents) / 100.0 AS total,
    COUNT(*) AS count
FROM transactions t
JOIN categories c ON t.category_id = c.id
WHERE t.type = 'expense'
AND t.transaction_date >= '2026-05-01'
GROUP BY c.id
ORDER BY total DESC;

20.4 日志系统

20.4.1 结构化日志存储

PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;

CREATE TABLE logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    level TEXT NOT NULL CHECK(level IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')),
    source TEXT NOT NULL,
    message TEXT NOT NULL,
    context TEXT,  -- JSON 格式的上下文数据
    request_id TEXT,
    user_id INTEGER,
    created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);

-- 索引
CREATE INDEX idx_logs_level_time ON logs(level, created_at);
CREATE INDEX idx_logs_source_time ON logs(source, created_at);
CREATE INDEX idx_logs_request ON logs(request_id);
CREATE INDEX idx_logs_user ON logs(user_id);

-- 部分索引:只为错误日志创建详细索引
CREATE INDEX idx_logs_error_detail ON logs(source, created_at, message)
WHERE level IN ('ERROR', 'FATAL');

-- 批量插入日志
BEGIN;
INSERT INTO logs (level, source, message, context, request_id) VALUES
    ('INFO', 'auth', '用户登录成功', '{"ip":"192.168.1.1","user_agent":"Mozilla/5.0"}', 'req_001'),
    ('INFO', 'api', '请求处理完成', '{"method":"GET","path":"/users","duration_ms":45}', 'req_001'),
    ('WARN', 'db', '慢查询检测', '{"query":"SELECT * FROM orders","duration_ms":1500}', 'req_002'),
    ('ERROR', 'payment', '支付失败', '{"order_id":12345,"error":"timeout"}', 'req_003');
COMMIT;

-- 查询错误日志
SELECT * FROM logs
WHERE level IN ('ERROR', 'FATAL')
AND created_at > datetime('now', '-24 hours')
ORDER BY created_at DESC;

-- 按来源统计
SELECT source, level, COUNT(*) AS count
FROM logs
WHERE created_at > datetime('now', '-1 hour')
GROUP BY source, level
ORDER BY count DESC;

-- 追踪请求
SELECT * FROM logs
WHERE request_id = 'req_001'
ORDER BY created_at;

20.4.2 日志轮转与清理

-- 按大小轮转(保留最近 N 条)
DELETE FROM logs WHERE id NOT IN (
    SELECT id FROM logs ORDER BY id DESC LIMIT 1000000
);

-- 按时间轮转(保留最近 30 天)
DELETE FROM logs WHERE created_at < datetime('now', '-30 days');

-- 轮转到归档表
CREATE TABLE logs_archive AS
SELECT * FROM logs WHERE created_at < datetime('now', '-7 days');

DELETE FROM logs WHERE created_at < datetime('now', '-7 days');
VACUUM;

20.4.3 日志查询接口

import sqlite3
from datetime import datetime, timedelta

class LogStore:
    def __init__(self, db_path='logs.db'):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute('PRAGMA journal_mode = WAL')
        self.conn.execute('PRAGMA synchronous = NORMAL')

    def log(self, level, source, message, context=None, request_id=None, user_id=None):
        self.conn.execute(
            '''INSERT INTO logs (level, source, message, context, request_id, user_id)
               VALUES (?, ?, ?, ?, ?, ?)''',
            (level, source, message,
             json.dumps(context) if context else None,
             request_id, user_id)
        )
        self.conn.commit()

    def query(self, level=None, source=None, start_time=None, end_time=None,
              keyword=None, limit=100, offset=0):
        conditions = []
        params = []

        if level:
            conditions.append('level = ?')
            params.append(level)
        if source:
            conditions.append('source = ?')
            params.append(source)
        if start_time:
            conditions.append('created_at >= ?')
            params.append(start_time)
        if end_time:
            conditions.append('created_at <= ?')
            params.append(end_time)
        if keyword:
            conditions.append('message LIKE ?')
            params.append(f'%{keyword}%')

        where = 'WHERE ' + ' AND '.join(conditions) if conditions else ''
        params.extend([limit, offset])

        return self.conn.execute(
            f'SELECT * FROM logs {where} ORDER BY created_at DESC LIMIT ? OFFSET ?',
            params
        ).fetchall()

    def stats(self, hours=24):
        return self.conn.execute('''
            SELECT level, COUNT(*) as count
            FROM logs
            WHERE created_at > datetime('now', ? || ' hours')
            GROUP BY level
            ORDER BY count DESC
        ''', (f'-{hours}',)).fetchall()

    def rotate(self, keep_days=30):
        self.conn.execute(
            "DELETE FROM logs WHERE created_at < datetime('now', ? || ' days')",
            (f'-{keep_days}',)
        )
        self.conn.commit()
        self.conn.execute('VACUUM')

# 使用
logger = LogStore()
logger.log('INFO', 'app', '应用启动', {'version': '1.0.0'})
logger.log('ERROR', 'db', '连接超时', {'host': '10.0.0.1', 'timeout': 30})

errors = logger.query(level='ERROR', hours=24)
stats = logger.stats(hours=1)

20.5 数据迁移与 ETL

20.5.1 CSV 数据导入

-- 导入 CSV(需要先创建表)
CREATE TABLE sales (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    date TEXT,
    product TEXT,
    quantity INTEGER,
    price REAL,
    region TEXT
);

-- CLI 导入
-- .mode csv
-- .import sales.csv sales

-- 跳过标题行(SQLite 3.32.0+)
-- .import --skip 1 sales.csv sales

20.5.2 JSON 数据导入

import sqlite3
import json

def import_json_to_sqlite(db_path, table, json_path):
    with open(json_path) as f:
        data = json.load(f)

    conn = sqlite3.connect(db_path)

    if not data:
        return

    # 从第一条记录推断列
    columns = list(data[0].keys())
    placeholders = ','.join(['?'] * len(columns))
    col_names = ','.join(columns)

    conn.executemany(
        f'INSERT INTO {table} ({col_names}) VALUES ({placeholders})',
        [tuple(row.get(col) for col in columns) for row in data]
    )
    conn.commit()
    conn.close()

# 使用
import_json_to_sqlite('mydb.db', 'products', 'products.json')

20.6 小型 Web 应用

20.6.1 URL 短链接服务

PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;

CREATE TABLE short_urls (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    short_code TEXT NOT NULL UNIQUE,
    original_url TEXT NOT NULL,
    created_at TEXT DEFAULT (datetime('now')),
    expires_at TEXT,
    click_count INTEGER DEFAULT 0,
    creator_ip TEXT
);

CREATE INDEX idx_short_code ON short_urls(short_code);

CREATE TABLE click_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    short_url_id INTEGER NOT NULL,
    ip_address TEXT,
    user_agent TEXT,
    referer TEXT,
    clicked_at TEXT DEFAULT (datetime('now')),
    FOREIGN KEY (short_url_id) REFERENCES short_urls(id) ON DELETE CASCADE
);

-- 生成短链接
INSERT INTO short_urls (short_code, original_url, expires_at)
VALUES ('abc123', 'https://example.com/very/long/url/here', datetime('now', '+30 days'));

-- 重定向查询
UPDATE short_urls SET click_count = click_count + 1 WHERE short_code = 'abc123';
SELECT original_url FROM short_urls
WHERE short_code = 'abc123'
AND (expires_at IS NULL OR expires_at > datetime('now'));

-- 统计点击
INSERT INTO click_logs (short_url_id, ip_address, user_agent, referer)
VALUES (1, '192.168.1.1', 'Mozilla/5.0', 'https://twitter.com');

-- 点击统计
SELECT DATE(clicked_at) AS date, COUNT(*) AS clicks
FROM click_logs
WHERE short_url_id = 1
GROUP BY date
ORDER BY date DESC;

20.6.2 任务队列

CREATE TABLE task_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_type TEXT NOT NULL,
    payload TEXT NOT NULL,  -- JSON
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
    priority INTEGER DEFAULT 0,
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3,
    error_message TEXT,
    created_at TEXT DEFAULT (datetime('now')),
    started_at TEXT,
    completed_at TEXT
);

CREATE INDEX idx_queue_pending ON task_queue(priority DESC, created_at)
WHERE status = 'pending';

-- 提交任务
INSERT INTO task_queue (task_type, payload, priority) VALUES
    ('send_email', '{"to":"user@example.com","subject":"Welcome"}', 1),
    ('generate_report', '{"type":"monthly","month":"2026-05"}', 0);

-- 获取下一个任务(原子操作)
UPDATE task_queue
SET status = 'processing', started_at = datetime('now')
WHERE id = (
    SELECT id FROM task_queue
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
)
RETURNING *;

-- 完成任务
UPDATE task_queue SET status = 'completed', completed_at = datetime('now') WHERE id = 1;

-- 任务失败
UPDATE task_queue SET
    status = CASE WHEN retry_count < max_retries THEN 'pending' ELSE 'failed' END,
    retry_count = retry_count + 1,
    error_message = 'Connection timeout'
WHERE id = 1;

20.7 测试数据生成

-- 生成测试数据
CREATE TABLE test_users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    email TEXT NOT NULL,
    age INTEGER,
    city TEXT,
    created_at TEXT DEFAULT (datetime('now', '-' || ABS(RANDOM()) % 365 || ' days'))
);

INSERT INTO test_users (name, email, age, city)
SELECT
    '用户_' || id,
    'user_' || id || '@example.com',
    18 + ABS(RANDOM()) % 60,
    CASE ABS(RANDOM()) % 5
        WHEN 0 THEN '北京' WHEN 1 THEN '上海' WHEN 2 THEN '广州'
        WHEN 3 THEN '深圳' ELSE '杭州'
    END
FROM (
    WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt WHERE x < 10000)
    SELECT x AS id FROM cnt
);

-- 验证数据
SELECT city, COUNT(*), AVG(age) FROM test_users GROUP BY city;

⚠️ 注意事项

  1. 日志表需要定期清理——否则会无限增长
  2. 缓存需要设置过期时间——防止数据陈旧
  3. 任务队列需要错误处理——避免任务卡在 processing 状态
  4. 嵌入式应用注意磁盘空间——IoT 设备存储有限
  5. 桌面应用需要数据备份——提示用户定期备份
  6. Web 应用不要用 SQLite 存储会话——重启会丢失

💡 技巧

  1. 使用 JSON 存储灵活数据——避免频繁修改表结构
  2. 使用 FTS5 实现全文搜索——比 LIKE 快得多
  3. 使用触发器维护缓存一致性——自动同步相关数据
  4. 使用 WAL 模式——提升读写并发性能
  5. 使用 VACUUM INTO 备份——在线、原子操作

📌 业务场景总结

场景关键特性性能要求推荐配置
IoT 数据采集批量写入、时序查询中等WAL + NORMAL
本地缓存快速读写、过期清理WAL + 大缓存
桌面应用复杂查询、FTS中等WAL + foreign_keys
日志系统大量写入、定期清理WAL + OFF(可选)
Web 后端并发读、低并发写WAL + NORMAL + 连接池

🔗 扩展阅读


🎉 恭喜你完成了 SQLite 完全指南的全部 20 章!

回到目录:SQLite 完全指南