SQLite Complete Guide / 20 - Practical Scenarios
20 - Practical Scenarios: Embedded Applications, Local Caching, Single-User Apps, and Logging
20.1 Embedded Applications
20.1.1 IoT Sensor Data Collection
-- Sensor data storage for an IoT device
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE sensor_readings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
sensor_type TEXT NOT NULL CHECK(sensor_type IN ('temperature', 'humidity', 'pressure', 'light')),
value REAL NOT NULL,
unit TEXT NOT NULL,
    recorded_at TEXT NOT NULL DEFAULT (datetime('now'))  -- store UTC so comparisons with datetime('now') are consistent
);
-- Index for queries by device and time
CREATE INDEX idx_sensor_device_time ON sensor_readings(device_id, recorded_at);
-- Index for queries by sensor type and time
CREATE INDEX idx_sensor_type_time ON sensor_readings(sensor_type, recorded_at);
-- Bulk-insert sensor readings in one transaction
BEGIN;
INSERT INTO sensor_readings (device_id, sensor_type, value, unit) VALUES
('device_001', 'temperature', 25.6, '°C'),
('device_001', 'humidity', 62.3, '%'),
('device_001', 'pressure', 1013.25, 'hPa'),
('device_002', 'temperature', 23.1, '°C'),
('device_002', 'humidity', 55.8, '%');
COMMIT;
-- Temperature readings from the last 24 hours
SELECT device_id, value, recorded_at
FROM sensor_readings
WHERE sensor_type = 'temperature'
AND recorded_at > datetime('now', '-24 hours')
ORDER BY recorded_at DESC;
-- Per-device statistics for the last 7 days
SELECT device_id, sensor_type,
ROUND(AVG(value), 2) AS avg_value,
MIN(value) AS min_value,
MAX(value) AS max_value,
COUNT(*) AS reading_count
FROM sensor_readings
WHERE recorded_at > datetime('now', '-7 days')
GROUP BY device_id, sensor_type;
-- Hourly aggregation
SELECT
strftime('%Y-%m-%d %H:00', recorded_at) AS hour,
sensor_type,
ROUND(AVG(value), 2) AS avg_value
FROM sensor_readings
WHERE device_id = 'device_001'
AND recorded_at > datetime('now', '-24 hours')
GROUP BY hour, sensor_type
ORDER BY hour;
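On a constrained device the dominant cost is the per-transaction sync, so readings should be buffered and written in batches as in the SQL above. Below is a minimal Python sketch of such a buffered writer against the sensor_readings table; the class name, the batch size of 100, and the sensors.db filename are illustrative assumptions.
import sqlite3
class SensorWriter:
    """Buffer readings in memory and flush them in one transaction."""
    def __init__(self, db_path='sensors.db', batch_size=100):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute('PRAGMA journal_mode = WAL')
        self.conn.execute('PRAGMA synchronous = NORMAL')
        self.batch_size = batch_size
        self.buffer = []
    def add(self, device_id, sensor_type, value, unit):
        self.buffer.append((device_id, sensor_type, value, unit))
        if len(self.buffer) >= self.batch_size:
            self.flush()
    def flush(self):
        if not self.buffer:
            return
        with self.conn:  # one transaction per batch
            self.conn.executemany(
                'INSERT INTO sensor_readings (device_id, sensor_type, value, unit) '
                'VALUES (?, ?, ?, ?)',
                self.buffer)
        self.buffer.clear()
writer = SensorWriter()
writer.add('device_001', 'temperature', 25.6, '°C')
writer.flush()  # flush whatever is left, e.g. on shutdown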
20.1.2 Data Expiration and Cleanup
-- Retention policy: keep only the last 30 days of data.
-- The subquery below caps how many rows each insert may delete, keeping writes fast.
CREATE TRIGGER cleanup_old_readings AFTER INSERT ON sensor_readings
BEGIN
DELETE FROM sensor_readings
WHERE recorded_at < datetime('now', '-30 days')
AND id IN (SELECT id FROM sensor_readings ORDER BY id LIMIT 100);
END;
-- Or run a periodic cleanup job instead
DELETE FROM sensor_readings WHERE recorded_at < datetime('now', '-30 days');
VACUUM;
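On a slow SD card a single DELETE covering months of data can hold the write lock for seconds. A chunked cleanup keeps each transaction short; here is a minimal Python sketch (the 5,000-row chunk size is an arbitrary assumption to tune per device).
import sqlite3
def cleanup_old_readings(db_path='sensors.db', keep_days=30, chunk=5000):
    """Delete expired rows in small batches so other writers are never blocked for long."""
    conn = sqlite3.connect(db_path)
    while True:
        with conn:  # one short transaction per chunk
            cur = conn.execute(
                'DELETE FROM sensor_readings WHERE id IN ('
                ' SELECT id FROM sensor_readings'
                " WHERE recorded_at < datetime('now', ?) LIMIT ?)",
                (f'-{keep_days} days', chunk))
        if cur.rowcount < chunk:
            break
    conn.close()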
20.1.3 Device Configuration Management
CREATE TABLE devices (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
firmware_version TEXT,
    config TEXT DEFAULT '{}',            -- JSON configuration
last_seen TEXT,
is_active INTEGER DEFAULT 1
);
-- Update device configuration
UPDATE devices
SET config = json_set(config, '$.sample_rate', 5000, '$.threshold', 30.0)
WHERE id = 'device_001';
-- Query device configuration
SELECT id, json_extract(config, '$.sample_rate') AS sample_rate,
json_extract(config, '$.threshold') AS threshold
FROM devices
WHERE is_active = 1;
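From application code the same json_set / json_extract calls work with bound parameters, which avoids building SQL strings for every config change. A small sketch against the devices table (devices.db and the helper names are assumptions):
import sqlite3
conn = sqlite3.connect('devices.db')
def set_device_option(device_id, key, value):
    # '$.' || ? builds the JSON path from a bound parameter.
    conn.execute(
        "UPDATE devices SET config = json_set(config, '$.' || ?, ?) WHERE id = ?",
        (key, value, device_id))
    conn.commit()
def get_device_option(device_id, key):
    row = conn.execute(
        "SELECT json_extract(config, '$.' || ?) FROM devices WHERE id = ?",
        (key, device_id)).fetchone()
    return row[0] if row else None
set_device_option('device_001', 'sample_rate', 5000)
print(get_device_option('device_001', 'sample_rate'))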
20.2 Local Caching Systems
20.2.1 A General-Purpose Cache
-- Local cache table
CREATE TABLE cache (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
content_type TEXT DEFAULT 'application/json',
etag TEXT,
expires_at TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
accessed_at TEXT NOT NULL DEFAULT (datetime('now')),
access_count INTEGER DEFAULT 0
);
-- Index for expiration sweeps
CREATE INDEX idx_cache_expires ON cache(expires_at);
-- Set a cache entry
INSERT OR REPLACE INTO cache (key, value, content_type, etag, expires_at)
VALUES (
'user:profile:42',
'{"name":"张三","email":"zs@example.com"}',
'application/json',
'"abc123"',
datetime('now', '+1 hour')
);
-- Get a cache entry (only if not expired)
SELECT value, etag FROM cache
WHERE key = 'user:profile:42'
AND expires_at > datetime('now');
-- Record an access
UPDATE cache SET
accessed_at = datetime('now'),
access_count = access_count + 1
WHERE key = 'user:profile:42';
-- Purge expired entries
DELETE FROM cache WHERE expires_at < datetime('now');
-- LRU eviction: when space runs low, drop the least recently accessed entries
DELETE FROM cache WHERE key IN (
SELECT key FROM cache ORDER BY accessed_at ASC LIMIT 100
);
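One caveat with INSERT OR REPLACE: it deletes the old row and inserts a fresh one, so created_at and access_count are reset every time an entry is refreshed. If those fields should survive refreshes, an UPSERT (SQLite 3.24+) is the better tool. A sketch in Python, assuming the cache table above lives in cache.db:
import sqlite3
def cache_set(conn, key, value, ttl_seconds=3600):
    # ON CONFLICT updates the existing row in place, keeping created_at and access_count.
    conn.execute(
        "INSERT INTO cache (key, value, expires_at)"
        " VALUES (?, ?, datetime('now', '+' || ? || ' seconds'))"
        " ON CONFLICT(key) DO UPDATE SET"
        "   value = excluded.value,"
        "   expires_at = excluded.expires_at",
        (key, value, ttl_seconds))
    conn.commit()
conn = sqlite3.connect('cache.db')
cache_set(conn, 'user:profile:42', '{"name":"张三"}')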
20.2.2 HTTP Response Cache
import sqlite3
import json
import hashlib
from datetime import datetime, timedelta, timezone
class SQLiteCache:
def __init__(self, db_path='cache.db'):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
self.conn.executescript('''
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS http_cache (
url_hash TEXT PRIMARY KEY,
url TEXT NOT NULL,
method TEXT NOT NULL DEFAULT 'GET',
status_code INTEGER,
headers TEXT,
body TEXT,
etag TEXT,
expires_at TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_cache_expires ON http_cache(expires_at);
''')
def _hash_url(self, url):
return hashlib.sha256(url.encode()).hexdigest()
def get(self, url):
url_hash = self._hash_url(url)
row = self.conn.execute(
'''SELECT status_code, headers, body, etag FROM http_cache
WHERE url_hash = ? AND (expires_at IS NULL OR expires_at > datetime('now'))''',
(url_hash,)
).fetchone()
if row:
return {
'status_code': row[0],
'headers': json.loads(row[1]) if row[1] else {},
'body': row[2],
'etag': row[3]
}
return None
def put(self, url, status_code, headers, body, ttl_seconds=3600):
url_hash = self._hash_url(url)
        # Use the same format as SQLite's datetime('now') (UTC, 'YYYY-MM-DD HH:MM:SS')
        # so string comparisons against expires_at behave correctly.
        expires_at = (datetime.now(timezone.utc)
                      + timedelta(seconds=ttl_seconds)).strftime('%Y-%m-%d %H:%M:%S')
etag = headers.get('etag')
self.conn.execute(
'''INSERT OR REPLACE INTO http_cache
(url_hash, url, status_code, headers, body, etag, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)''',
(url_hash, url, status_code, json.dumps(headers), body, etag, expires_at)
)
self.conn.commit()
def invalidate(self, url):
url_hash = self._hash_url(url)
self.conn.execute('DELETE FROM http_cache WHERE url_hash = ?', (url_hash,))
self.conn.commit()
def clear_expired(self):
self.conn.execute("DELETE FROM http_cache WHERE expires_at < datetime('now')")
self.conn.commit()
# Usage example
cache = SQLiteCache()
cached = cache.get('https://api.example.com/users/42')
if cached:
    print('Cache hit:', cached['body'])
else:
    # Perform the real HTTP request here, then cache the response...
cache.put('https://api.example.com/users/42', 200, {'etag': '"v1"'}, '{"name":"张三"}')
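The stored etag only pays off once it is sent back to the server: on a 304 Not Modified response the cached body can be reused and merely re-validated. A rough sketch of that flow, using the third-party requests library (an assumption; any HTTP client works):
import requests
def fetch_with_cache(cache, url, ttl_seconds=3600):
    cached = cache.get(url)
    headers = {'If-None-Match': cached['etag']} if cached and cached['etag'] else {}
    resp = requests.get(url, headers=headers, timeout=10)
    if resp.status_code == 304 and cached:
        # Not modified: keep the cached body and extend its lifetime.
        cache.put(url, cached['status_code'], {'etag': cached['etag']},
                  cached['body'], ttl_seconds)
        return cached['body']
    # Lower-case the header names so SQLiteCache.put() can find 'etag'.
    cache.put(url, resp.status_code,
              {k.lower(): v for k, v in resp.headers.items()},
              resp.text, ttl_seconds)
    return resp.text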
20.2.3 Session Cache
CREATE TABLE sessions (
token TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
data TEXT DEFAULT '{}',
ip_address TEXT,
user_agent TEXT,
created_at TEXT DEFAULT (datetime('now')),
expires_at TEXT NOT NULL
);
CREATE INDEX idx_sessions_user ON sessions(user_id);
CREATE INDEX idx_sessions_expires ON sessions(expires_at);
-- Create a session
INSERT INTO sessions (token, user_id, data, ip_address, expires_at)
VALUES ('abc123...', 42, '{"role":"admin"}', '192.168.1.1', datetime('now', '+24 hours'));
-- Validate a session
SELECT user_id, data FROM sessions
WHERE token = 'abc123...'
AND expires_at > datetime('now');
-- Purge expired sessions
DELETE FROM sessions WHERE expires_at < datetime('now');
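The 'abc123...' token above is a placeholder; a real session token should be a high-entropy random string. A minimal sketch of creating and validating sessions against this table (secrets.token_urlsafe, app.db, and the 24-hour lifetime are illustrative choices):
import json
import secrets
import sqlite3
def create_session(conn, user_id, data=None, ip=None, hours=24):
    token = secrets.token_urlsafe(32)  # ~256 bits of randomness
    conn.execute(
        "INSERT INTO sessions (token, user_id, data, ip_address, expires_at)"
        " VALUES (?, ?, ?, ?, datetime('now', '+' || ? || ' hours'))",
        (token, user_id, json.dumps(data or {}), ip, hours))
    conn.commit()
    return token
def get_session(conn, token):
    row = conn.execute(
        'SELECT user_id, data FROM sessions'
        " WHERE token = ? AND expires_at > datetime('now')",
        (token,)).fetchone()
    return {'user_id': row[0], 'data': json.loads(row[1])} if row else None
conn = sqlite3.connect('app.db')
token = create_session(conn, 42, {'role': 'admin'}, ip='192.168.1.1')
print(get_session(conn, token))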
20.3 Single-User Desktop Applications
20.3.1 Note-Taking App
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
-- Notebooks
CREATE TABLE notebooks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
color TEXT DEFAULT '#1976D2',
sort_order INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
updated_at TEXT DEFAULT (datetime('now', 'localtime'))
);
-- Tags
CREATE TABLE tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
color TEXT DEFAULT '#757575'
);
-- Notes
CREATE TABLE notes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
notebook_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT DEFAULT '',
is_pinned INTEGER DEFAULT 0,
is_trashed INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
updated_at TEXT DEFAULT (datetime('now', 'localtime')),
FOREIGN KEY (notebook_id) REFERENCES notebooks(id) ON DELETE CASCADE
);
-- Note-tag associations
CREATE TABLE note_tags (
note_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (note_id, tag_id),
FOREIGN KEY (note_id) REFERENCES notes(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
-- FTS index for notes (external-content FTS5 table)
CREATE VIRTUAL TABLE notes_fts USING fts5(
title, content,
content='notes', content_rowid='id'
);
CREATE TRIGGER notes_ai AFTER INSERT ON notes BEGIN
INSERT INTO notes_fts(rowid, title, content) VALUES (new.id, new.title, new.content);
END;
CREATE TRIGGER notes_ad AFTER DELETE ON notes BEGIN
INSERT INTO notes_fts(notes_fts, rowid, title, content) VALUES ('delete', old.id, old.title, old.content);
END;
CREATE TRIGGER notes_au AFTER UPDATE ON notes BEGIN
INSERT INTO notes_fts(notes_fts, rowid, title, content) VALUES ('delete', old.id, old.title, old.content);
INSERT INTO notes_fts(rowid, title, content) VALUES (new.id, new.title, new.content);
END;
-- Sample data
INSERT INTO notebooks (name) VALUES ('工作'), ('学习'), ('生活');
INSERT INTO tags (name, color) VALUES ('重要', '#F44336'), ('待办', '#FF9800'), ('参考', '#4CAF50');
INSERT INTO notes (notebook_id, title, content) VALUES
(1, '周报模板', '## 本周工作总结\n\n## 下周计划\n\n## 需要协调的问题\n'),
(1, '会议记录 2026-05-09', '参会人员:张三、李四\n议题:Q2 目标复盘\n'),
(2, 'SQLite 学习笔记', '## WAL 模式\n- 读写并发\n- 需要 checkpoint\n'),
(3, '购物清单', '- 牛奶\n- 面包\n- 鸡蛋\n');
-- Full-text search over notes
SELECT n.id, n.title,
highlight(notes_fts, 1, '<b>', '</b>') AS content_highlight,
nb.name AS notebook
FROM notes_fts
JOIN notes n ON notes_fts.rowid = n.id
JOIN notebooks nb ON n.notebook_id = nb.id
WHERE notes_fts MATCH 'SQLite'
AND n.is_trashed = 0
ORDER BY rank;
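If the search string comes from user input, remember that MATCH has its own query syntax: stray quotes or operators raise errors. A small sketch that treats the input as plain words by quoting each token (an assumption; drop the quoting if you want to expose the full FTS5 syntax to users):
import sqlite3
def search_notes(conn, user_query, limit=20):
    # Double up embedded quotes and wrap each token in quotes so FTS5
    # sees literal terms instead of query operators.
    match = ' '.join('"%s"' % t.replace('"', '""') for t in user_query.split())
    if not match:
        return []
    return conn.execute(
        'SELECT n.id, n.title'
        ' FROM notes_fts JOIN notes n ON notes_fts.rowid = n.id'
        ' WHERE notes_fts MATCH ? AND n.is_trashed = 0'
        ' ORDER BY rank LIMIT ?',
        (match, limit)).fetchall()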
-- Query notes with their tags
SELECT n.title, GROUP_CONCAT(t.name) AS tags
FROM notes n
JOIN note_tags nt ON n.id = nt.note_id
JOIN tags t ON nt.tag_id = t.id
WHERE n.is_trashed = 0
GROUP BY n.id;
-- Per-notebook statistics
SELECT
nb.name AS notebook,
COUNT(n.id) AS note_count,
MAX(n.updated_at) AS last_updated
FROM notebooks nb
LEFT JOIN notes n ON nb.id = n.notebook_id AND n.is_trashed = 0
GROUP BY nb.id;
20.3.2 Personal Finance Tracker
CREATE TABLE accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK(type IN ('cash', 'bank', 'credit', 'alipay', 'wechat')),
balance_cents INTEGER NOT NULL DEFAULT 0,
currency TEXT DEFAULT 'CNY',
icon TEXT,
is_active INTEGER DEFAULT 1
);
CREATE TABLE categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL CHECK(type IN ('income', 'expense')),
icon TEXT,
parent_id INTEGER,
FOREIGN KEY (parent_id) REFERENCES categories(id)
);
CREATE TABLE transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id INTEGER NOT NULL,
category_id INTEGER NOT NULL,
type TEXT NOT NULL CHECK(type IN ('income', 'expense', 'transfer')),
amount_cents INTEGER NOT NULL,
description TEXT,
transaction_date TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
FOREIGN KEY (account_id) REFERENCES accounts(id),
FOREIGN KEY (category_id) REFERENCES categories(id)
);
-- Transfer records (link the two legs of a transfer)
CREATE TABLE transfers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_transaction_id INTEGER NOT NULL,
to_transaction_id INTEGER NOT NULL,
FOREIGN KEY (from_transaction_id) REFERENCES transactions(id),
FOREIGN KEY (to_transaction_id) REFERENCES transactions(id)
);
-- Indexes
CREATE INDEX idx_txn_date ON transactions(transaction_date);
CREATE INDEX idx_txn_account ON transactions(account_id, transaction_date);
CREATE INDEX idx_txn_category ON transactions(category_id);
-- Monthly totals
SELECT
strftime('%Y-%m', transaction_date) AS month,
type,
SUM(amount_cents) / 100.0 AS total
FROM transactions
WHERE transaction_date >= '2026-01-01'
GROUP BY month, type
ORDER BY month;
-- Expense totals by category
SELECT
c.name AS category,
SUM(t.amount_cents) / 100.0 AS total,
COUNT(*) AS count
FROM transactions t
JOIN categories c ON t.category_id = c.id
WHERE t.type = 'expense'
AND t.transaction_date >= '2026-05-01'
GROUP BY c.id
ORDER BY total DESC;
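The transfers table links the two legs of a transfer, but none of the statements above show how a transfer is actually recorded. A sketch of doing it atomically from Python: both legs, the link row, and both account balances are written in one transaction. The dedicated transfer category id and finance.db are assumptions; direction is captured by the link row, so both legs store a positive amount.
import sqlite3
def record_transfer(conn, from_account, to_account, amount_cents,
                    transfer_category_id, date, note=''):
    with conn:  # a single transaction: all five writes succeed or none do
        out_id = conn.execute(
            "INSERT INTO transactions (account_id, category_id, type, amount_cents,"
            " description, transaction_date) VALUES (?, ?, 'transfer', ?, ?, ?)",
            (from_account, transfer_category_id, amount_cents, note, date)).lastrowid
        in_id = conn.execute(
            "INSERT INTO transactions (account_id, category_id, type, amount_cents,"
            " description, transaction_date) VALUES (?, ?, 'transfer', ?, ?, ?)",
            (to_account, transfer_category_id, amount_cents, note, date)).lastrowid
        conn.execute('INSERT INTO transfers (from_transaction_id, to_transaction_id)'
                     ' VALUES (?, ?)', (out_id, in_id))
        conn.execute('UPDATE accounts SET balance_cents = balance_cents - ? WHERE id = ?',
                     (amount_cents, from_account))
        conn.execute('UPDATE accounts SET balance_cents = balance_cents + ? WHERE id = ?',
                     (amount_cents, to_account))
conn = sqlite3.connect('finance.db')
conn.execute('PRAGMA foreign_keys = ON')
# record_transfer(conn, from_account=1, to_account=2, amount_cents=50000,
#                 transfer_category_id=99, date='2026-05-09')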
20.4 Logging
20.4.1 Structured Log Storage
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
level TEXT NOT NULL CHECK(level IN ('DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')),
source TEXT NOT NULL,
message TEXT NOT NULL,
    context TEXT,                        -- context data as JSON
request_id TEXT,
user_id INTEGER,
    created_at TEXT NOT NULL DEFAULT (datetime('now'))  -- store UTC so comparisons with datetime('now') are consistent
);
-- Indexes
CREATE INDEX idx_logs_level_time ON logs(level, created_at);
CREATE INDEX idx_logs_source_time ON logs(source, created_at);
CREATE INDEX idx_logs_request ON logs(request_id);
CREATE INDEX idx_logs_user ON logs(user_id);
-- Partial index: extra detail only for error-level logs
CREATE INDEX idx_logs_error_detail ON logs(source, created_at, message)
WHERE level IN ('ERROR', 'FATAL');
-- Bulk-insert log entries in one transaction
BEGIN;
INSERT INTO logs (level, source, message, context, request_id) VALUES
('INFO', 'auth', '用户登录成功', '{"ip":"192.168.1.1","user_agent":"Mozilla/5.0"}', 'req_001'),
('INFO', 'api', '请求处理完成', '{"method":"GET","path":"/users","duration_ms":45}', 'req_001'),
('WARN', 'db', '慢查询检测', '{"query":"SELECT * FROM orders","duration_ms":1500}', 'req_002'),
('ERROR', 'payment', '支付失败', '{"order_id":12345,"error":"timeout"}', 'req_003');
COMMIT;
-- Errors from the last 24 hours
SELECT * FROM logs
WHERE level IN ('ERROR', 'FATAL')
AND created_at > datetime('now', '-24 hours')
ORDER BY created_at DESC;
-- Counts by source and level over the last hour
SELECT source, level, COUNT(*) AS count
FROM logs
WHERE created_at > datetime('now', '-1 hour')
GROUP BY source, level
ORDER BY count DESC;
-- Trace a single request
SELECT * FROM logs
WHERE request_id = 'req_001'
ORDER BY created_at;
20.4.2 Log Rotation and Cleanup
-- Rotate by size (keep only the most recent N rows)
DELETE FROM logs WHERE id NOT IN (
SELECT id FROM logs ORDER BY id DESC LIMIT 1000000
);
-- Rotate by time (keep the last 30 days)
DELETE FROM logs WHERE created_at < datetime('now', '-30 days');
-- Rotate into an archive table (first run only; afterwards use INSERT INTO logs_archive SELECT ...)
CREATE TABLE logs_archive AS
SELECT * FROM logs WHERE created_at < datetime('now', '-7 days');
DELETE FROM logs WHERE created_at < datetime('now', '-7 days');
VACUUM;
20.4.3 A Log Query Interface
import json
import sqlite3
class LogStore:
def __init__(self, db_path='logs.db'):
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self.conn.execute('PRAGMA journal_mode = WAL')
self.conn.execute('PRAGMA synchronous = NORMAL')
def log(self, level, source, message, context=None, request_id=None, user_id=None):
self.conn.execute(
'''INSERT INTO logs (level, source, message, context, request_id, user_id)
VALUES (?, ?, ?, ?, ?, ?)''',
(level, source, message,
json.dumps(context) if context else None,
request_id, user_id)
)
self.conn.commit()
def query(self, level=None, source=None, start_time=None, end_time=None,
keyword=None, limit=100, offset=0):
conditions = []
params = []
if level:
conditions.append('level = ?')
params.append(level)
if source:
conditions.append('source = ?')
params.append(source)
if start_time:
conditions.append('created_at >= ?')
params.append(start_time)
if end_time:
conditions.append('created_at <= ?')
params.append(end_time)
if keyword:
conditions.append('message LIKE ?')
params.append(f'%{keyword}%')
where = 'WHERE ' + ' AND '.join(conditions) if conditions else ''
params.extend([limit, offset])
return self.conn.execute(
f'SELECT * FROM logs {where} ORDER BY created_at DESC LIMIT ? OFFSET ?',
params
).fetchall()
def stats(self, hours=24):
return self.conn.execute('''
SELECT level, COUNT(*) as count
FROM logs
WHERE created_at > datetime('now', ? || ' hours')
GROUP BY level
ORDER BY count DESC
''', (f'-{hours}',)).fetchall()
def rotate(self, keep_days=30):
self.conn.execute(
"DELETE FROM logs WHERE created_at < datetime('now', ? || ' days')",
(f'-{keep_days}',)
)
self.conn.commit()
self.conn.execute('VACUUM')
# Usage
logger = LogStore()
logger.log('INFO', 'app', '应用启动', {'version': '1.0.0'})
logger.log('ERROR', 'db', '连接超时', {'host': '10.0.0.1', 'timeout': 30})
errors = logger.query(level='ERROR', limit=50)
stats = logger.stats(hours=1)
20.5 Data Migration and ETL
20.5.1 Importing CSV Data
-- Import a CSV file (create the table first)
CREATE TABLE sales (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT,
product TEXT,
quantity INTEGER,
price REAL,
region TEXT
);
-- Import from the sqlite3 command-line shell
-- .mode csv
-- .import sales.csv sales
-- Skip the header row (SQLite 3.32.0+)
-- .import --skip 1 sales.csv sales
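The dot-commands above only work inside the sqlite3 shell. From application code, the standard csv module plus executemany does the same job; a sketch, assuming sales.csv has a header row matching the sales table:
import csv
import sqlite3
def import_sales_csv(db_path, csv_path):
    with open(csv_path, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)  # header row supplies the column names
        rows = [(r['date'], r['product'], int(r['quantity']), float(r['price']), r['region'])
                for r in reader]
    conn = sqlite3.connect(db_path)
    with conn:
        conn.executemany(
            'INSERT INTO sales (date, product, quantity, price, region)'
            ' VALUES (?, ?, ?, ?, ?)', rows)
    conn.close()
import_sales_csv('mydb.db', 'sales.csv')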
20.5.2 Importing JSON Data
import sqlite3
import json
def import_json_to_sqlite(db_path, table, json_path):
    with open(json_path, encoding='utf-8') as f:
        data = json.load(f)
    if not data:
        return
    # Infer the column list from the first record.
    # Note: table and column names are interpolated into the SQL string,
    # so they must come from a trusted source.
    columns = list(data[0].keys())
    placeholders = ','.join(['?'] * len(columns))
    col_names = ','.join(columns)
    conn = sqlite3.connect(db_path)
    conn.executemany(
        f'INSERT INTO {table} ({col_names}) VALUES ({placeholders})',
        [tuple(row.get(col) for col in columns) for row in data]
    )
    conn.commit()
    conn.close()
# Usage
import_json_to_sqlite('mydb.db', 'products', 'products.json')
20.6 Small Web Applications
20.6.1 URL Shortener
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE short_urls (
id INTEGER PRIMARY KEY AUTOINCREMENT,
short_code TEXT NOT NULL UNIQUE,
original_url TEXT NOT NULL,
created_at TEXT DEFAULT (datetime('now')),
expires_at TEXT,
click_count INTEGER DEFAULT 0,
creator_ip TEXT
);
-- short_code is declared UNIQUE, so SQLite already maintains an index on it; no extra index is needed
CREATE TABLE click_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
short_url_id INTEGER NOT NULL,
ip_address TEXT,
user_agent TEXT,
referer TEXT,
clicked_at TEXT DEFAULT (datetime('now')),
FOREIGN KEY (short_url_id) REFERENCES short_urls(id) ON DELETE CASCADE
);
-- Create a short link
INSERT INTO short_urls (short_code, original_url, expires_at)
VALUES ('abc123', 'https://example.com/very/long/url/here', datetime('now', '+30 days'));
-- Redirect lookup (only unexpired links)
SELECT id, original_url FROM short_urls
WHERE short_code = 'abc123'
  AND (expires_at IS NULL OR expires_at > datetime('now'));
-- After a successful lookup, bump the click counter
UPDATE short_urls SET click_count = click_count + 1 WHERE short_code = 'abc123';
-- Record a click
INSERT INTO click_logs (short_url_id, ip_address, user_agent, referer)
VALUES (1, '192.168.1.1', 'Mozilla/5.0', 'https://twitter.com');
-- Clicks per day
SELECT DATE(clicked_at) AS date, COUNT(*) AS clicks
FROM click_logs
WHERE short_url_id = 1
GROUP BY date
ORDER BY date DESC;
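The short code 'abc123' above is hard-coded; in practice you generate one randomly and retry on the (rare) UNIQUE collision. A minimal sketch with Python's secrets module; the 7-character length, the base62 alphabet, and shortener.db are arbitrary choices:
import secrets
import sqlite3
import string
ALPHABET = string.ascii_letters + string.digits  # base62
def create_short_url(conn, original_url, length=7):
    while True:
        code = ''.join(secrets.choice(ALPHABET) for _ in range(length))
        try:
            with conn:
                conn.execute('INSERT INTO short_urls (short_code, original_url) VALUES (?, ?)',
                             (code, original_url))
            return code
        except sqlite3.IntegrityError:
            continue  # code already taken: try another one
conn = sqlite3.connect('shortener.db')
print(create_short_url(conn, 'https://example.com/very/long/url/here'))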
20.6.2 Task Queue
CREATE TABLE task_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_type TEXT NOT NULL,
payload TEXT NOT NULL, -- JSON
status TEXT NOT NULL DEFAULT 'pending'
CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
priority INTEGER DEFAULT 0,
retry_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
error_message TEXT,
created_at TEXT DEFAULT (datetime('now')),
started_at TEXT,
completed_at TEXT
);
CREATE INDEX idx_queue_pending ON task_queue(priority DESC, created_at)
WHERE status = 'pending';
-- Enqueue tasks
INSERT INTO task_queue (task_type, payload, priority) VALUES
('send_email', '{"to":"user@example.com","subject":"Welcome"}', 1),
('generate_report', '{"type":"monthly","month":"2026-05"}', 0);
-- Claim the next task atomically (RETURNING requires SQLite 3.35+)
UPDATE task_queue
SET status = 'processing', started_at = datetime('now')
WHERE id = (
SELECT id FROM task_queue
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
)
RETURNING *;
-- Mark a task completed
UPDATE task_queue SET status = 'completed', completed_at = datetime('now') WHERE id = 1;
-- Mark a task failed (re-queued automatically while retries remain)
UPDATE task_queue SET
status = CASE WHEN retry_count < max_retries THEN 'pending' ELSE 'failed' END,
retry_count = retry_count + 1,
error_message = 'Connection timeout'
WHERE id = 1;
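A worker built on this table is a polling loop: claim a task, run it, mark it completed or failed. The sketch below also re-queues tasks stuck in 'processing' longer than a timeout, so a crashed worker does not leave work stranded. The handler registry, queue.db, and the 10-minute timeout are assumptions; RETURNING requires SQLite 3.35+.
import json
import sqlite3
import time
def run_worker(db_path='queue.db', handlers=None, stale_minutes=10):
    """handlers maps task_type -> callable(payload_dict)."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    while True:
        # Recover tasks whose worker died mid-flight.
        conn.execute(
            "UPDATE task_queue SET status = 'pending'"
            " WHERE status = 'processing'"
            "   AND started_at < datetime('now', '-' || ? || ' minutes')",
            (stale_minutes,))
        task = conn.execute(
            "UPDATE task_queue SET status = 'processing', started_at = datetime('now')"
            " WHERE id = (SELECT id FROM task_queue WHERE status = 'pending'"
            "             ORDER BY priority DESC, created_at LIMIT 1)"
            " RETURNING *").fetchone()
        conn.commit()
        if task is None:
            time.sleep(1)  # queue is empty: back off briefly
            continue
        try:
            handlers[task['task_type']](json.loads(task['payload']))
            conn.execute("UPDATE task_queue SET status = 'completed',"
                         " completed_at = datetime('now') WHERE id = ?", (task['id'],))
        except Exception as exc:
            conn.execute(
                "UPDATE task_queue SET"
                " status = CASE WHEN retry_count < max_retries THEN 'pending' ELSE 'failed' END,"
                " retry_count = retry_count + 1, error_message = ? WHERE id = ?",
                (str(exc), task['id']))
        conn.commit()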
20.7 Test Data Generation
-- Generate test data
CREATE TABLE test_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL,
age INTEGER,
city TEXT,
created_at TEXT DEFAULT (datetime('now', '-' || ABS(RANDOM()) % 365 || ' days'))
);
INSERT INTO test_users (name, email, age, city)
SELECT
'用户_' || id,
'user_' || id || '@example.com',
18 + ABS(RANDOM()) % 60,
CASE ABS(RANDOM()) % 5
WHEN 0 THEN '北京' WHEN 1 THEN '上海' WHEN 2 THEN '广州'
WHEN 3 THEN '深圳' ELSE '杭州'
END
FROM (
WITH RECURSIVE cnt(x) AS (SELECT 1 UNION ALL SELECT x+1 FROM cnt WHERE x < 10000)
SELECT x AS id FROM cnt
);
-- Verify the distribution
SELECT city, COUNT(*) AS user_count, ROUND(AVG(age), 1) AS avg_age
FROM test_users GROUP BY city;
⚠️ Caveats
- Log tables need periodic cleanup, otherwise they grow without bound
- Cache entries need an expiration time to keep data from going stale
- Task queues need error handling so tasks do not get stuck in the processing state
- Watch disk space in embedded applications: IoT devices have limited storage
- Desktop applications need backups: prompt the user to back up regularly
- For web applications, do not keep sessions in an in-memory database or on ephemeral storage, or they will be lost on restart; persist them to an on-disk file as in 20.2.3
💡 Tips
- Store flexible data as JSON to avoid frequent schema changes
- Use FTS5 for full-text search: it is far faster than LIKE
- Use triggers to keep derived data in sync automatically (for example, the FTS index triggers above)
- Use WAL mode to improve read/write concurrency
- Use VACUUM INTO for backups: it is online and atomic (see the sketch below)
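A sketch of the VACUUM INTO backup tip in Python. VACUUM INTO requires SQLite 3.27+ and refuses to overwrite an existing file, so this writes to a timestamped path; the backups/ directory and app.db are assumptions.
import os
import sqlite3
import time
def backup(db_path='app.db', backup_dir='backups'):
    os.makedirs(backup_dir, exist_ok=True)
    target = os.path.join(backup_dir, time.strftime('backup-%Y%m%d-%H%M%S.db'))
    # isolation_level=None means autocommit; VACUUM cannot run inside a transaction.
    conn = sqlite3.connect(db_path, isolation_level=None)
    conn.execute('VACUUM INTO ?', (target,))
    conn.close()
    return target
print('Backup written to', backup())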
📌 Scenario Summary
| Scenario | Key characteristics | Performance needs | Recommended configuration |
|---|---|---|---|
| IoT data collection | Batched writes, time-series queries | Moderate | WAL + synchronous=NORMAL |
| Local cache | Fast reads/writes, expiration sweeps | High | WAL + large cache_size |
| Desktop app | Complex queries, FTS | Moderate | WAL + foreign_keys=ON |
| Logging | Heavy writes, periodic cleanup | High | WAL + synchronous=OFF (optional) |
| Web backend | Concurrent reads, low write concurrency | High | WAL + NORMAL + connection pooling |
🔗 Further Reading
🎉 Congratulations, you have completed all 20 chapters of the SQLite Complete Guide!
Back to the table of contents: SQLite Complete Guide