强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

dqlite 分布式 SQLite 教程 / 第 6 章:集群搭建与管理

第 6 章:集群搭建与管理

本章介绍如何搭建 dqlite 多节点集群,包括节点管理、Leader 选举、故障转移和成员变更。


6.1 集群架构总览

dqlite 集群由多个节点组成,通过 Raft 协议保证数据一致性:

┌───────────────────────────────────────────────────┐
│                dqlite Cluster                      │
│                                                    │
│   ┌──────────┐     ┌──────────┐     ┌──────────┐ │
│   │  Node 1  │◀───▶│  Node 2  │◀───▶│  Node 3  │ │
│   │ (Leader) │     │(Follower)│     │(Follower) │ │
│   │  :9001   │     │  :9002   │     │  :9003   │ │
│   └──────────┘     └──────────┘     └──────────┘ │
│        │                                      │    │
│        │         Raft Consensus               │    │
│        │     ┌──────────────────┐             │    │
│        └────▶│  Log Replication │◀────────────┘    │
│              └──────────────────┘                  │
│                                                    │
│   Client ──────▶ Leader (写)                       │
│   Client ──────▶ Any Node (读,可配置)              │
└───────────────────────────────────────────────────┘

6.1.1 节点角色

角色 说明 数量
Leader 处理所有写请求,协调日志复制 1
Follower 接收并复制 Leader 的日志 N-1
Candidate 正在竞选 Leader 的临时状态 0-1

6.1.2 集群规模建议

场景 推荐节点数 可用性
开发测试 1 无冗余
小型生产 3 可容忍 1 节点故障
中型生产 5 可容忍 2 节点故障
高可用 7 可容忍 3 节点故障

6.2 三节点集群搭建

6.2.1 本地三节点(开发测试)

在同一台机器上启动三个节点:

#!/bin/bash
# start-cluster.sh - 本地三节点集群

DATA_BASE="/tmp/dqlite-cluster"
mkdir -p ${DATA_BASE}/node{1,2,3}

# 启动 Node 1
echo "Starting Node 1..."
./dqlite-node 1 "127.0.0.1:9001" "${DATA_BASE}/node1" &
PID1=$!

sleep 2

# 启动 Node 2
echo "Starting Node 2..."
./dqlite-node 2 "127.0.0.1:9002" "${DATA_BASE}/node2" &
PID2=$!

sleep 2

# 启动 Node 3
echo "Starting Node 3..."
./dqlite-node 3 "127.0.0.1:9003" "${DATA_BASE}/node3" &
PID3=$!

sleep 2

echo "Cluster started:"
echo "  Node 1: PID=$PID1, address=127.0.0.1:9001"
echo "  Node 2: PID=$PID2, address=127.0.0.1:9002"
echo "  Node 3: PID=$PID3, address=127.0.0.1:9003"

# 将节点添加到集群(由第一个节点执行)
# 具体方式取决于 go-dqlite 或 C API

停止集群:

#!/bin/bash
# stop-cluster.sh
kill $PID1 $PID2 $PID3
wait
echo "Cluster stopped"

6.2.2 Go 实现:集群初始化

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    dqlite "github.com/canonical/go-dqlite/v2"
    "github.com/canonical/go-dqlite/v2/driver"
)

type ClusterConfig struct {
    Nodes []NodeConfig
}

type NodeConfig struct {
    ID      uint64
    Address string
    DataDir string
}

func startClusterNode(cfg NodeConfig, allNodes []driver.NodeInfo) (*dqlite.Node, *sql.DB, error) {
    // 确保数据目录
    if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
        return nil, nil, fmt.Errorf("mkdir: %w", err)
    }

    // 创建节点
    node, err := dqlite.New(cfg.ID, cfg.Address, cfg.DataDir, nil)
    if err != nil {
        return nil, nil, fmt.Errorf("new node: %w", err)
    }

    // 创建驱动(需要知道所有节点地址)
    store := driver.NewInmemNodeStore()
    store.Set(context.Background(), allNodes)

    drv, err := driver.New(store)
    if err != nil {
        node.Close()
        return nil, nil, fmt.Errorf("new driver: %w", err)
    }

    db := sql.OpenDB(drv)
    db.SetMaxOpenConns(5)

    return node, db, nil
}

