强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

AgensGraph 完全指南 / 第 10 章:事务与并发控制

第 10 章:事务与并发控制

10.1 ACID 特性

AgensGraph 继承了 PostgreSQL 完整的 ACID 事务能力,图操作与关系操作共享同一事务上下文。

特性含义AgensGraph 实现
Atomicity(原子性)事务要么全部成功,要么全部回滚通过 WAL 和 MVCC 实现
Consistency(一致性)事务前后数据满足所有约束约束检查 + 触发器
Isolation(隔离性)并发事务互不干扰MVCC + 隔离级别
Durability(持久性)已提交事务的数据不会丢失WAL + 检查点机制

10.1.1 事务基本操作

-- 显式事务
BEGIN;
  SET graph_path = social_network;
  CREATE (:Person {name: 'Alice', age: 30});
  CREATE (:Person {name: 'Bob', age: 28});
  MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'})
    CREATE (a)-[:KNOWS {since: 2024}]->(b);
COMMIT;

-- 回滚事务
BEGIN;
  SET graph_path = social_network;
  DELETE (:Person {name: 'Alice'});
ROLLBACK;  -- Alice 不会被删除

-- 自动提交(默认行为)
-- 每条语句自动包装在一个事务中
CREATE (:Person {name: 'Charlie'});
-- 隐式 COMMIT

10.1.2 保存点(Savepoint)

BEGIN;
  INSERT INTO orders (user_name, amount) VALUES ('Alice', 100);

  SAVEPOINT sp1;
    INSERT INTO orders (user_name, amount) VALUES ('Bob', 200);
    -- 出错了
  ROLLBACK TO sp1;  -- 只回滚 Bob 的订单

  INSERT INTO orders (user_name, amount) VALUES ('Carol', 300);
COMMIT;
-- 结果: Alice 和 Carol 的订单存在,Bob 的不存在

10.2 隔离级别

10.2.1 SQL 标准隔离级别

隔离级别脏读不可重复读幻读性能
READ UNCOMMITTED✅ 可能✅ 可能✅ 可能⭐⭐⭐⭐⭐
READ COMMITTED❌ 防止✅ 可能✅ 可能⭐⭐⭐⭐
REPEATABLE READ❌ 防止❌ 防止✅ 可能⭐⭐⭐
SERIALIZABLE❌ 防止❌ 防止❌ 防止⭐⭐

10.2.2 设置隔离级别

-- 查看当前隔离级别
SHOW transaction_isolation;

-- 设置会话级隔离级别
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ;

-- 设置单个事务的隔离级别
BEGIN ISOLATION LEVEL SERIALIZABLE;
  SET graph_path = social_network;
  MATCH (p:Person) RETURN count(p);
COMMIT;

-- PostgreSQL 默认级别: READ COMMITTED

10.2.3 MVCC 机制

MVCC(多版本并发控制):

  事务 A (READ COMMITTED):
    BEGIN;
    SELECT * FROM person WHERE name = 'Alice';
    -- 读取版本 v1 (age=30)

  事务 B:
    BEGIN;
    UPDATE person SET age = 31 WHERE name = 'Alice';
    COMMIT;  -- 创建版本 v2

  事务 A (继续):
    SELECT * FROM person WHERE name = 'Alice';
    -- READ COMMITTED: 读取版本 v2 (age=31)  ← 不同结果!
    -- REPEATABLE READ: 仍然读取版本 v1 (age=30)  ← 一致结果

10.3 锁机制

10.3.1 锁类型概览

锁类型粒度说明冲突
表级锁ACCESS EXCLUSIVE 最强阻塞所有操作
行级锁FOR UPDATE 锁定单行只阻塞同行写入
顶点锁顶点修改顶点时自动加锁阻塞同顶点修改
边锁修改边时自动加锁阻塞同边修改

10.3.2 显式锁定

-- 表级锁
LOCK TABLE person IN EXCLUSIVE MODE;

-- 行级锁(SELECT FOR UPDATE)
BEGIN;
  SELECT * FROM person WHERE name = 'Alice' FOR UPDATE;
  -- Alice 的记录被锁定,其他事务不能修改
  UPDATE person SET age = 31 WHERE name = 'Alice';
