AgensGraph 完全指南 / 第 10 章:事务与并发控制
第 10 章:事务与并发控制
10.1 ACID 特性
AgensGraph 继承了 PostgreSQL 完整的 ACID 事务能力,图操作与关系操作共享同一事务上下文。
| 特性 | 含义 | AgensGraph 实现 |
|---|
| Atomicity(原子性) | 事务要么全部成功,要么全部回滚 | 通过 WAL 和 MVCC 实现 |
| Consistency(一致性) | 事务前后数据满足所有约束 | 约束检查 + 触发器 |
| Isolation(隔离性) | 并发事务互不干扰 | MVCC + 隔离级别 |
| Durability(持久性) | 已提交事务的数据不会丢失 | WAL + 检查点机制 |
10.1.1 事务基本操作
-- 显式事务
BEGIN;
SET graph_path = social_network;
CREATE (:Person {name: 'Alice', age: 30});
CREATE (:Person {name: 'Bob', age: 28});
MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'})
CREATE (a)-[:KNOWS {since: 2024}]->(b);
COMMIT;
-- 回滚事务
BEGIN;
SET graph_path = social_network;
DELETE (:Person {name: 'Alice'});
ROLLBACK; -- Alice 不会被删除
-- 自动提交(默认行为)
-- 每条语句自动包装在一个事务中
CREATE (:Person {name: 'Charlie'});
-- 隐式 COMMIT
10.1.2 保存点(Savepoint)
BEGIN;
INSERT INTO orders (user_name, amount) VALUES ('Alice', 100);
SAVEPOINT sp1;
INSERT INTO orders (user_name, amount) VALUES ('Bob', 200);
-- 出错了
ROLLBACK TO sp1; -- 只回滚 Bob 的订单
INSERT INTO orders (user_name, amount) VALUES ('Carol', 300);
COMMIT;
-- 结果: Alice 和 Carol 的订单存在,Bob 的不存在
10.2 隔离级别
10.2.1 SQL 标准隔离级别
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 |
|---|
| READ UNCOMMITTED | ✅ 可能 | ✅ 可能 | ✅ 可能 | ⭐⭐⭐⭐⭐ |
| READ COMMITTED | ❌ 防止 | ✅ 可能 | ✅ 可能 | ⭐⭐⭐⭐ |
| REPEATABLE READ | ❌ 防止 | ❌ 防止 | ✅ 可能 | ⭐⭐⭐ |
| SERIALIZABLE | ❌ 防止 | ❌ 防止 | ❌ 防止 | ⭐⭐ |
10.2.2 设置隔离级别
-- 查看当前隔离级别
SHOW transaction_isolation;
-- 设置会话级隔离级别
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 设置单个事务的隔离级别
BEGIN ISOLATION LEVEL SERIALIZABLE;
SET graph_path = social_network;
MATCH (p:Person) RETURN count(p);
COMMIT;
-- PostgreSQL 默认级别: READ COMMITTED
10.2.3 MVCC 机制
MVCC(多版本并发控制):
事务 A (READ COMMITTED):
BEGIN;
SELECT * FROM person WHERE name = 'Alice';
-- 读取版本 v1 (age=30)
事务 B:
BEGIN;
UPDATE person SET age = 31 WHERE name = 'Alice';
COMMIT; -- 创建版本 v2
事务 A (继续):
SELECT * FROM person WHERE name = 'Alice';
-- READ COMMITTED: 读取版本 v2 (age=31) ← 不同结果!
-- REPEATABLE READ: 仍然读取版本 v1 (age=30) ← 一致结果
10.3 锁机制
10.3.1 锁类型概览
| 锁类型 | 粒度 | 说明 | 冲突 |
|---|
| 表级锁 | 表 | ACCESS EXCLUSIVE 最强 | 阻塞所有操作 |
| 行级锁 | 行 | FOR UPDATE 锁定单行 | 只阻塞同行写入 |
| 顶点锁 | 顶点 | 修改顶点时自动加锁 | 阻塞同顶点修改 |
| 边锁 | 边 | 修改边时自动加锁 | 阻塞同边修改 |
10.3.2 显式锁定
-- 表级锁
LOCK TABLE person IN EXCLUSIVE MODE;
-- 行级锁(SELECT FOR UPDATE)
BEGIN;
SELECT * FROM person WHERE name = 'Alice' FOR UPDATE;
-- Alice 的记录被锁定,其他事务不能修改
UPDATE person SET age = 31 WHERE name = 'Alice';
COMMIT;
-- FOR SHARE(共享锁,允许其他事务读但不允许写)
SELECT * FROM person WHERE name = 'Alice' FOR SHARE;
-- FOR NO KEY UPDATE(不锁定主键)
SELECT * FROM person WHERE name = 'Alice' FOR NO KEY UPDATE;
-- SKIP LOCKED(跳过已锁定的行)
SELECT * FROM person FOR UPDATE SKIP LOCKED;
10.3.3 图操作的锁行为
-- CREATE 顶点:自动获取新元组的排他锁
CREATE (:Person {name: 'Alice'});
-- MATCH + SET:获取匹配顶点的行级锁
MATCH (p:Person {name: 'Alice'})
SET p.age = 31;
-- Alice 顶点被锁定直到事务结束
-- MATCH + DELETE:获取排他锁
MATCH (p:Person {name: 'Alice'})
DELETE p;
-- Alice 被锁定,直到 COMMIT 或 ROLLBACK
-- MERGE:如果匹配则获取锁,如果创建则获取新元组锁
MERGE (p:Person {name: 'Alice'})
ON MATCH SET p.last_seen = datetime();
10.4 并发问题与解决方案
10.4.1 丢失更新
-- 场景:两个事务同时修改同一顶点
-- 事务 A:
BEGIN;
MATCH (p:Person {name: 'Alice'}) RETURN p.credits; -- 读到 100
-- 计算: 100 + 50 = 150
-- 事务 B (同时):
BEGIN;
MATCH (p:Person {name: 'Alice'}) RETURN p.credits; -- 也读到 100
-- 计算: 100 + 30 = 130
-- 事务 A:
MATCH (p:Person {name: 'Alice'}) SET p.credits = 150;
COMMIT;
-- 事务 B:
MATCH (p:Person {name: 'Alice'}) SET p.credits = 130;
COMMIT;
-- 最终结果: 130(丢失了事务 A 的更新!)
解决方案:
-- 方案 1: 使用 FOR UPDATE(悲观锁)
BEGIN;
-- 在 SQL 层面锁定
SELECT properties FROM person
WHERE properties->>'name' = 'Alice'
FOR UPDATE;
-- 然后修改
MATCH (p:Person {name: 'Alice'})
SET p.credits = p.credits + 50;
COMMIT;
-- 方案 2: 原子更新(推荐)
MATCH (p:Person {name: 'Alice'})
SET p.credits = p.credits + 50;
-- 直接在 SET 中使用表达式,避免读-改-写
10.4.2 幻读
-- 场景: 事务 A 查询计数,事务 B 同时插入新数据
-- 事务 A (REPEATABLE READ):
BEGIN ISOLATION LEVEL REPEATABLE READ;
MATCH (p:Person) RETURN count(p); -- 100
-- ... 事务 B 插入新数据并提交 ...
MATCH (p:Person) RETURN count(p); -- 仍然 100(快照一致性)
COMMIT;
10.4.3 可序列化异常检测
-- 使用 SERIALIZABLE 隔离级别 + 重试逻辑
DO $$
DECLARE
retry_count INT := 0;
max_retries INT := 3;
success BOOLEAN := FALSE;
BEGIN
WHILE retry_count < max_retries AND NOT success LOOP
BEGIN
-- 业务逻辑
PERFORM cypher('social_network', $$
MATCH (p:Person {name: 'Alice'})
SET p.credits = p.credits + 10
$$);
success := TRUE;
EXCEPTION
WHEN serialization_failure THEN
retry_count := retry_count + 1;
RAISE NOTICE 'Retry % due to serialization failure', retry_count;
WHEN deadlock_detected THEN
retry_count := retry_count + 1;
RAISE NOTICE 'Retry % due to deadlock', retry_count;
END;
END LOOP;
IF NOT success THEN
RAISE EXCEPTION 'Transaction failed after % retries', max_retries;
END IF;
END $$;
10.5 死锁处理
10.5.1 死锁的产生
死锁场景:
事务 A:
LOCK person WHERE name='Alice' → 获得 Alice 的锁
LOCK person WHERE name='Bob' → 等待 Bob 的锁...
事务 B:
LOCK person WHERE name='Bob' → 获得 Bob 的锁
LOCK person WHERE name='Alice' → 等待 Alice 的锁...
→ 循环等待,形成死锁
10.5.2 死锁检测与处理
AgensGraph(PostgreSQL)自动检测死锁并回滚其中一个事务:
-- 查看死锁相关配置
SHOW deadlock_timeout; -- 默认 1s
-- 查看锁等待情况
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity
ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity
ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;
10.5.3 死锁预防策略
| 策略 | 说明 |
|---|
| 固定加锁顺序 | 总是按 ID 从小到大加锁 |
| 缩短事务 | 减少持锁时间 |
| 使用原子操作 | SET n.prop = n.prop + 1 而非读-改-写 |
| 合理超时 | 设置 lock_timeout |
| 避免长事务 | 定期检查 pg_stat_activity |
-- 设置锁超时
SET lock_timeout = '5s';
-- 设置语句超时
SET statement_timeout = '30s';
-- 查看长事务
SELECT pid, now() - xact_start AS duration, query
FROM pg_stat_activity
WHERE state = 'active'
AND xact_start < now() - interval '5 minutes'
ORDER BY duration DESC;
10.6 高并发场景最佳实践
10.6.1 热点顶点更新
-- ❌ 低效:读-改-写模式(有丢失更新风险)
MATCH (counter:Counter {name: 'page_views'})
WITH counter, counter.value AS current_val
SET counter.value = current_val + 1;
-- ✅ 高效:原子递增
MATCH (counter:Counter {name: 'page_views'})
SET counter.value = counter.value + 1;
-- ✅ 使用 PostgreSQL 序列(最高并发性能)
CREATE SEQUENCE page_view_seq;
-- 通过 SQL 操作
SELECT nextval('page_view_seq');
10.6.2 批量操作优化
-- ❌ 低效:逐条创建
CREATE (:Log {event: 'login', user: 'Alice', time: datetime()});
CREATE (:Log {event: 'click', user: 'Bob', time: datetime()});
-- ... 重复 10000 次
-- ✅ 高效:批量创建(单个事务)
UNWIND range(1, 10000) AS i
CREATE (:Log {event: 'auto_' + toString(i), time: datetime()});
10.6.3 读写分离模式
生产环境读写分离:
应用层
├─ 写请求 ──▶ Primary (AgensGraph Master)
│ │
│ ▼ (Streaming Replication)
└─ 读请求 ──▶ Replica (AgensGraph Standby)
适用场景:
- 写操作频率低,读操作频率高
- 可以接受读取略有延迟的数据
10.7 监控事务状态
-- 查看当前活动事务
SELECT
pid,
usename,
application_name,
state,
xact_start,
now() - xact_start AS txn_duration,
query_start,
now() - query_start AS query_duration,
wait_event_type,
wait_event,
query
FROM pg_stat_activity
WHERE state != 'idle'
ORDER BY xact_start;
-- 查看锁等待
SELECT * FROM pg_locks WHERE NOT granted;
-- 查看事务 ID 消耗
SELECT
txid_current(),
txid_snapshot_xmax(txid_current_snapshot()) AS xmax;
10.8 本章小结
| 要点 | 说明 |
|---|
| ACID | AgensGraph 完整支持 ACID 事务 |
| 隔离级别 | READ COMMITTED(默认)到 SERIALIZABLE |
| MVCC | 多版本并发控制,读不阻塞写 |
| 锁机制 | 表级锁 + 行级锁,自动管理 |
| 死锁 | 自动检测和回滚,预防比处理更重要 |
| 原子操作 | 优先使用 SET n.prop = n.prop + 1 |
10.9 练习
- 编写一个并发安全的"用户积分转移"事务。
- 使用
SAVEPOINT 实现"批量导入中部分失败"的回滚。 - 编写监控脚本检测运行超过 5 分钟的长事务。
- 实现带有重试逻辑的可序列化事务函数。
10.10 扩展阅读