OpenResty 高性能网关开发教程 / 第 10 章 - 数据转换与协议处理
第 10 章 - 数据转换与协议处理
10.1 概述
API 网关经常需要在客户端和后端服务之间进行数据格式和协议的转换。
| 转换类型 | 说明 | 场景 |
|---|---|---|
| 请求改写 | 修改请求头、URI、Body | 统一 API 格式 |
| 响应改写 | 修改响应头、Body、状态码 | 统一响应格式 |
| 协议转换 | HTTP ↔ gRPC ↔ WebSocket | 新旧系统对接 |
| 数据格式 | JSON ↔ XML ↔ Form | 第三方 API 对接 |
10.2 请求改写
10.2.1 URI 改写
-- /usr/local/openresty/lua/transform/uri_rewrite.lua
local _M = {}
--- Strip a leading path prefix from the current request URI.
-- The prefix only matches on a path-segment boundary, so "/api" strips
-- "/api" and "/api/users" but leaves "/apiv2/users" untouched.
-- The rewritten URI always starts with "/" and is never empty.
-- @param prefix string literal path prefix to remove (not a pattern)
function _M.strip_prefix(prefix)
    local uri = ngx.var.uri
    local prefix_len = #prefix

    -- An empty prefix would rewrite the URI to itself and, because of the
    -- jump below, send the request through location matching again.
    if prefix_len == 0 or uri:sub(1, prefix_len) ~= prefix then
        return
    end

    local rest = uri:sub(prefix_len + 1)
    if rest:sub(1, 1) ~= "/" then
        if rest ~= "" and prefix:sub(-1) ~= "/" then
            -- Matched in the middle of a segment ("/api" vs "/apiv2").
            return
        end
        -- Either the URI equals the prefix (rest == "") or the prefix
        -- ended with "/" and swallowed the separator: restore the slash,
        -- since ngx.req.set_uri rejects a zero-length URI.
        rest = "/" .. rest
    end

    -- jump = true (kept from the original) triggers a new location lookup.
    -- NOTE(review): the lua-nginx-module docs only allow jump = true in the
    -- rewrite_by_lua* phase; confirm callers run in that phase.
    ngx.req.set_uri(rest, true)
end
--- Prepend a path prefix to the current request URI.
-- The URI is rewritten in place (jump = false), without a new location lookup.
-- @param prefix string text placed in front of ngx.var.uri
function _M.add_prefix(prefix)
    local prefixed = prefix .. ngx.var.uri
    ngx.req.set_uri(prefixed, false)