COMMIT;

-- FOR SHARE(共享锁,允许其他事务读但不允许写)
SELECT * FROM person WHERE name = 'Alice' FOR SHARE;

-- FOR NO KEY UPDATE(不锁定主键)
SELECT * FROM person WHERE name = 'Alice' FOR NO KEY UPDATE;

-- SKIP LOCKED(跳过已锁定的行)
SELECT * FROM person FOR UPDATE SKIP LOCKED;

10.3.3 图操作的锁行为

-- CREATE 顶点:自动获取新元组的排他锁
CREATE (:Person {name: 'Alice'});

-- MATCH + SET:获取匹配顶点的行级锁
MATCH (p:Person {name: 'Alice'})
SET p.age = 31;
-- Alice 顶点被锁定直到事务结束

-- MATCH + DELETE:获取排他锁
MATCH (p:Person {name: 'Alice'})
DELETE p;
-- Alice 被锁定,直到 COMMIT 或 ROLLBACK

-- MERGE:如果匹配则获取锁,如果创建则获取新元组锁
MERGE (p:Person {name: 'Alice'})
ON MATCH SET p.last_seen = datetime();

10.4 并发问题与解决方案

10.4.1 丢失更新

-- 场景:两个事务同时修改同一顶点

-- 事务 A:
BEGIN;
  MATCH (p:Person {name: 'Alice'}) RETURN p.credits;  -- 读到 100
  -- 计算: 100 + 50 = 150

-- 事务 B (同时):
BEGIN;
  MATCH (p:Person {name: 'Alice'}) RETURN p.credits;  -- 也读到 100
  -- 计算: 100 + 30 = 130

-- 事务 A:
  MATCH (p:Person {name: 'Alice'}) SET p.credits = 150;
COMMIT;

-- 事务 B:
  MATCH (p:Person {name: 'Alice'}) SET p.credits = 130;
COMMIT;
-- 最终结果: 130(丢失了事务 A 的更新!)

解决方案

-- 方案 1: 使用 FOR UPDATE(悲观锁)
BEGIN;
  -- 在 SQL 层面锁定
  SELECT properties FROM person
  WHERE properties->>'name' = 'Alice'
  FOR UPDATE;

  -- 然后修改
  MATCH (p:Person {name: 'Alice'})
  SET p.credits = p.credits + 50;
COMMIT;

-- 方案 2: 原子更新(推荐)
MATCH (p:Person {name: 'Alice'})
SET p.credits = p.credits + 50;
-- 直接在 SET 中使用表达式,避免读-改-写

10.4.2 幻读

-- 场景: 事务 A 查询计数,事务 B 同时插入新数据

-- 事务 A (REPEATABLE READ):
BEGIN ISOLATION LEVEL REPEATABLE READ;
  MATCH (p:Person) RETURN count(p);  -- 100
  -- ... 事务 B 插入新数据并提交 ...
  MATCH (p:Person) RETURN count(p);  -- 仍然 100(快照一致性)
COMMIT;

10.4.3 可序列化异常检测

-- 使用 SERIALIZABLE 隔离级别 + 重试逻辑
DO $$
DECLARE
    retry_count INT := 0;
    max_retries INT := 3;
    success BOOLEAN := FALSE;
BEGIN
    WHILE retry_count < max_retries AND NOT success LOOP
        BEGIN
            -- 业务逻辑
            PERFORM cypher('social_network', $$
              MATCH (p:Person {name: 'Alice'})
              SET p.credits = p.credits + 10
            $$);
            success := TRUE;
        EXCEPTION
            WHEN serialization_failure THEN
                retry_count := retry_count + 1;
                RAISE NOTICE 'Retry % due to serialization failure', retry_count;
            WHEN deadlock_detected THEN
                retry_count := retry_count + 1;
                RAISE NOTICE 'Retry % due to deadlock', retry_count;
        END;
    END LOOP;

    IF NOT success THEN
        RAISE EXCEPTION 'Transaction failed after % retries', max_retries;
    END IF;
END $$;

10.5 死锁处理

10.5.1 死锁的产生