func main() {
    // 集群配置
    nodes := []NodeConfig{
        {ID: 1, Address: "127.0.0.1:9001", DataDir: "/tmp/dqlite-cluster/node1"},
        {ID: 2, Address: "127.0.0.1:9002", DataDir: "/tmp/dqlite-cluster/node2"},
        {ID: 3, Address: "127.0.0.1:9003", DataDir: "/tmp/dqlite-cluster/node3"},
    }

    allNodeInfos := []driver.NodeInfo{
        {ID: 1, Address: "127.0.0.1:9001"},
        {ID: 2, Address: "127.0.0.1:9002"},
        {ID: 3, Address: "127.0.0.1:9003"},
    }

    // 启动所有节点
    var (
        nodeInstances []*dqlite.Node
        databases     []*sql.DB
    )

    for _, cfg := range nodes {
        node, db, err := startClusterNode(cfg, allNodeInfos)
        if err != nil {
            log.Fatalf("Failed to start node %d: %v", cfg.ID, err)
        }
        nodeInstances = append(nodeInstances, node)
        databases = append(databases, db)
        fmt.Printf("Node %d started on %s\n", cfg.ID, cfg.Address)
    }

    defer func() {
        for i, node := range nodeInstances {
            databases[i].Close()
            node.Close()
        }
    }()

    // 等待集群稳定
    time.Sleep(3 * time.Second)

    // 使用第一个节点的数据库进行操作
    db := databases[0]
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := db.PingContext(ctx); err != nil {
        log.Fatalf("Ping failed: %v", err)
    }

    // 创建测试表
    _, err := db.Exec(`CREATE TABLE IF NOT EXISTS cluster_test (
        id INTEGER PRIMARY KEY,
        node_id INTEGER,
        created_at TEXT DEFAULT (datetime('now'))
    )`)
    if err != nil {
        log.Fatalf("Create table failed: %v", err)
    }

    fmt.Println("Cluster is ready!")

    // 等待信号
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    <-sigCh

    fmt.Println("\nShutting down cluster...")
}

6.2.3 多机器集群部署

在三台机器上分别部署:

机器 1 (192.168.1.101):
  Node ID: 1
  Address: 192.168.1.101:9001
  DataDir: /var/lib/dqlite

机器 2 (192.168.1.102):
  Node ID: 2
  Address: 192.168.1.102:9001
  DataDir: /var/lib/dqlite

机器 3 (192.168.1.103):
  Node ID: 3
  Address: 192.168.1.103:9001
  DataDir: /var/lib/dqlite

每台机器的 systemd 服务文件:

# /etc/systemd/system/dqlite.service
[Unit]
Description=dqlite Node
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=dqlite
Group=dqlite
ExecStart=/usr/local/bin/dqlite-node --id %i --address :9001 --data-dir /var/lib/dqlite
Restart=always
RestartSec=5
LimitNOFILE=65536

# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=/var/lib/dqlite

[Install]
WantedBy=multi-user.target

6.3 Leader 选举

6.3.1 选举触发条件

触发条件 说明
初始启动 集群首次启动时
Leader 崩溃 Follower 超时未收到心跳
网络分区 Leader 在少数派中
手动转移 管理员主动触发 Leader 转移

6.3.2 选举时间线

t=0ms     Leader 崩溃(或网络断开)
t=150ms   Follower A 选举超时,转为 Candidate
t=151ms   Candidate A 增加任期,投自己一票
t=152ms   Candidate A 向 B、C 发送 RequestVote
t=153ms   B 投赞成票(A 的日志最新)
t=154ms   C 投赞成票
t=155ms   A 获得多数票,成为 Leader
t=156ms   A 开始发送心跳

