强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

rqlite 完全指南 / 第 10 章:客户端开发

第 10 章:客户端开发

使用 Python、Go、Java 和 CLI 工具操作 rqlite 集群。


10.1 客户端概览

rqlite 通过 HTTP API 暴露所有功能,因此任何能发送 HTTP 请求的语言都可以作为客户端。官方和社区提供了多种客户端库:

语言客户端库维护状态
CLIrqlite(官方)✅ 活跃
Pythonpyrqlite / requests✅ 活跃
Gorqlite/client(官方)✅ 活跃
Javajqlite / 原生 HTTP⚠️ 社区维护
Node.js原生 HTTP
Rust原生 HTTP

10.2 CLI 客户端(官方)

10.2.1 安装

# 随 rqlite 一起安装(参见第 2 章)
# rqlite CLI 已包含在发布包中

# 验证
rqlite -h

10.2.2 基本使用

# 连接到本地节点
rqlite

# 连接到远程节点
rqlite 192.168.1.100:4001

# 使用认证
rqlite -u admin:password 192.168.1.100:4001

# 使用 TLS(自签名证书)
rqlite -s -ca /etc/rqlite/certs/ca.crt 192.168.1.100:4001

10.2.3 交互式命令

进入交互模式后,可以直接输入 SQL:

192.168.1.100:4001> CREATE TABLE demo (id INTEGER PRIMARY KEY, name TEXT);
0 rows affected
192.168.1.100:4001> INSERT INTO demo (name) VALUES ("hello");
1 rows affected
192.168.1.100:4001> SELECT * FROM demo;
+----+-------+
| id | name  |
+----+-------+
| 1  | hello |
+----+-------+

CLI 支持的特殊命令:

命令说明
.help显示帮助信息
.tables列出所有表
.schema显示表结构
.status显示节点状态
.nodes显示集群节点
.timer on/off显示执行时间
.consistency <level>设置一致性级别
.quit退出

10.3 Python 客户端

10.3.1 使用 requests 库(推荐)

"""
rqlite Python 客户端 — 基于 requests
"""
import json
import requests
from typing import Any, Optional