死锁场景:

  事务 A:
    LOCK person WHERE name='Alice'   → 获得 Alice 的锁
    LOCK person WHERE name='Bob'     → 等待 Bob 的锁...

  事务 B:
    LOCK person WHERE name='Bob'     → 获得 Bob 的锁
    LOCK person WHERE name='Alice'   → 等待 Alice 的锁...

  → 循环等待,形成死锁

10.5.2 死锁检测与处理

AgensGraph(PostgreSQL)自动检测死锁并回滚其中一个事务:

-- 查看死锁相关配置
SHOW deadlock_timeout;  -- 默认 1s

-- 查看锁等待情况
SELECT
    blocked_locks.pid     AS blocked_pid,
    blocked_activity.usename  AS blocked_user,
    blocking_locks.pid     AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query    AS blocked_statement,
    blocking_activity.query   AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity
    ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity
    ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

10.5.3 死锁预防策略

策略说明
固定加锁顺序总是按 ID 从小到大加锁
缩短事务减少持锁时间
使用原子操作SET n.prop = n.prop + 1 而非读-改-写
合理超时设置 lock_timeout
避免长事务定期检查 pg_stat_activity
-- 设置锁超时
SET lock_timeout = '5s';

-- 设置语句超时
SET statement_timeout = '30s';

-- 查看长事务
SELECT pid, now() - xact_start AS duration, query
FROM pg_stat_activity
WHERE state = 'active'
  AND xact_start < now() - interval '5 minutes'
ORDER BY duration DESC;

10.6 高并发场景最佳实践

10.6.1 热点顶点更新

-- ❌ 低效:读-改-写模式(有丢失更新风险)
MATCH (counter:Counter {name: 'page_views'})
WITH counter, counter.value AS current_val
SET counter.value = current_val + 1;

-- ✅ 高效:原子递增
MATCH (counter:Counter {name: 'page_views'})
SET counter.value = counter.value + 1;

-- ✅ 使用 PostgreSQL 序列(最高并发性能)
CREATE SEQUENCE page_view_seq;
-- 通过 SQL 操作
SELECT nextval('page_view_seq');

10.6.2 批量操作优化

-- ❌ 低效:逐条创建
CREATE (:Log {event: 'login', user: 'Alice', time: datetime()});
CREATE (:Log {event: 'click', user: 'Bob', time: datetime()});
-- ... 重复 10000 次

-- ✅ 高效:批量创建(单个事务)
UNWIND range(1, 10000) AS i
CREATE (:Log {event: 'auto_' + toString(i), time: datetime()});

10.6.3 读写分离模式

生产环境读写分离:

  应用层
    ├─ 写请求 ──▶ Primary (AgensGraph Master)
    │                    │
    │                    ▼ (Streaming Replication)
    └─ 读请求 ──▶ Replica (AgensGraph Standby)

  适用场景:
    - 写操作频率低,读操作频率高
    - 可以接受读取略有延迟的数据

10.7 监控事务状态

-- 查看当前活动事务
SELECT
    pid,
    usename,
    application_name,
    state,
    xact_start,
    now() - xact_start AS txn_duration,
    query_start,
    now() - query_start AS query_duration,
    wait_event_type,
    wait_event,
    query
FROM pg_stat_activity
WHERE state != 'idle'
ORDER BY xact_start;

-- 查看锁等待
SELECT * FROM pg_locks WHERE NOT granted;

-- 查看事务 ID 消耗
SELECT
    txid_current(),
    txid_snapshot_xmax(txid_current_snapshot()) AS xmax;

10.8 本章小结

要点说明
ACIDAgensGraph 完整支持 ACID 事务
隔离级别READ COMMITTED(默认)到 SERIALIZABLE
MVCC多版本并发控制,读不阻塞写
锁机制表级锁 + 行级锁,自动管理
死锁自动检测和回滚,预防比处理更重要
原子操作优先使用 SET n.prop = n.prop + 1

10.9 练习

  1. 编写一个并发安全的"用户积分转移"事务。
  2. 使用 SAVEPOINT 实现"批量导入中部分失败"的回滚。
  3. 编写监控脚本检测运行超过 5 分钟的长事务。
  4. 实现带有重试逻辑的可序列化事务函数。

10.10 扩展阅读