总耗时:约 150-300ms(取决于随机超时)

6.3.3 手动 Leader 转移

在计划性维护前,建议先转移 Leader:

// 通过 SQL 执行 Leader 转移(如果 go-dqlite 支持)
// 注意:具体的 API 可能随版本变化

// 方式 1:停止当前 Leader,集群自动选举新 Leader
func gracefulLeaderTransfer(db *sql.DB) error {
    // 1. 停止接受新写入
    _, err := db.Exec("PRAGMA dqlite_stop_writes")
    if err != nil {
        return err
    }

    // 2. 等待所有写入完成
    time.Sleep(2 * time.Second)

    // 3. 新的 Leader 会在选举超时后自动产生
    return nil
}

// 方式 2:使用 dqlite 客户端的 Transfer 命令
// (需要连接到当前 Leader)

6.3.4 选举问题排查

问题 原因 解决方案
无法选出 Leader 网络不通或多数节点故障 检查网络连通性
频繁 Leader 切换 网络延迟或不稳定 调整选举超时参数
选举时间过长 时钟不同步 使用 NTP 同步时钟
脑裂 网络分区(Raft 保证不会发生) 检查是否有误配置

6.4 故障转移

6.4.1 自动故障转移

当 Leader 故障时,dqlite 自动处理:

正常状态:
  Node 1 (Leader) ──▶ Node 2 (Follower)
         │
         └──────────▶ Node 3 (Follower)

Node 1 故障后:
  ✗ Node 1 (Down)
  
  Node 2 (Candidate) ──▶ 选举 ──▶ Node 2 (New Leader)
         │
         └──────────────▶ Node 3 (Follower)

6.4.2 故障转移时间

阶段 耗时 说明
故障检测 150-300ms 选举超时
选举 1-10ms 投票过程
日志同步 0-100ms 新 Leader 确认日志最新
总计 150-400ms 从故障到新 Leader 就绪

6.4.3 客户端处理故障转移

// 客户端需要处理 Leader 切换期间的错误
func executeWithFailover(ctx context.Context, dbs []*sql.DB, query string, args ...interface{}) error {
    var lastErr error

    for _, db := range dbs {
        _, err := db.ExecContext(ctx, query, args...)
        if err == nil {
            return nil
        }

        // 如果是 Leader 相关错误,尝试下一个节点
        if isLeaderError(err) {
            lastErr = err
            continue
        }

        // 其他错误直接返回
        return err
    }

    return fmt.Errorf("all nodes failed: %w", lastErr)
}

func isLeaderError(err error) bool {
    if err == nil {
        return false
    }
    msg := err.Error()
    return strings.Contains(msg, "server is not the leader") ||
           strings.Contains(msg, "no leader") ||
           strings.Contains(msg, "connection refused")
}

6.4.4 健康检查

// 集群健康检查
type ClusterHealth struct {
    NodeID    uint64
    Address   string
    IsLeader  bool
    IsHealthy bool
    Lag       time.Duration
}

func checkClusterHealth(nodeInfos []driver.NodeInfo) []ClusterHealth {
    var results []ClusterHealth

    for _, info := range nodeInfos {
        health := ClusterHealth{
            NodeID:  info.ID,
            Address: info.Address,
        }

        // 尝试连接
        store := driver.NewInmemNodeStore()
        store.Set(context.Background(), []driver.NodeInfo{info})

        drv, err := driver.New(store)
        if err != nil {
            health.IsHealthy = false
            results = append(results, health)
            continue
        }

        db := sql.OpenDB(drv)
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)

        start := time.Now()
        err = db.PingContext(ctx)
        health.Lag = time.Since(start)

        health.IsHealthy = (err == nil)

        db.Close()
        drv.Close()
        cancel()

        results = append(results, health)
    }

    return results
}

6.5 成员变更

