OpenResty 高性能网关开发教程 / 第 08 章 - 反向代理与负载均衡
第 08 章 - 反向代理与负载均衡
8.1 反向代理基础
反向代理是 API 网关的核心能力,将客户端请求转发到后端服务。
# Basic reverse proxy: every request under /api/ is forwarded to the
# "backend_service" upstream group.
server {
    listen 8080;

    location /api/ {
        # Upstream timeouts: connect / send request / read response
        proxy_connect_timeout 5s;
        proxy_send_timeout    30s;
        proxy_read_timeout    30s;

        # Buffer upstream responses before relaying them to the client
        proxy_buffering   on;
        proxy_buffer_size 4k;
        proxy_buffers     8 4k;

        # Pass client information on to the backend
        proxy_set_header Host              $host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Request-ID      $request_id;

        proxy_pass http://backend_service;
    }
}
# Upstream group: two weighted primaries plus one backup server that only
# receives traffic when the primaries are unavailable.
upstream backend_service {
    server 10.0.1.1:8080 weight=5;
    server 10.0.1.2:8080 weight=3;
    server 10.0.1.3:8080 backup;
}
8.2 负载均衡策略
策略对比
| 策略 | 指令 | 特点 | 适用场景 |
|---|---|---|---|
| 轮询 | 默认 | 依次分配 | 服务器性能相近 |
| 加权轮询 | weight | 按权重分配 | 服务器性能不同 |
| IP 哈希 | ip_hash | 同一 IP 固定后端 | 会话保持 |
| 最少连接 | least_conn | 分配到连接最少的后端 | 长连接场景 |
| 一致性哈希 | hash $request_uri consistent | 相同请求固定后端,节点增减时只有少量请求被重新映射 | 缓存友好 |
| 随机 | random two least_conn | 随机选出两台,再取其中连接数较少的一台 | 大规模集群 |
8.2.1 加权轮询
# Weighted round-robin: traffic is split in proportion to weight (5:3:2).
upstream backend {
    server 10.0.1.1:8080 weight=5;  # 5/10 = 50% of traffic
    server 10.0.1.2:8080 weight=3;  # 3/10 = 30% of traffic
    server 10.0.1.3:8080 weight=2;  # 2/10 = 20% of traffic
}
8.2.2 IP 哈希(会话保持)
# IP hash: requests from the same client address go to the same backend,
# which gives simple session stickiness.
upstream backend {
    ip_hash;

    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}
8.2.3 一致性哈希
# Consistent hashing keyed on the request URI: the same URI is routed to
# the same backend (the "consistent" parameter selects ketama hashing).
upstream backend {
    hash $request_uri consistent;

    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}