class RqliteClient:
    """rqlite HTTP API 客户端封装"""
    
    def __init__(
        self,
        host: str = "localhost",
        port: int = 4001,
        scheme: str = "http",
        username: Optional[str] = None,
        password: Optional[str] = None,
        timeout: int = 30,
    ):
        self.base_url = f"{scheme}://{host}:{port}"
        self.session = requests.Session()
        self.session.timeout = timeout
        
        # 认证
        if username and password:
            self.session.auth = (username, password)
        
        # 连接池配置
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=3,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def query(
        self,
        sql: str,
        params: Optional[list] = None,
        level: str = "weak",
        timeout: Optional[str] = None,
    ) -> dict:
        """执行查询(SELECT)"""
        statements = self._build_statements(sql, params)
        payload = {
            "statements": statements,
            "level": level,
        }
        if timeout:
            payload["timeout"] = timeout
        
        resp = self.session.post(
            f"{self.base_url}/db/query",
            json=payload,
        )
        resp.raise_for_status()
        return resp.json()
    
    def execute(
        self,
        sql: str,
        params: Optional[list] = None,
    ) -> dict:
        """执行写入(INSERT/UPDATE/DELETE/DDL)"""
        statements = self._build_statements(sql, params)
        resp = self.session.post(
            f"{self.base_url}/db/execute",
            json=statements,
        )
        resp.raise_for_status()
        return resp.json()
    
    def batch_execute(self, statements: list[list]) -> dict:
        """批量执行多条语句(同一事务)"""
        resp = self.session.post(
            f"{self.base_url}/db/execute",
            json=statements,
        )
        resp.raise_for_status()
        return resp.json()
    
    def request(self, statements: list[dict]) -> dict:
        """混合请求(查询+执行)"""
        resp = self.session.post(
            f"{self.base_url}/db/request",
            json={"statements": statements},
        )
        resp.raise_for_status()
        return resp.json()
    
    def status(self) -> dict:
        """获取节点状态"""
        resp = self.session.get(f"{self.base_url}/status")
        resp.raise_for_status()
        return resp.json()
    
    def nodes(self) -> dict:
        """获取集群节点列表"""
        resp = self.session.get(f"{self.base_url}/nodes")
        resp.raise_for_status()
        return resp.json()
    
    def backup(self, path: str, fmt: str = "sql") -> None:
        """备份数据库"""
        params = {}
        if fmt == "binary":
            params["fmt"] = "binary"
        
        resp = self.session.get(
            f"{self.base_url}/db/backup",
            params=params,
            stream=True,
        )
        resp.raise_for_status()
        
        with open(path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                f.write(chunk)
    
    def load(self, path: str) -> dict:
        """加载 SQL dump 恢复数据"""
        with open(path, "r") as f:
            sql = f.read()
        
        resp = self.session.post(
            f"{self.base_url}/db/load",
            data=sql,
            headers={"Content-Type": "text/plain"},
        )
        resp.raise_for_status()
        return resp.json() if resp.text else {}
    
    def _build_statements(
        self, sql: str, params: Optional[list]
    ) -> list:
        """构建语句列表"""
        stmt = {"q": sql}
        if params:
            stmt["v"] = params
        return [stmt]


# ---- 使用示例 ----

if __name__ == "__main__":
    client = RqliteClient(
        host="localhost",
        port=4001,
        username="admin",
        password="password",
    )
    
    # 查看状态
    status = client.status()
    print(f"Raft State: {status['store']['raft_state']}")
    
    # 创建表
    client.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    """)
    
    # 批量插入
    client.batch_execute([
        ["INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", "笔记本", 5999.0, 100],
        ["INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", "键盘", 399.0, 200],
        ["INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", "鼠标", 99.0, 500],
    ])
    
    # 查询
    result = client.query("SELECT * FROM products WHERE price > ?", params=[100])
    for row in result["results"][0].get("values", []):
        print(f"  {row[1]}: ¥{row[2]}, 库存: {row[3]}")
    
    # 备份
    client.backup("/tmp/rqlite_backup.sql")
    print("备份完成")

10.3.2 使用 pyrqlite 库

pip install pyrqlite
import pyrqlite.dbapi2 as dbapi

# 连接
conn = dbapi.connect(
    host='localhost',
    port=4001,
    # username='admin',
    # password='password',
)

cursor = conn.cursor()

# 执行 SQL
cursor.execute('CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY, name TEXT)')
cursor.execute('INSERT INTO demo (name) VALUES (?)', ('hello',))
conn.commit()

# 查询
cursor.execute('SELECT * FROM demo')
rows = cursor.fetchall()
for row in rows:
    print(row)

conn.close()

10.4 Go 客户端

10.4.1 使用官方客户端库

package main

import (
	"fmt"
	"log"
	"time"

	rqlite "github.com/rqlite/rqlite-go"
)

func main() {
	// 创建客户端
	client := rqlite.NewClient("http://localhost:4001")
	client.SetBasicAuth("admin", "password")
	client.SetTimeout(30 * time.Second)

	// 创建表
	_, err := client.Execute(`CREATE TABLE IF NOT EXISTS tasks (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		title TEXT NOT NULL,
		status TEXT DEFAULT 'pending',
		created_at DATETIME DEFAULT CURRENT_TIMESTAMP
	)`)
	if err != nil {
		log.Fatal(err)
	}

	// 插入数据
	result, err := client.Execute(`INSERT INTO tasks (title) VALUES (?)`, "完成项目文档")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("插入成功, ID: %d\n", result[0].LastInsertID)

	// 查询数据
	rows, err := client.Query("SELECT * FROM tasks WHERE status = ?", "pending")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("查询结果: %d 行\n", len(rows[0].Values))
}

10.4.2 使用标准库

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

type RqliteClient struct {
	BaseURL    string
	HTTPClient *http.Client
	Username   string
	Password   string
}

func NewRqliteClient(host string, port int) *RqliteClient {
	return &RqliteClient{
		BaseURL: fmt.Sprintf("http://%s:%d", host, port),
		HTTPClient: &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				MaxIdleConns:        100,
				MaxIdleConnsPerHost: 20,
				IdleConnTimeout:     90 * time.Second,
			},
		},
	}
}

func (c *RqliteClient) Execute(stmts []interface{}) (map[string]interface{}, error) {
	body, err := json.Marshal(stmts)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest("POST", c.BaseURL+"/db/execute", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	if c.Username != "" {
		req.SetBasicAuth(c.Username, c.Password)
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var result map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result, nil
}

func (c *RqliteClient) Query(sql string, level string) (map[string]interface{}, error) {
	url := fmt.Sprintf("%s/db/query?q=%s&level=%s", c.BaseURL, sql, level)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	if c.Username != "" {
		req.SetBasicAuth(c.Username, c.Password)
	}

	resp, err := c.HTTPClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, err
	}
	return result, nil
}

func main() {
	client := NewRqliteClient("localhost", 4001)
	client.Username = "admin"
	client.Password = "password"

	// 执行写入
	stmts := []interface{}{
		[]interface{}{"INSERT INTO tasks (title) VALUES (?)", "学习 Go"},
	}
	result, err := client.Execute(stmts)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Execute result: %+v\n", result)

	// 查询
	result, err = client.Query("SELECT * FROM tasks", "weak")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Query result: %+v\n", result)
}

10.5 Java 客户端

package com.example.rqlite;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Base64;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
 * rqlite Java 客户端
 */
public class RqliteClient {
    private final String baseUrl;
    private final HttpClient httpClient;
    private final ObjectMapper mapper;
    private final String authHeader;

    public RqliteClient(String host, int port, String username, String password) {
        this.baseUrl = String.format("http://%s:%d", host, port);
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
        this.mapper = new ObjectMapper();
        
        if (username != null && password != null) {
            String credentials = Base64.getEncoder()
                    .encodeToString((username + ":" + password).getBytes());
            this.authHeader = "Basic " + credentials;
        } else {
            this.authHeader = null;
        }
    }

    /**
     * 执行查询
     */
    public String query(String sql, String level) throws Exception {
        String url = String.format("%s/db/query?q=%s&level=%s",
                baseUrl, 
                java.net.URLEncoder.encode(sql, "UTF-8"),
                level);
        
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .GET();
        
        if (authHeader != null) {
            builder.header("Authorization", authHeader);
        }
        
        HttpResponse<String> response = httpClient.send(
                builder.build(), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    /**
     * 执行写入
     */
    public String execute(String sql, Object... params) throws Exception {
        ArrayNode stmts = mapper.createArrayNode();
        ArrayNode stmt = mapper.createArrayNode();
        stmt.add(sql);
        for (Object param : params) {
            stmt.add(param.toString());
        }
        stmts.add(stmt);
        
        String body = mapper.writeValueAsString(stmts);
        
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/db/execute"))
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .header("Content-Type", "application/json");
        
        if (authHeader != null) {
            builder.header("Authorization", authHeader);
        }
        
        HttpResponse<String> response = httpClient.send(
                builder.build(), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        RqliteClient client = new RqliteClient("localhost", 4001, "admin", "password");
        
        // 创建表
        client.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)");
        
        // 插入
        client.execute("INSERT INTO items (name) VALUES (?)", "Java Item");
        
        // 查询
        String result = client.query("SELECT * FROM items", "weak");
        System.out.println(result);
    }
}

10.6 Node.js 客户端

/**
 * rqlite Node.js 客户端
 */
const http = require('http');

class RqliteClient {
    constructor(host = 'localhost', port = 4001, options = {}) {
        this.host = host;
        this.port = port;
        this.auth = options.auth || null;
        this.timeout = options.timeout || 30000;
    }

    async execute(statements) {
        return this._post('/db/execute', statements);
    }

    async query(sql, params = [], level = 'weak') {
        const stmt = { q: sql };
        if (params.length > 0) stmt.v = params;
        return this._post('/db/query', { statements: [stmt], level });
    }

    async request(statements) {
        return this._post('/db/request', { statements });
    }

    async status() {
        return this._get('/status');
    }

    async nodes() {
        return this._get('/nodes');
    }

    _get(path) {
        return new Promise((resolve, reject) => {
            const options = {
                hostname: this.host,
                port: this.port,
                path: path,
                method: 'GET',
                timeout: this.timeout,
            };
            if (this.auth) {
                options.headers = {
                    'Authorization': 'Basic ' + Buffer.from(this.auth).toString('base64'),
                };
            }

            const req = http.request(options, (res) => {
                let data = '';
                res.on('data', chunk => data += chunk);
                res.on('end', () => {
                    try { resolve(JSON.parse(data)); }
                    catch (e) { resolve(data); }
                });
            });
            req.on('error', reject);
            req.end();
        });
    }

    _post(path, body) {
        return new Promise((resolve, reject) => {
            const data = JSON.stringify(body);
            const options = {
                hostname: this.host,
                port: this.port,
                path: path,
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    'Content-Length': Buffer.byteLength(data),
                },
                timeout: this.timeout,
            };
            if (this.auth) {
                options.headers['Authorization'] = 'Basic ' + Buffer.from(this.auth).toString('base64');
            }

            const req = http.request(options, (res) => {
                let data = '';
                res.on('data', chunk => data += chunk);
                res.on('end', () => {
                    try { resolve(JSON.parse(data)); }
                    catch (e) { resolve(data); }
                });
            });
            req.on('error', reject);
            req.write(data);
            req.end();
        });
    }
}

// 使用示例
async function main() {
    const client = new RqliteClient('localhost', 4001, { auth: 'admin:password' });

    // 创建表
    await client.execute([
        ['CREATE TABLE IF NOT EXISTS notes (id INTEGER PRIMARY KEY, content TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP)']
    ]);

    // 插入
    await client.execute([
        ['INSERT INTO notes (content) VALUES (?)', '第一条笔记']
    ]);

    // 查询
    const result = await client.query('SELECT * FROM notes ORDER BY id DESC');
    console.log(JSON.stringify(result, null, 2));
}

main().catch(console.error);

10.7 客户端对比

特性CLIPython (requests)Go (官方)Java (原生)Node.js
安装难度⭐⭐⭐⭐⭐⭐⭐
交互式使用
参数绑定
连接池❌(基础实现)
批量操作
TLS 支持
适用场景运维调试脚本/后端后端服务企业应用全栈应用

10.8 业务场景:用户注册服务

"""
用户注册服务示例 — 使用 rqlite 存储用户数据
"""
from flask import Flask, request, jsonify
from rqlite_client import RqliteClient

app = Flask(__name__)
db = RqliteClient(host="rqlite-host", port=4001, username="app", password="secure_pass")

# 初始化表
db.execute("""
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        email TEXT NOT NULL UNIQUE,
        password_hash TEXT NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
""")
db.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email ON users(email)")


@app.route("/register", methods=["POST"])
def register():
    data = request.json
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")

    if not all([username, email, password]):
        return jsonify({"error": "缺少必填字段"}), 400

    # 检查用户名是否已存在
    existing = db.query(
        "SELECT id FROM users WHERE username = ? OR email = ?",
        params=[username, email],
        level="strong",
    )
    if existing["results"][0].get("values"):
        return jsonify({"error": "用户名或邮箱已存在"}), 409

    # 插入新用户
    import hashlib
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    
    result = db.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        params=[username, email, password_hash],
    )

    user_id = result["results"][0]["last_insert_id"]
    return jsonify({"id": user_id, "username": username, "email": email}), 201


@app.route("/users/<int:user_id>", methods=["GET"])
def get_user(user_id):
    result = db.query(
        "SELECT id, username, email, created_at FROM users WHERE id = ?",
        params=[user_id],
    )
    values = result["results"][0].get("values")
    if not values:
        return jsonify({"error": "用户不存在"}), 404

    row = values[0]
    return jsonify({
        "id": row[0],
        "username": row[1],
        "email": row[2],
        "created_at": row[3],
    })


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

10.9 本章小结

要点内容
CLI 客户端官方 rqlite 工具,适合运维和调试
Pythonrequests 库最灵活,pyrqlite 提供 DB-API 2.0 接口
Go官方库 rqlite-go,也可使用标准库直接调用 HTTP API
Java使用 java.net.http 或 Apache HttpClient
Node.js使用内置 http 模块或 node-fetch
通用原则所有语言都通过 HTTP API 操作 rqlite

上一章:第 9 章:性能优化 下一章:第 11 章:容器化部署