6.5.1 添加节点

// 添加新节点到集群
func addNodeToCluster(ctx context.Context, leaderDB *sql.DB,
    newNodeID uint64, newNodeAddr string) error {

    // 注意:具体的 SQL 或 API 调用取决于 go-dqlite 版本
    // 以下为概念性代码

    // 使用 dqlite 客户端的 Add 命令
    // 在实际使用中,需要通过 go-dqlite 的 Client API

    log.Printf("Adding node %d at %s to cluster...", newNodeID, newNodeAddr)

    // 方式 1:通过 go-dqlite Client API
    // client.Add(ctx, dqlite.NodeInfo{ID: newNodeID, Address: newNodeAddr})

    // 方式 2:通过自定义 SQL 命令
    // _, err := leaderDB.Exec("ALTER CLUSTER ADD NODE ?", newNodeID)

    return nil
}

6.5.2 移除节点

// 从集群移除节点
func removeNodeFromCluster(ctx context.Context, leaderDB *sql.DB,
    nodeID uint64) error {

    log.Printf("Removing node %d from cluster...", nodeID)

    // 类似添加节点,通过客户端 API 执行

    return nil
}

6.5.3 成员变更脚本

#!/bin/bash
# cluster-manage.sh - 集群成员管理脚本

set -euo pipefail

ACTION="$1"
NODE_ID="$2"
NODE_ADDR="${3:-}"

case "$ACTION" in
    add)
        if [ -z "$NODE_ADDR" ]; then
            echo "Usage: $0 add <node-id> <node-address>"
            exit 1
        fi
        echo "Adding node $NODE_ID ($NODE_ADDR) to cluster..."
        # 通过 go-dqlite CLI 或 API 添加
        # go-dqlite cluster add $NODE_ID $NODE_ADDR
        ;;
    remove)
        echo "Removing node $NODE_ID from cluster..."
        # go-dqlite cluster remove $NODE_ID
        ;;
    list)
        echo "Listing cluster members..."
        # go-dqlite cluster list
        ;;
    *)
        echo "Usage: $0 {add|remove|list} <node-id> [node-address]"
        exit 1
        ;;
esac

6.5.4 成员变更最佳实践

规则 说明
一次只变更一个节点 避免 Quorum 计算混乱
先添加再移除 替换节点时,先加入新节点再移除旧节点
等待同步完成 新节点完全同步后再进行下一次变更
保持奇数节点 3→5→3 而不是 3→4→3
变更前备份 关键操作前创建快照

6.6 集群运维命令

6.6.1 检查集群状态

// 获取集群信息
func getClusterInfo(db *sql.DB) error {
    // 查询集群信息(具体 SQL 取决于版本)
    rows, err := db.Query("SELECT * FROM dqlite_cluster")
    if err != nil {
        return err
    }
    defer rows.Close()

    fmt.Println("=== Cluster Info ===")
    // 遍历结果...
    return nil
}

6.6.2 查看 Raft 日志状态

// 查看 Raft 状态
type RaftStatus struct {
    Term         uint64
    CommitIndex  uint64
    LastLogIndex uint64
    Role         string
}

6.7 常见集群问题

问题 原因 解决方案
集群无法启动 节点配置不一致 检查所有节点配置
写入失败 Leader 不可达 检查集群健康状态
数据不一致 时钟偏差过大 使用 NTP 同步时钟
节点反复加入退出 网络不稳定 检查网络质量
日志增长过快 快照未触发 调整快照阈值
新节点同步慢 数据量大 使用快照而非日志重放

本章小结

要点 说明
集群规模 推荐 3、5、7 个节点(奇数)
Leader 选举 150-300ms 超时后自动选举
故障转移 自动,150-400ms 完成
成员变更 一次一个节点,等待同步完成
客户端处理 需要实现重试和 Leader 发现

下一章

第 7 章:性能优化 — 学习批量写入、读优化、同步策略和日志压缩等性能优化技巧。