8.2.4 Lua 动态负载均衡
当 Nginx 原生策略不够灵活时,使用 Lua 实现自定义负载均衡。
-- /usr/local/openresty/lua/balancer.lua
local _M = {}
--- Weighted random selection.
-- Draws a point in [0, total_weight) and returns the first upstream whose
-- cumulative weight reaches that point.
-- @param upstreams sequence of tables, each with a numeric `weight` field
-- @return the chosen upstream table (the last entry if none matched)
local function weighted_random(upstreams)
    local weight_sum = 0
    for _, node in ipairs(upstreams) do
        weight_sum = weight_sum + node.weight
    end

    local point = math.random() * weight_sum
    local running = 0
    for _, node in ipairs(upstreams) do
        running = running + node.weight
        if point <= running then
            return node
        end
    end

    -- Fallback for the case where no cumulative weight reached the point.
    return upstreams[#upstreams]
end
--- Least-connections selection.
-- Connection counts are read from the `gateway_config` shared dict under
-- the key "conn:<addr>"; a missing counter is treated as 0. Something else
-- must maintain those counters (stub_status or custom accounting).
--
-- A node is skipped only when `healthy` is explicitly false. This matches
-- the filter in select_upstream; the previous truthiness check skipped
-- every node that had no `healthy` field and always fell back to the first.
-- @param upstreams sequence of tables with an `addr` field
-- @return the upstream with the fewest connections, or upstreams[1] when
--         every node is marked unhealthy
local function least_conn(upstreams)
    local min_conn = math.huge
    local selected = nil
    for _, up in ipairs(upstreams) do
        if up.healthy ~= false then
            local conn = ngx.shared.gateway_config:get("conn:" .. up.addr) or 0
            if conn < min_conn then
                min_conn = conn
                selected = up
            end
        end
    end
    return selected or upstreams[1]
end
--- Hash-based selection.
-- Computes a djb2-style hash of `key` (multiply by 33, add the byte,
-- reduce modulo 2147483647) and maps it onto the node list with a plain
-- modulo. There is no hash ring, so changing the node count remaps most keys.
-- @param upstreams sequence of upstream tables
-- @param key string to hash
-- @return the upstream selected for this key
local function consistent_hash(upstreams, key)
    local modulus = 2147483647
    local acc = 5381
    for pos = 1, #key do
        acc = (acc * 33 + string.byte(key, pos)) % modulus
    end
    return upstreams[acc % #upstreams + 1]
end
--- Pick one upstream according to `strategy`.
-- Nodes whose `healthy` field is explicitly false are dropped first.
-- @param upstreams sequence of upstream tables
-- @param strategy "weighted_random" (default), "least_conn" or
--        "consistent_hash"; any other value picks uniformly at random
-- @param key hash key for "consistent_hash" (defaults to ngx.var.uri)
-- @return the chosen upstream, or nil plus an error message when no
--         healthy node is left
function _M.select_upstream(upstreams, strategy, key)
    local mode = strategy or "weighted_random"

    local candidates = {}
    for _, node in ipairs(upstreams) do
        if node.healthy ~= false then
            candidates[#candidates + 1] = node
        end
    end

    if #candidates == 0 then
        return nil, "No healthy upstreams"
    end

    if mode == "consistent_hash" then
        return consistent_hash(candidates, key or ngx.var.uri)
    end
    if mode == "least_conn" then
        return least_conn(candidates)
    end
    if mode == "weighted_random" then
        return weighted_random(candidates)
    end

    -- Unknown strategy: uniform random choice.
    return candidates[math.random(#candidates)]
end
return _M
8.3 健康检查
8.3.1 被动健康检查
Nginx 原生支持被动健康检查(max_fails + fail_timeout):
# Passive health check: a server that fails 3 times within 30s is taken
# out of rotation for the following 30s.
upstream backend {
    server 10.0.1.1:8080  max_fails=3  fail_timeout=30s;
    server 10.0.1.2:8080  max_fails=3  fail_timeout=30s;
    server 10.0.1.3:8080  max_fails=3  fail_timeout=30s;
}
| 参数 | 说明 |
|---|---|
| max_fails | 在 fail_timeout 时间窗口内允许的最大失败次数,达到后该节点被标记为不可用 |
| fail_timeout | 统计失败次数的时间窗口,同时也是节点被标记为不可用后的持续时间 |
8.3.2 主动健康检查
使用 ngx.timer 定期探测后端服务。
-- /usr/local/openresty/lua/health_check.lua
local _M = {}
local http = require "resty.http"
local cjson = require "cjson"
-- Backends to probe: `name` is the key used in health_status, `addr` is
-- host:port, and `path` is the health endpoint requested over plain HTTP.
local backends = {
{name = "user-service", addr = "10.0.1.1:8080", path = "/health"},
{name = "order-service", addr = "10.0.1.2:8080", path = "/health"},
{name = "product-service", addr = "10.0.1.3:8080", path = "/health"},
}
-- Latest health record per backend name. This is a plain Lua table, so it
-- lives in the Lua VM that loaded the module and is not an ngx.shared dict.
local health_status = {}
--- Probe a single backend with an HTTP GET (3 second timeout).
-- @param backend table with `addr` (host:port) and `path`
-- @return table {healthy, latency, status} on a response, where healthy
--         means a 2xx/3xx status; {healthy = false, error, latency} when
--         the request itself failed. Latency is in milliseconds.
local function probe(backend)
    local client = http.new()
    client:set_timeout(3000)

    local began = ngx.now()
    local target = "http://" .. backend.addr .. backend.path
    local res, err = client:request_uri(target, {method = "GET"})
    local elapsed_ms = (ngx.now() - began) * 1000

    if res then
        local code = res.status
        return {
            healthy = (code >= 200 and code < 400),
            status = code,
            latency = elapsed_ms,
        }
    end

    return {
        healthy = false,
        error = err,
        latency = elapsed_ms,
    }
end
--- Timer callback: probe every backend and refresh health_status.
-- A backend stays healthy until it has failed 3 probes in a row; a single
-- successful probe marks it healthy again and resets the failure streak.
-- @param premature true when the timer fires because the worker is exiting
local function check_health(premature)
    if premature then
        return
    end

    for _, backend in ipairs(backends) do
        local outcome = probe(backend)
        local previous = health_status[backend.name]
        local entry

        if outcome.healthy then
            local streak = (previous and previous.consecutive_success or 0) + 1
            entry = {
                healthy = true,
                latency = outcome.latency,
                last_check = ngx.time(),
                consecutive_success = streak,
                consecutive_failures = 0,
            }
        else
            local failures = (previous and previous.consecutive_failures or 0) + 1
            -- Only the third consecutive failure flips the backend to unhealthy.
            local still_healthy = failures < 3

            -- Log once, on the healthy -> unhealthy transition.
            if previous and previous.healthy and not still_healthy then
                ngx.log(ngx.WARN, "Backend ", backend.name, " marked unhealthy after ",
                        failures, " consecutive failures")
            end

            entry = {
                healthy = still_healthy,
                error = outcome.error,
                latency = outcome.latency,
                last_check = ngx.time(),
                consecutive_success = 0,
                consecutive_failures = failures,
            }
        end

        health_status[backend.name] = entry
    end
end
--- Start the periodic health check.
-- @param interval seconds between check rounds (default 10)
-- @return true on success; nil plus an error message when the timer could
--         not be created (earlier versions ignored that failure and logged
--         "started" regardless)
function _M.start(interval)
    interval = interval or 10

    -- ngx.timer.every returns a handle, or nil and an error string.
    local ok, err = ngx.timer.every(interval, check_health)
    if not ok then
        ngx.log(ngx.ERR, "Failed to start health check timer: ", err)
        return nil, err
    end

    ngx.log(ngx.INFO, "Health check started, interval: ", interval, "s")
    return true
end
-- Return the health-status table, keyed by backend name.
-- The table itself is returned (not a copy), so callers see later updates
-- and must not modify it.
function _M.get_status()
return health_status
end
--- Report whether the named backend is currently considered healthy.
-- A backend that has not been probed yet counts as healthy.
-- The previous `status and status.healthy or true` always evaluated to true,
-- because a false `healthy` falls through to the `or true` default.
-- @param name backend name as used in the health-status table
-- @return boolean
function _M.is_healthy(name)
    local status = health_status[name]
    if status == nil then
        return true
    end
    return status.healthy
end
return _M
8.4 重试机制
8.4.1 Nginx 原生重试
# Three equal-weight backends; a failed request can move on to the next one.
upstream backend {
    server 10.0.1.1:8080;
    server 10.0.1.2:8080;
    server 10.0.1.3:8080;
}
server {
    listen 8080;

    location /api/ {
        # Conditions that send the request to the next upstream server
        proxy_next_upstream error timeout http_502 http_503 http_504;

        # Upper bounds on retrying
        proxy_next_upstream_tries   3;    # max tries, counting the first attempt
        proxy_next_upstream_timeout 10s;  # total time budget for passing to the next server

        # Requests with non-idempotent methods (POST, LOCK, PATCH) are not
        # retried by default once they have been sent to an upstream server;
        # GET/HEAD/PUT/DELETE are.

        proxy_pass http://backend;
    }
}
8.4.2 Lua 自定义重试
-- /usr/local/openresty/lua/retry.lua
local _M = {}
local http = require "resty.http"
-- Default retry configuration, used by _M.request when no config is passed.
local defaults = {
max_retries = 3,
retry_delay = 0.1, -- initial delay in seconds (100 ms)
max_delay = 2, -- upper bound for a single delay, in seconds
backoff_multiplier = 2, -- exponential backoff factor
retry_on = { -- HTTP status codes that trigger a retry
[502] = true,
[503] = true,
[504] = true,
[408] = true,
},
}
--- Exponential backoff delay for a given attempt.
-- delay = retry_delay * backoff_multiplier ^ (retry_count - 1), capped at
-- max_delay.
-- @param retry_count 1-based attempt number
-- @param config table with retry_delay, backoff_multiplier and max_delay
-- @return delay in the same unit as retry_delay
local function get_delay(retry_count, config)
    local exponent = retry_count - 1
    local growth = config.backoff_multiplier ^ exponent
    local uncapped = config.retry_delay * growth
    return math.min(uncapped, config.max_delay)
end
--- HTTP request with retries and exponential backoff.
-- Retries on transport errors and on the status codes listed in `retry_on`.
-- NOTE: the HTTP method is not inspected, so non-idempotent requests such
-- as POST are retried too; callers must only pass requests that are safe
-- to repeat.
-- @param url full request URL
-- @param opts options for resty.http request_uri (optional); `opts.timeout`
--        is the per-attempt timeout in ms (default 5000)
-- @param config retry settings (optional); missing fields fall back to the
--        module defaults
-- @return res, nil, attempts_used on success or a non-retryable status;
--         nil, error message, max_retries once all attempts are exhausted
function _M.request(url, opts, config)
    opts = opts or {}
    config = config or defaults

    -- Resolve every setting up front so a partial config table cannot
    -- produce nil arithmetic or nil indexing further down.
    local cfg = {
        max_retries = config.max_retries or defaults.max_retries,
        retry_delay = config.retry_delay or defaults.retry_delay,
        max_delay = config.max_delay or defaults.max_delay,
        backoff_multiplier = config.backoff_multiplier or defaults.backoff_multiplier,
        retry_on = config.retry_on or defaults.retry_on,
    }

    local last_err
    for attempt = 1, cfg.max_retries do
        local httpc = http.new()
        httpc:set_timeout(opts.timeout or 5000)
        local res, err = httpc:request_uri(url, opts)

        if res and not cfg.retry_on[res.status] then
            -- Success, or a status that is not worth retrying.
            return res, nil, attempt
        end

        if res then
            last_err = "HTTP " .. res.status
        else
            last_err = err
        end

        -- Back off before the next attempt; no sleep after the final one.
        if attempt < cfg.max_retries then
            local delay = get_delay(attempt, cfg)
            if res then
                ngx.log(ngx.WARN, "Retry ", attempt, " after ", delay, "s, status: ", res.status)
            else
                ngx.log(ngx.WARN, "Retry ", attempt, " after ", delay, "s, error: ", err)
            end
            ngx.sleep(delay)
        end
    end

    -- last_err is nil only when max_retries < 1 and no attempt was made.
    return nil, "All retries exhausted: " .. tostring(last_err or "no attempt was made"),
           cfg.max_retries
end
return _M
8.5 超时控制
server {
    listen 8080;

    location /api/ {
        proxy_pass http://backend;

        # Connect timeout: establishing the connection to the backend
        proxy_connect_timeout 5s;

        # Send timeout: between two successive writes of the request
        proxy_send_timeout 30s;

        # Read timeout: between two successive reads of the response
        proxy_read_timeout 30s;

        # Retry budget: total time during which the request may be passed
        # to the next server.
        # NOTE: this is shorter than proxy_read_timeout, so an attempt that
        # ends in a 30s read timeout has already used up the budget and is
        # not retried.
        proxy_next_upstream_timeout 10s;
    }
}
Lua 超时控制
-- /usr/local/openresty/lua/timeout_handler.lua
local http = require "resty.http"
--- Fetch `url` with a GET request under one timeout for all socket phases.
-- The URL is split with httpc:parse_uri so that connect() receives host and
-- port and the request uses the URL's own path and query. The earlier
-- version passed the whole URL to connect() and always requested "/".
-- The connection is closed on every error path and only returned to the
-- keepalive pool after the body has been read in full.
-- @param url absolute http:// or https:// URL
-- @param timeout timeout in milliseconds for connect/send/read (default 5000)
-- @return {status = number, body = string} on success; nil plus an error
--         message on failure
local function proxy_with_timeout(url, timeout)
    local httpc = http.new()
    httpc:set_timeout(timeout or 5000)

    -- parse_uri returns {scheme, host, port, path, query}; with the default
    -- second argument the query string is kept as part of the path.
    local parsed, parse_err = httpc:parse_uri(url)
    if not parsed then
        return nil, "invalid url: " .. tostring(parse_err)
    end
    local scheme, host, port, path = parsed[1], parsed[2], parsed[3], parsed[4]

    -- Connect over a non-blocking cosocket.
    local ok, err = httpc:connect(host, port)
    if not ok then
        return nil, "connect timeout: " .. tostring(err)
    end

    if scheme == "https" then
        -- Certificate verification needs lua_ssl_trusted_certificate.
        local session, hs_err = httpc:ssl_handshake(nil, host, true)
        if not session then
            httpc:close()
            return nil, "ssl handshake failed: " .. tostring(hs_err)
        end
    end

    -- Send the request.
    local res, req_err = httpc:request({method = "GET", path = path})
    if not res then
        httpc:close()
        return nil, "request timeout: " .. tostring(req_err)
    end

    -- Read the response body (subject to the same timeout).
    local body, read_err = res:read_body()
    if not body then
        httpc:close()
        return nil, "read timeout: " .. tostring(read_err)
    end

    -- Return the connection to the pool: 10s idle timeout, pool size 100.
    httpc:set_keepalive(10000, 100)
    return {status = res.status, body = body}
end
8.6 熔断器(Circuit Breaker)
熔断器是微服务架构中的关键保护机制,当下游服务出现故障时快速失败,避免级联故障。
熔断器状态机
              请求成功率低 / 超过阈值
┌────────┐ ─────────────────────→ ┌────────┐   超时/探测   ┌───────────┐
│ CLOSED │                        │  OPEN  │ ───────────→ │ HALF-OPEN │
│ (正常) │                        │ (熔断) │ ←─────────── │  (半开)   │
└────────┘                        └────────┘   探测失败    └───────────┘
     ▲                                                          │
     │                        探测成功                          │
     └──────────────────────────────────────────────────────────┘
探测失败 → 回到 OPEN
-- /usr/local/openresty/lua/circuit_breaker.lua
local _M = {}
-- Circuit breaker states (stored as strings in the shared dict)
local STATE_CLOSED = "closed" -- normal operation, requests pass
local STATE_OPEN = "open" -- tripped, requests are rejected
local STATE_HALF_OPEN = "half_open" -- probing whether the backend recovered
-- Default configuration; per-breaker config tables fall back to these values
local default_config = {
failure_threshold = 5, -- consecutive failures that trip the breaker
success_threshold = 3, -- consecutive half-open successes needed to close
timeout = 30, -- how long the breaker stays open, in seconds
window = 60, -- statistics window, in seconds
error_rate_threshold = 0.5, -- error-rate threshold
min_requests = 10, -- minimum requests before the error rate is evaluated
}
-- Metatable shared by all breaker instances so method calls resolve to _M.
local breaker_mt = {__index = _M}

--- Create a circuit breaker handle for `name`.
-- State is kept in the `gateway_config` shared dict under "cb:<name>:..."
-- keys, so handles created in different requests or workers for the same
-- name share one breaker.
-- The returned table now carries a metatable. Without it,
-- breaker:allow_request() and breaker:record() resolved to nil.
-- @param name breaker / service name
-- @param config optional overrides; missing fields fall back to
--        default_config (the table passed in receives a metatable)
-- @return breaker instance
function _M.new(name, config)
    config = config or {}
    setmetatable(config, {__index = default_config})

    local instance = {
        name = name,
        config = config,
        shared = ngx.shared.gateway_config,
    }
    return setmetatable(instance, breaker_mt)
end
--- Current breaker state as stored in the shared dict.
-- @return one of the STATE_* strings; STATE_CLOSED when nothing is stored
function _M:get_state()
    local state_key = "cb:" .. self.name .. ":state"
    return self.shared:get(state_key) or STATE_CLOSED
end
--- Record the outcome of one request and run the state transition checks.
-- The success/failure/total counters are created with a TTL of
-- `config.window` seconds, which gives a rough statistics window.
-- The consecutive-failure counter is maintained here as well. It was read
-- by _check_open but never written, so the failure_threshold rule could
-- never fire.
-- @param success boolean, true when the request succeeded
function _M:record(success)
    local prefix = "cb:" .. self.name
    local window = self.config.window

    if success then
        self.shared:incr(prefix .. ":success", 1, 0, window)
        -- Any success ends the current failure streak.
        self.shared:set(prefix .. ":consecutive_failures", 0)
    else
        self.shared:incr(prefix .. ":failure", 1, 0, window)
        self.shared:incr(prefix .. ":consecutive_failures", 1, 0)
    end
    self.shared:incr(prefix .. ":total", 1, 0, window)

    -- State transitions.
    local state = self:get_state()
    if state == STATE_CLOSED then
        self:_check_open()
    elseif state == STATE_HALF_OPEN then
        self:_check_half_open(success)
    end
end
--- Decide whether a closed breaker has to trip.
-- Rule 1: the consecutive-failure counter reached failure_threshold.
-- Rule 2: at least min_requests were recorded and the failure ratio
--         reached error_rate_threshold.
function _M:_check_open()
    local dict, cfg = self.shared, self.config
    local prefix = "cb:" .. self.name

    -- Rule 1: consecutive failures
    local streak = dict:get(prefix .. ":consecutive_failures") or 0
    if streak >= cfg.failure_threshold then
        self:_open()
        return
    end

    -- Rule 2: error rate, evaluated only once there is enough traffic
    local total = dict:get(prefix .. ":total") or 0
    if total < cfg.min_requests then
        return
    end

    local failures = dict:get(prefix .. ":failure") or 0
    if failures / total >= cfg.error_rate_threshold then
        self:_open()
    end
end
--- Trip the breaker: store the OPEN state and the time it was opened.
-- The state key is stored without a TTL. It used to expire after
-- `config.timeout` seconds, the same moment allow_request would move to
-- HALF_OPEN, so the key usually vanished first and the breaker went
-- straight back to CLOSED without a probe phase. allow_request makes the
-- OPEN -> HALF_OPEN transition from the stored open_time.
function _M:_open()
    local prefix = "cb:" .. self.name
    self.shared:set(prefix .. ":state", STATE_OPEN)
    self.shared:set(prefix .. ":open_time", ngx.time())
    ngx.log(ngx.WARN, "Circuit breaker OPENED for: ", self.name)
end
--- Handle a request outcome while the breaker is HALF_OPEN.
-- success_threshold successes close the breaker; a single failure reopens it.
-- On close the window counters are cleared as well. Otherwise the failures
-- that tripped the breaker are still counted, and the next recorded request
-- reopens it through the error-rate rule.
-- @param success boolean outcome of the probe request
function _M:_check_half_open(success)
    local prefix = "cb:" .. self.name

    if not success then
        -- Probe failed: back to OPEN and restart the success count.
        self:_open()
        self.shared:delete(prefix .. ":half_open_success")
        return
    end

    local consecutive = self.shared:incr(prefix .. ":half_open_success", 1, 0)
    if consecutive >= self.config.success_threshold then
        -- Recovered: close the breaker and start with clean statistics.
        self.shared:set(prefix .. ":state", STATE_CLOSED)
        self.shared:set(prefix .. ":consecutive_failures", 0)
        self.shared:delete(prefix .. ":half_open_success")
        self.shared:delete(prefix .. ":success")
        self.shared:delete(prefix .. ":failure")
        self.shared:delete(prefix .. ":total")
        ngx.log(ngx.INFO, "Circuit breaker CLOSED for: ", self.name)
    end
end
--- Decide whether a request may go to the backend.
-- CLOSED and HALF_OPEN let every request through. OPEN rejects requests
-- until `config.timeout` seconds have passed since open_time; the first
-- call after that switches the breaker to HALF_OPEN and is allowed.
-- @return boolean
function _M:allow_request()
    if self:get_state() ~= STATE_OPEN then
        -- Closed, half-open, or an unrecognised state: let it through.
        return true
    end

    local prefix = "cb:" .. self.name
    local opened_at = self.shared:get(prefix .. ":open_time") or 0
    if ngx.time() - opened_at < self.config.timeout then
        return false  -- still tripped, reject
    end

    -- The open period is over: switch to half-open and admit this request.
    self.shared:set(prefix .. ":state", STATE_HALF_OPEN)
    ngx.log(ngx.INFO, "Circuit breaker HALF-OPEN for: ", self.name)
    return true
end
--- Reset the breaker by deleting all of its keys from the shared dict.
-- get_state then reports STATE_CLOSED again.
function _M:reset()
    local prefix = "cb:" .. self.name
    local suffixes = {
        ":state",
        ":consecutive_failures",
        ":open_time",
        ":half_open_success",
        ":success",
        ":failure",
        ":total",
    }
    for _, suffix in ipairs(suffixes) do
        self.shared:delete(prefix .. suffix)
    end
end
return _M
使用示例
# circuit_breaker.lua keeps its state in ngx.shared.gateway_config, so that
# is the dict that has to exist. The "circuit_breaker" dict declared here
# before was never read. Declare it once in the http {} block and skip this
# line if your configuration already declares gateway_config.
lua_shared_dict gateway_config 10m;

server {
    listen 8080;

    location /api/users {
        access_by_lua_block {
            local cb = require "circuit_breaker"
            local breaker = cb.new("user-service", {
                failure_threshold = 5,
                timeout = 30,
            })

            if not breaker:allow_request() then
                ngx.status = 503
                ngx.header["Retry-After"] = "30"
                ngx.say('{"error":"Service temporarily unavailable","service":"user-service"}')
                return ngx.exit(503)
            end

            -- Keep the breaker so the header filter can record the outcome
            ngx.ctx.breaker = breaker
        }

        proxy_pass http://user-service;

        header_filter_by_lua_block {
            local breaker = ngx.ctx.breaker
            if breaker then
                -- Any upstream status below 500 counts as a success
                local success = ngx.status < 500
                breaker:record(success)
            end
        }
    }
}
8.7 服务降级
当后端服务不可用时,返回降级响应。
-- /usr/local/openresty/lua/degrade.lua
local _M = {}
local cjson = require "cjson"
-- Degradation rules keyed by URI prefix. Each rule's `fallback` function
-- builds the response ({status, body}) that _M.execute sends. `strategy`
-- and `cache_key` are descriptive only; nothing in this module reads them.
local degrade_rules = {
["/api/users"] = {
strategy = "cache", -- serve cached data
cache_key = "degrade:users",
fallback = function()
return {
status = 200,
body = cjson.encode({
data = {},
message = "Service degraded, showing cached data",
}),
}
end,
},
["/api/recommendations"] = {
strategy = "default", -- return default values
fallback = function()
return {
status = 200,
body = cjson.encode({
data = {{"default_product_1"}, {"default_product_2"}},
message = "Showing default recommendations",
}),
}
end,
},
["/api/search"] = {
strategy = "reject", -- reject outright
fallback = function()
return {
status = 503,
body = cjson.encode({
error = "Service Unavailable",
message = "Search service is temporarily unavailable",
retry_after = 60,
}),
}
end,
},
}
--- Send the degraded response for `uri`, if a rule covers it.
-- Rule keys are compared as literal prefixes. They used to be concatenated
-- into a Lua pattern, where characters such as "-" or "." in a key are
-- treated as pattern syntax and can match the wrong URIs.
-- NOTE: pairs() order is unspecified, so rule prefixes should not overlap.
-- @param uri request URI (string)
-- @return true when a fallback response was written, false otherwise
function _M.execute(uri)
    for prefix, rule in pairs(degrade_rules) do
        if uri:sub(1, #prefix) == prefix then
            local result = rule.fallback()
            ngx.status = result.status
            ngx.header["X-Degraded"] = "true"
            ngx.header["Content-Type"] = "application/json"
            ngx.say(result.body)
            return true
        end
    end
    return false
end
return _M
8.8 Lua 反向代理完整示例
-- /usr/local/openresty/lua/proxy_handler.lua
local cjson = require "cjson"
local http = require "resty.http"
local cb = require "circuit_breaker"
local degrade = require "degrade"
-- Backend services: each service lists its upstream nodes as host:port
-- plus a relative weight used by the weighted random selection.
local services = {
["user-service"] = {
upstreams = {
{addr = "10.0.1.1:8080", weight = 5},
{addr = "10.0.1.2:8080", weight = 3},
},
},
["order-service"] = {
upstreams = {
{addr = "10.0.2.1:8080", weight = 5},
{addr = "10.0.2.2:8080", weight = 5},
},
},
}
-- Routing table: URI prefix -> service name (a key of `services`)
local route_map = {
["/api/users"] = "user-service",
["/api/orders"] = "order-service",
}
--- Choose a backend address for a service by weighted random selection.
-- @param service_name key into the `services` table
-- @return "host:port" of the chosen upstream, or nil for an unknown service
local function select_backend(service_name)
    local service = services[service_name]
    if not service then
        return nil
    end

    local nodes = service.upstreams

    local weight_sum = 0
    for _, node in ipairs(nodes) do
        weight_sum = weight_sum + node.weight
    end

    -- Walk the cumulative weights until the random point is covered.
    local point = math.random() * weight_sum
    local running = 0
    for _, node in ipairs(nodes) do
        running = running + node.weight
        if point <= running then
            return node.addr
        end
    end

    return nodes[1].addr
end
-- Response headers that describe the upstream connection and must not be
-- copied onto the client response (compared in lower case).
local hop_by_hop = {
    ["transfer-encoding"] = true,
    ["connection"] = true,
    ["keep-alive"] = true,
}

--- Main handler: route, check the circuit breaker, forward, relay.
-- Changes from the earlier version:
--   * the query string is forwarded (ngx.var.uri does not include it);
--   * the body is relayed with ngx.print, because ngx.say appends a newline
--     that corrupts the payload and breaks a copied Content-Length;
--   * hop-by-hop headers are filtered case-insensitively, since upstreams
--     normally send "Transfer-Encoding" rather than "transfer-encoding";
--   * route prefixes are compared literally, not as Lua patterns.
local function handle_request()
    local uri = ngx.var.uri
    local method = ngx.req.get_method()

    -- Route matching: first route whose key is a literal prefix of the URI.
    local service_name
    for prefix, name in pairs(route_map) do
        if uri:sub(1, #prefix) == prefix then
            service_name = name
            break
        end
    end
    if not service_name then
        ngx.status = 404
        ngx.say(cjson.encode({error = "Service not found"}))
        return
    end

    -- Circuit breaker check; try a degraded response before a plain 503.
    local breaker = cb.new(service_name)
    if not breaker:allow_request() then
        if degrade.execute(uri) then
            return
        end
        ngx.status = 503
        ngx.say(cjson.encode({error = "Service circuit breaker open"}))
        return
    end

    -- Pick a backend node.
    local backend = select_backend(service_name)
    if not backend then
        ngx.status = 503
        ngx.say(cjson.encode({error = "No available backend"}))
        return
    end

    -- Read the request body. get_body_data() returns nil when nginx has
    -- buffered the body to a temporary file.
    ngx.req.read_body()
    local body = ngx.req.get_body_data()

    -- Build the upstream URL, keeping the original query string.
    local upstream_url = "http://" .. backend .. uri
    local args = ngx.var.args
    if args and args ~= "" then
        upstream_url = upstream_url .. "?" .. args
    end

    -- Forward the request.
    local httpc = http.new()
    httpc:set_timeout(5000)
    local res, err = httpc:request_uri(upstream_url, {
        method = method,
        body = body,
        headers = {
            ["Content-Type"] = ngx.var.content_type,
            ["X-Request-ID"] = ngx.var.request_id or "",
            ["X-User-ID"] = ngx.var.user_id or "",
            ["X-Real-IP"] = ngx.var.remote_addr,
        },
    })

    -- Feed the outcome to the breaker: transport errors and 5xx are failures.
    breaker:record(res ~= nil and res.status < 500)

    if not res then
        if degrade.execute(uri) then
            return
        end
        ngx.status = 502
        ngx.say(cjson.encode({error = "Bad Gateway", message = err}))
        return
    end

    -- Relay the upstream response.
    ngx.status = res.status
    for k, v in pairs(res.headers) do
        if not hop_by_hop[k:lower()] then
            ngx.header[k] = v
        end
    end
    ngx.header["X-Upstream"] = backend
    ngx.print(res.body)
end
-- Run the handler under pcall so an unexpected Lua error is logged and
-- answered with a JSON 500.
local succeeded, failure = pcall(handle_request)
if not succeeded then
    ngx.log(ngx.ERR, "Proxy handler error: ", failure)
    ngx.status = 500
    local payload = cjson.encode({error = "Internal Server Error"})
    ngx.say(payload)
end
8.9 注意事项
重试幂等性:只有幂等方法(GET、HEAD、PUT、DELETE)才应该重试。POST 请求重试可能导致重复操作。
连接池管理:使用 `set_keepalive` 复用连接,避免频繁建立 TCP 连接。但要注意连接池的大小限制。
超时层级:`proxy_connect_timeout` < `proxy_read_timeout` < `proxy_next_upstream_timeout`,确保层级合理。
熔断恢复:熔断器的 `timeout` 不宜过短(避免频繁探测),也不宜过长(避免服务长时间不可用)。建议 30-60 秒。
上一章:← 第 07 章 - 认证与鉴权 下一章:第 09 章 - 缓存策略 →