end
--- Rewrite the request URI using the first matching version rule.
-- Rules are tried in a fixed order (longest pattern first, ties broken
-- alphabetically) so that the result does not depend on the unspecified
-- iteration order of pairs() when more than one pattern matches.
-- @param version_map table mapping Lua patterns to gsub replacements
function _M.version_map(version_map)
    local uri = ngx.var.uri

    local patterns = {}
    for pattern in pairs(version_map) do
        patterns[#patterns + 1] = pattern
    end
    table.sort(patterns, function(a, b)
        if #a ~= #b then
            return #a > #b
        end
        return a < b
    end)

    for _, pattern in ipairs(patterns) do
        -- Only the rewritten string is needed; gsub's match count is dropped.
        local new_uri = uri:gsub(pattern, version_map[pattern])
        if new_uri ~= uri then
            ngx.req.set_uri(new_uri, false)
            return
        end
    end
end
--- Add query parameters that the request does not already carry.
-- Parameters already present in the query string keep their value.
-- @param params table of name -> value pairs to add when missing
function _M.add_query_params(params)
    local current = ngx.req.get_uri_args()
    for name, value in pairs(params) do
        -- Keep an existing argument, otherwise fall back to the supplied one.
        current[name] = current[name] or value
    end
    ngx.req.set_uri_args(current)
end
return _M
10.2.2 请求头改写
-- /usr/local/openresty/lua/transform/header_transform.lua
local _M = {}
--- Set the gateway's forwarding and tracing headers on the upstream request.
-- Sets X-Forwarded-For/Proto/Host from the connection variables,
-- X-Request-ID from $request_id (falling back to $request_time), and
-- X-Gateway-Time from ngx.now().
function _M.add_gateway_headers()
    local var = ngx.var
    local set_header = ngx.req.set_header

    set_header("X-Forwarded-For", var.remote_addr)
    set_header("X-Forwarded-Proto", var.scheme)
    set_header("X-Forwarded-Host", var.host)

    local request_id = var.request_id
    if not request_id then
        request_id = var.request_time
    end
    set_header("X-Request-ID", request_id)

    set_header("X-Gateway-Time", ngx.now())
end
--- Remove request headers that must not be forwarded to the upstream.
function _M.remove_sensitive_headers()
    local sensitive = { "Cookie", "X-Internal-Token" }
    for _, header_name in ipairs(sensitive) do
        ngx.req.clear_header(header_name)
    end
end
--- Move request headers to new names (custom header -> standard header).
-- For each mapping entry whose source header is present, the value is
-- copied to the target header and the source header is removed.
-- @param mapping table of source header name -> target header name
function _M.map_headers(mapping)
    -- Snapshot taken once; later set/clear calls do not change this table.
    local incoming = ngx.req.get_headers()
    for from_name, to_name in pairs(mapping) do
        local header_value = incoming[from_name]
        if header_value then
            ngx.req.set_header(to_name, header_value)
            ngx.req.clear_header(from_name)
        end
    end
end
return _M
10.2.3 请求体改写
-- /usr/local/openresty/lua/transform/body_transform.lua
local _M = {}
local cjson = require "cjson"
--- Decode the JSON request body, transform it, and write it back.
-- @param transform_func function receiving the decoded value and
--        returning the value to encode as the new request body
-- @return the transformed value on success
-- @return nil, err when the body is missing, not valid JSON, or the
--         transform returns nil (the request body is left untouched)
function _M.transform_json(transform_func)
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body or body == "" then
        -- get_body_data() returns nil when nginx spooled the body to disk;
        -- report that separately from a genuinely empty body.
        if ngx.req.get_body_file() then
            return nil, "request body buffered to a temp file"
        end
        return nil, "empty body"
    end

    -- cjson.decode raises on malformed input instead of returning nil, err,
    -- so it has to be wrapped for the error branch to be reachable.
    local ok, data = pcall(cjson.decode, body)
    if not ok then
        return nil, "invalid JSON: " .. tostring(data)
    end

    local transformed = transform_func(data)
    if transformed == nil then
        -- Avoid silently replacing the body with the literal "null".
        return nil, "transform returned nil"
    end

    ngx.req.set_body_data(cjson.encode(transformed))
    return transformed
end
--- Build a transform that fills in missing fields with default values.
-- @param defaults table of field name -> default value
-- @return function(data) that mutates and returns `data`; only fields
--         that are nil in `data` receive the default
function _M.add_defaults(defaults)
    local function apply_defaults(data)
        for key, default_value in pairs(defaults) do
            if data[key] == nil then
                data[key] = default_value
            end
        end
        return data
    end

    return apply_defaults
end
--- Build a transform that renames fields of a table.
-- All renames are applied as one simultaneous step: values are read from
-- their old names first and written to the new names afterwards. This keeps
-- identity mappings (a -> a) from deleting the field, and makes swaps
-- (a -> b, b -> a) and chains (a -> b, b -> c) independent of pairs() order.
-- @param mapping table of old field name -> new field name
-- @return function(data) that mutates and returns `data`
function _M.rename_fields(mapping)
    return function(data)
        local moved = {}
        for old_name, new_name in pairs(mapping) do
            local value = data[old_name]
            if value ~= nil then
                moved[new_name] = value
                data[old_name] = nil
            end
        end
        for new_name, value in pairs(moved) do
            data[new_name] = value
        end
        return data
    end
end
--- Build a transform that keeps only an allow-list of fields.
-- @param allowed_fields array of field names to keep
-- @return function(data) returning a new table with the allowed, non-nil
--         fields of `data`; `data` itself is not modified
function _M.filter_fields(allowed_fields)
    local function keep_allowed(data)
        local kept = {}
        for _, name in ipairs(allowed_fields) do
            local value = data[name]
            if value ~= nil then
                kept[name] = value
            end
        end
        return kept
    end

    return keep_allowed
end
return _M
10.3 响应改写
10.3.1 响应头改写
-- /usr/local/openresty/lua/transform/response_transform.lua
local _M = {}
--- Rewrite response headers; intended to run in the header_filter phase.
-- Hides backend server details, adds security headers, and attaches the
-- request id and request time for tracing.
function _M.transform_headers()
    local out = ngx.header
    local var = ngx.var

    -- Hide what the backend is running.
    out["Server"] = "Gateway"
    out["X-Powered-By"] = nil

    -- Security headers.
    local security_headers = {
        { "X-Content-Type-Options", "nosniff" },
        { "X-Frame-Options", "DENY" },
        { "Strict-Transport-Security", "max-age=31536000" },
    }
    for _, pair in ipairs(security_headers) do
        out[pair[1]] = pair[2]
    end

    -- Tracing headers.
    out["X-Request-ID"] = var.request_id or ""
    out["X-Response-Time"] = var.request_time
end
--- Set CORS response headers and answer preflight requests.
-- @param options optional table with allow_origin, allow_methods,
--        allow_headers and max_age; each falls back to a permissive default
-- For OPTIONS requests the function finishes the request with status 204.
-- NOTE(review): this calls ngx.exit, which the lua-nginx-module docs do not
-- list as available in the header_filter phase; confirm it is invoked from
-- the rewrite or access phase.
function _M.set_cors(options)
    local opts = options or {}
    local out = ngx.header

    out["Access-Control-Allow-Origin"] = opts.allow_origin or "*"
    out["Access-Control-Allow-Methods"] =
        opts.allow_methods or "GET, POST, PUT, DELETE, OPTIONS"
    out["Access-Control-Allow-Headers"] =
        opts.allow_headers or "Content-Type, Authorization, X-Request-ID"
    out["Access-Control-Max-Age"] = opts.max_age or 86400

    if ngx.req.get_method() ~= "OPTIONS" then
        return
    end

    -- Preflight: reply immediately without contacting the upstream.
    ngx.status = 204
    return ngx.exit(204)
end
return _M
10.3.2 响应体改写
--- Body-filter handler that masks sensitive JSON string fields.
-- Works on one chunk at a time via ngx.arg[1], so a "password"/"token"
-- pair that is split across two chunks is not masked; a complete rewrite
-- needs the whole body buffered.
-- NOTE(review): masking can change the body length; the lua-nginx-module
-- docs require clearing Content-Length in a header filter in that case.
-- Confirm the surrounding configuration does so.
local function body_transform()
    local chunk = ngx.arg[1]
    if not chunk then
        return
    end

    -- { Lua pattern, replacement } pairs, applied in order.
    local masks = {
        { '"password"%s*:%s*"[^"]*"', '"password":"***"' },
        { '"token"%s*:%s*"[^"]*"', '"token":"***"' },
    }
    for _, mask in ipairs(masks) do
        chunk = chunk:gsub(mask[1], mask[2])
    end

    ngx.arg[1] = chunk
end
--- Create a body-filter callback that buffers the whole response and wraps
-- a JSON body in a uniform envelope { code, message, data, timestamp,
-- request_id }. Non-JSON bodies are passed through unchanged.
-- The returned closure holds the buffered chunks, so one closure must be
-- created per response and called with (ngx.arg[1], ngx.arg[2]) on every
-- body_filter invocation.
-- NOTE(review): the wrapped body has a different length; the
-- lua-nginx-module docs require clearing Content-Length in a header filter
-- in that case. Confirm the caller does so.
-- @return function(chunk_arg, eof_arg)
local function wrap_response()
    local chunks = {}
    local eof = false

    return function(chunk_arg, eof_arg)
        if eof_arg then
            eof = true
        end
        if chunk_arg and chunk_arg ~= "" then
            chunks[#chunks + 1] = chunk_arg
        end

        if not eof then
            -- Hold the chunk back. Without this, nginx still sends it to the
            -- client and the full body is sent a second time at EOF.
            ngx.arg[1] = nil
            return
        end

        local cjson = require "cjson"
        local body = table.concat(chunks)

        -- cjson.decode raises on invalid input, hence the pcall.
        local ok, data = pcall(cjson.decode, body)
        if ok then
            ngx.arg[1] = cjson.encode({
                code = ngx.status,
                message = "success",
                data = data,
                timestamp = ngx.time(),
                request_id = ngx.var.request_id or "",
            })
        else
            ngx.arg[1] = body
        end
        ngx.arg[2] = true
    end
end
10.4 JSON 与 XML 转换
-- /usr/local/openresty/lua/transform/json_xml.lua
local _M = {}
--- Convert a JSON document to an XML string.
-- Objects become elements with one child per key (keys emitted in sorted
-- order so the output is deterministic), arrays become repeated elements
-- with the parent's name, JSON null becomes an empty element, and string
-- values are XML-escaped.
-- Raises if `json_str` is not valid JSON, because cjson.decode raises.
-- @param json_str string JSON text
-- @param root_name optional name of the root element (default "root")
-- @return string XML document including the XML declaration
function _M.json_to_xml(json_str, root_name)
    local cjson = require "cjson"
    local data = cjson.decode(json_str)
    root_name = root_name or "root"

    local entities = { ["&"] = "&amp;", ["<"] = "&lt;", [">"] = "&gt;" }

    -- One pass over the text; parentheses drop gsub's match count.
    local function escape_text(text)
        return (text:gsub("[&<>]", entities))
    end

    local function to_xml(obj, name)
        if obj == cjson.null then
            return "<" .. name .. "/>"
        end

        local kind = type(obj)
        if kind == "string" then
            return "<" .. name .. ">" .. escape_text(obj) .. "</" .. name .. ">"
        end
        if kind ~= "table" then
            return "<" .. name .. ">" .. tostring(obj) .. "</" .. name .. ">"
        end

        local parts = {}
        if #obj > 0 then
            -- Array: repeat the element once per item.
            for _, item in ipairs(obj) do
                parts[#parts + 1] = to_xml(item, name)
            end
        else
            -- Object (an empty array also lands here and yields <name></name>).
            local keys = {}
            for key in pairs(obj) do
                keys[#keys + 1] = key
            end
            table.sort(keys, function(a, b)
                return tostring(a) < tostring(b)
            end)

            parts[#parts + 1] = "<" .. name .. ">"
            for _, key in ipairs(keys) do
                parts[#parts + 1] = to_xml(obj[key], key)
            end
            parts[#parts + 1] = "</" .. name .. ">"
        end
        return table.concat(parts)
    end

    return '<?xml version="1.0" encoding="UTF-8"?>\n' .. to_xml(data, root_name)
end
--- Parse simple XML into a Lua table (simplified; use a real XML library
-- in production).
-- Supports attribute-free elements only. Repeated sibling elements are
-- collected into an array instead of overwriting each other, so arrays
-- written as repeated elements survive a round trip. Leaf text has the
-- standard XML entities decoded and is converted to a number or boolean
-- when it looks like one.
-- @param xml_str string XML text
-- @return table mapping element names to parsed values
function _M.xml_to_json(xml_str)
    local xml_entities = {
        lt = "<", gt = ">", amp = "&", quot = '"', apos = "'",
    }

    local function parse_leaf(text)
        -- Unknown entities are left as-is (a nil table lookup keeps the match).
        text = text:gsub("&(%a+);", xml_entities)
        local num = tonumber(text)
        if num then
            return num
        end
        if text == "true" then
            return true
        end
        if text == "false" then
            return false
        end
        return text
    end

    local result = {}
    -- Names already turned into arrays. Needed because a nested element is
    -- itself a table and cannot be told apart from an array by type alone.
    local repeated = {}

    -- Tag names may contain "_", "-", "." and ":" in addition to alphanumerics.
    for tag, content in xml_str:gmatch("<([%w_][%w_%-%.:]*)>(.-)</%1>") do
        local value
        if content:match("<") then
            -- Nested elements: parse recursively.
            value = _M.xml_to_json(content)
        else
            value = parse_leaf(content)
        end

        local existing = result[tag]
        if existing == nil then
            result[tag] = value
        elseif repeated[tag] then
            existing[#existing + 1] = value
        else
            result[tag] = { existing, value }
            repeated[tag] = true
        end
    end

    return result
end
return _M
10.5 gRPC-HTTP 转换
-- /usr/local/openresty/lua/transform/grpc_gateway.lua
local _M = {}
local cjson = require "cjson"
--- Translate an HTTP REST call into a gRPC request descriptor.
-- The path must have the form "/<service>/<method>". The body is passed
-- through as the message without protobuf serialization (simplified).
-- @param path string request path
-- @param method HTTP method (currently unused)
-- @param body request payload stored as the gRPC message
-- @return table { service, method, message } on success
-- @return nil, err when the path is not a valid gRPC path
function _M.http_to_grpc(path, method, body)
    local service, rpc_method = path:match("^/([^/]+)/([^/]+)$")
    if not (service and rpc_method) then
        return nil, "Invalid gRPC path: " .. path
    end

    return {
        service = service,
        method = rpc_method,
        message = body,
    }
end
--- Convert a gRPC response into an HTTP response with a JSON body.
-- NOTE(review): `status` is copied through as-is and defaults to 200, so it
-- is presumably already an HTTP status rather than a gRPC code; confirm
-- against callers.
-- @param grpc_response table with optional `status` and `message` fields
-- @return table { status, body } where body is the JSON-encoded message
function _M.grpc_to_http(grpc_response)
    local payload = grpc_response.message or {}
    local status = grpc_response.status or 200

    return {
        status = status,
        body = cjson.encode(payload),
    }
end
-- gRPC status code -> HTTP status code.
local grpc_to_http_status = {
    [0] = 200,  -- OK
    [1] = 499,  -- CANCELLED
    [2] = 500,  -- UNKNOWN
    [3] = 400,  -- INVALID_ARGUMENT
    [4] = 504,  -- DEADLINE_EXCEEDED
    [5] = 404,  -- NOT_FOUND
    [6] = 409,  -- ALREADY_EXISTS
    [7] = 403,  -- PERMISSION_DENIED
    [8] = 429,  -- RESOURCE_EXHAUSTED
    [9] = 400,  -- FAILED_PRECONDITION
    [10] = 409, -- ABORTED
    [11] = 400, -- OUT_OF_RANGE
    [12] = 501, -- UNIMPLEMENTED
    [13] = 500, -- INTERNAL
    [14] = 503, -- UNAVAILABLE
    [15] = 500, -- DATA_LOSS
    [16] = 401, -- UNAUTHENTICATED
}

--- Map a gRPC status code to the corresponding HTTP status code.
-- Accepts the code as a number or as a numeric string (the form it has
-- when read from a header), since the table is keyed by number.
-- @param grpc_status number or numeric string
-- @return number HTTP status; 500 for unknown or non-numeric input
function _M.map_grpc_status(grpc_status)
    return grpc_to_http_status[tonumber(grpc_status)] or 500
end
return _M
10.6 请求聚合(API Composition)
将多个后端服务的响应聚合为一个响应。
-- /usr/local/openresty/lua/transform/aggregator.lua
local _M = {}
local http = require "resty.http"
local cjson = require "cjson"
--- Issue several HTTP requests concurrently and collect their results.
-- Each request runs in its own light thread.
-- @param requests array of { name, url, method?, body?, headers?, timeout? }
-- @return table keyed by request name (or by array index when a request
--         has no name) with entries { name, status, body, error }; status
--         is 0 when the request or its thread failed
function _M.aggregate(requests)
    local results = {}
    local threads = {}

    -- Start every request before waiting on any of them.
    for i, req in ipairs(requests) do
        threads[i] = ngx.thread.spawn(function()
            local httpc = http.new()
            httpc:set_timeout(req.timeout or 5000)
            local res, err = httpc:request_uri(req.url, {
                method = req.method or "GET",
                body = req.body,
                headers = req.headers or {},
            })
            return {
                name = req.name,
                status = res and res.status or 0,
                body = res and res.body or nil,
                error = err,
            }
        end)
    end

    for i, req in ipairs(requests) do
        -- A nil table key would raise, so unnamed requests use their index.
        local key = req.name or i
        local thread = threads[i]

        if not thread then
            results[key] = { status = 0, error = "thread failed: not spawned" }
        else
            local ok, result = ngx.thread.wait(thread)
            if ok then
                results[key] = result
            else
                -- On failure the second return value carries the error;
                -- keep it so the cause is not lost.
                results[key] = {
                    status = 0,
                    error = "thread failed: " .. tostring(result),
                }
            end
        end
    end

    return results
end
--- Example aggregation: fetch profile, recent orders and preferences of a
-- user concurrently and merge them into one response table.
-- A backend that fails or returns a body that is not valid JSON yields the
-- fallback for its field (nil for profile, {} for the others) rather than
-- aborting the whole aggregation.
-- @param user_id user identifier; escaped before being placed in the URLs
-- @return table { user_id, profile, recent_orders, preferences }
function _M.get_user_detail(user_id)
    if user_id == nil then
        error("user_id is required", 2)
    end
    local id = ngx.escape_uri(tostring(user_id))

    local results = _M.aggregate({
        {
            name = "profile",
            url = "http://user-service:8081/users/" .. id,
        },
        {
            name = "orders",
            url = "http://order-service:8082/users/" .. id .. "/orders?limit=5",
        },
        {
            name = "preferences",
            url = "http://config-service:8083/users/" .. id .. "/preferences",
        },
    })

    -- cjson.decode raises on nil or malformed input, so decode defensively.
    local function decode_body(result, fallback)
        local body = result and result.body
        if type(body) ~= "string" or body == "" then
            return fallback
        end
        local ok, decoded = pcall(cjson.decode, body)
        if not ok then
            ngx.log(ngx.WARN, "aggregator: invalid JSON from ",
                    tostring(result.name), ": ", tostring(decoded))
            return fallback
        end
        return decoded
    end

    return {
        user_id = user_id,
        profile = decode_body(results.profile, nil),
        recent_orders = decode_body(results.orders, {}),
        preferences = decode_body(results.preferences, {}),
    }
end
return _M
nginx 配置:
# Regex location: the "~" modifier is required for (\d+) to be treated as a
# capture group; without it nginx matches the text literally as a prefix.
location ~ ^/api/users/(\d+)/detail$ {
    content_by_lua_block {
        local cjson = require "cjson"
        local aggregator = require "transform.aggregator"
        -- ngx.var[1] is the first capture group of the location regex.
        local user_id = ngx.var[1]
        local result = aggregator.get_user_detail(user_id)
        ngx.header["Content-Type"] = "application/json"
        ngx.say(cjson.encode(result))
    }
}
10.7 流式响应处理
--- Proxy the current request to `upstream_url` and stream the response
-- body to the client chunk by chunk, without buffering it.
-- Uses the connect + request API of lua-resty-http: request_uri() reads
-- the whole body into memory and provides no body_reader.
-- NOTE(review): the options-table form of connect() needs
-- lua-resty-http >= 0.16; confirm the deployed version.
-- @param upstream_url string absolute http(s) URL of the upstream
local function stream_proxy(upstream_url)
    local http = require "resty.http"

    -- Connection-level headers that must not be copied to the client.
    local hop_by_hop = {
        ["connection"] = true,
        ["keep-alive"] = true,
        ["proxy-authenticate"] = true,
        ["proxy-authorization"] = true,
        ["te"] = true,
        ["trailers"] = true,
        ["transfer-encoding"] = true,
        ["upgrade"] = true,
    }

    local httpc = http.new()
    httpc:set_timeout(60000) -- long timeout for long-lived streams

    local parsed, err = httpc:parse_uri(upstream_url)
    if not parsed then
        ngx.log(ngx.ERR, "stream_proxy: invalid upstream url: ", err)
        ngx.status = 502
        return
    end
    -- parsed = { scheme, host, port, path }; path includes the query string.
    local ok
    ok, err = httpc:connect({
        scheme = parsed[1],
        host = parsed[2],
        port = parsed[3],
    })
    if not ok then
        ngx.log(ngx.ERR, "stream_proxy: connect failed: ", err)
        ngx.status = 502
        return
    end

    local res
    res, err = httpc:request({
        method = ngx.req.get_method(),
        path = parsed[4],
        headers = ngx.req.get_headers(),
        -- Stream the client's request body too (nil when there is none).
        body = httpc:get_client_body_reader(),
    })
    if not res then
        ngx.log(ngx.ERR, "stream_proxy: upstream request failed: ", err)
        httpc:close()
        ngx.status = 502
        return
    end

    ngx.status = res.status

    -- Header names keep the upstream's casing, so compare in lower case.
    for k, v in pairs(res.headers) do
        if not hop_by_hop[k:lower()] then
            ngx.header[k] = v
        end
    end

    -- Stream the body; stop on an upstream read error or a client abort.
    local complete = true
    local reader = res.body_reader
    if reader then
        repeat
            local chunk, read_err = reader(8192)
            if read_err then
                ngx.log(ngx.ERR, "stream_proxy: upstream read failed: ", read_err)
                complete = false
                break
            end
            if chunk then
                local sent, send_err = ngx.print(chunk)
                if sent then
                    sent, send_err = ngx.flush(true)
                end
                if not sent then
                    ngx.log(ngx.INFO, "stream_proxy: client write failed: ", send_err)
                    complete = false
                    break
                end
            end
        until not chunk
    end

    -- Only a fully drained connection can be reused.
    if complete then
        httpc:set_keepalive()
    else
        httpc:close()
    end
end
10.8 注意事项
body_filter 限制:修改响应体会阻止 Nginx 的 sendfile 优化;如果改写后响应体长度发生变化,还需要在 header_filter 阶段清除 Content-Length 响应头。大响应建议使用 proxy_buffering off 流式传输。
内存占用:缓冲整个请求/响应体会消耗大量内存,大文件上传/下载场景应使用流式处理。
编码问题:转换 JSON/XML 时注意字符编码,中文等多字节字符在字节截断时可能损坏。
上一章:← 第 09 章 - 缓存策略 下一章:第 11 章 - 日志与监控 →