HTTP 协议详解教程 / 第 12 章:分块传输与流式响应
第 12 章:分块传输与流式响应
分块传输(Chunked Transfer)允许在不知道响应总大小的情况下逐步发送数据,是实现流式响应、实时推送的基础。
12.1 分块传输概述
什么时候需要分块传输
| 场景 | 说明 |
|---|---|
| 动态生成内容 | 服务端边生成边发送 |
| 大文件传输 | 避免一次性加载到内存 |
| 实时数据流 | SSE、日志流 |
| 代理转发 | 边接收边转发 |
Content-Length vs Chunked
# 方式 1:已知长度
HTTP/1.1 200 OK
Content-Length: 1234
Content-Type: application/json
{"data": "..."}
# 方式 2:分块传输(长度未知)
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: application/json
5\r\n
Hello\r\n
6\r\n
World\r\n
0\r\n
\r\n
12.2 Chunked Transfer 编码
格式
每个数据块:
<块大小的十六进制>\r\n
<块数据>\r\n
结束块:
0\r\n
\r\n
实际示例
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Content-Type: text/plain
7\r\n
Mozilla\r\n
9\r\n
Developer\r\n
7\r\n
Network\r\n
0\r\n
\r\n
解码后:MozillaDeveloperNetwork
Node.js 分块传输
const http = require('http');
const server = http.createServer((req, res) => {
if (req.url === '/stream') {
res.writeHead(200, {
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked'
});
// 逐块发送
let count = 0;
const interval = setInterval(() => {
if (count >= 5) {
res.end(); // 发送结束块
clearInterval(interval);
return;
}
res.write(`数据块 ${count + 1}\n`);
count++;
}, 1000);
}
});
server.listen(3000);
# 测试分块传输
curl -N http://localhost:3000/stream
# 输出(逐步显示):
# 数据块 1
# 数据块 2
# 数据块 3
# 数据块 4
# 数据块 5
12.3 Trailer 头部
Trailer 允许在响应体结束后发送额外的头部字段。
HTTP/1.1 200 OK
Transfer-Encoding: chunked
Trailer: X-Checksum
5\r\n
Hello\r\n
6\r\n
World\r\n
0\r\n
X-Checksum: abc123\r\n
\r\n
Node.js Trailer 示例
const http = require('http');
const crypto = require('crypto');
const server = http.createServer((req, res) => {
res.writeHead(200, {
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked',
'Trailer': 'X-Content-Checksum'
});
const hash = crypto.createHash('md5');
const chunks = ['Hello', ' ', 'World'];
chunks.forEach(chunk => {
hash.update(chunk);
res.write(chunk);
});
// 在 end 中设置 Trailer
res.addTrailers({
'X-Content-Checksum': hash.digest('hex')
});
res.end();
});
server.listen(3000);
12.4 Server-Sent Events (SSE)
SSE 是 HTML5 提供的服务器向客户端推送技术。
特点
| 特性 | 说明 |
|---|---|
| 方向 | 服务器 → 客户端(单向) |
| 协议 | 基于 HTTP |
| 格式 | 纯文本,text/event-stream |
| 自动重连 | 浏览器自动重连 |
| 事件 ID | 支持断点续传 |
事件格式
event: message\n
id: 1\n
retry: 5000\n
data: {"user": "alice", "action": "login"}\n
\n
| 字段 | 说明 |
|---|---|
event | 事件类型(默认 message) |
id | 事件 ID(用于断点续传) |
retry | 重连间隔(毫秒) |
data | 事件数据(多行用多个 data:) |
服务端实现
const express = require('express');
const app = express();
app.get('/events', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no' // 禁用 Nginx 缓冲
});
let eventId = 0;
// 发送事件
const sendEvent = (data, eventType = 'message') => {
eventId++;
if (eventType !== 'message') {
res.write(`event: ${eventType}\n`);
}
res.write(`id: ${eventId}\n`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// 初始连接
sendEvent({ message: '连接成功' });
// 定时推送
const interval = setInterval(() => {
sendEvent({
time: new Date().toISOString(),
value: Math.random() * 100
}, 'update');
}, 3000);
// 心跳(防止连接断开)
const heartbeat = setInterval(() => {
res.write(': heartbeat\n\n');
}, 15000);
// 客户端断开连接
req.on('close', () => {
clearInterval(interval);
clearInterval(heartbeat);
console.log('客户端断开连接');
});
});
app.listen(3000);
客户端实现
<!DOCTYPE html>
<html>
<body>
<div id="output"></div>
<script>
const output = document.getElementById('output');
const eventSource = new EventSource('/events');
// 默认消息
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
output.innerHTML += `<p>消息: ${data.message}</p>`;
};
// 自定义事件
eventSource.addEventListener('update', (event) => {
const data = JSON.parse(event.data);
output.innerHTML += `<p>更新: ${data.time} - ${data.value.toFixed(2)}</p>`;
});
// 连接打开
eventSource.onopen = () => {
console.log('连接已建立');
};
// 错误处理
eventSource.onerror = (error) => {
console.error('SSE 错误:', error);
// 浏览器会自动重连
};
// 使用 Last-Event-ID 断点续传
// 浏览器自动在重连时发送 Last-Event-ID 头
// 关闭连接
// eventSource.close();
</script>
</body>
</html>
Python 服务端示例
from flask import Flask, Response
import json
import time
app = Flask(__name__)
@app.route('/events')
def events():
def generate():
event_id = 0
while True:
event_id += 1
data = {
'time': time.strftime('%Y-%m-%d %H:%M:%S'),
'value': event_id * 10
}
yield f"id: {event_id}\n"
yield f"event: update\n"
yield f"data: {json.dumps(data)}\n\n"
time.sleep(3)
return Response(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
}
)
12.5 SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 服务器 → 客户端 | 双向 |
| 协议 | HTTP | 独立协议 (ws://) |
| 自动重连 | 内置 | 需手动实现 |
| 数据格式 | 文本 | 文本 + 二进制 |
| 代理兼容 | 好 | 需特殊配置 |
| 适用场景 | 通知、数据推送 | 聊天、游戏 |
12.6 业务场景:实时股票行情
// 股票行情 SSE 推送
const express = require('express');
const app = express();
// 订阅管理
const subscribers = new Map();
app.get('/stocks/stream', (req, res) => {
const symbols = req.query.symbols?.split(',') || ['AAPL'];
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 初始化订阅
symbols.forEach(symbol => {
res.write(`event: subscribe\n`);
res.write(`data: {"symbol": "${symbol}", "status": "subscribed"}\n\n`);
});
// 模拟行情更新
const interval = setInterval(() => {
symbols.forEach(symbol => {
const price = (Math.random() * 100 + 100).toFixed(2);
const change = (Math.random() * 10 - 5).toFixed(2);
res.write(`event: quote\n`);
res.write(`data: ${JSON.stringify({
symbol,
price: parseFloat(price),
change: parseFloat(change),
time: new Date().toISOString()
})}\n\n`);
});
}, 2000);
req.on('close', () => {
clearInterval(interval);
});
});
app.listen(3000);
<!-- 客户端 -->
<script>
const eventSource = new EventSource('/stocks/stream?symbols=AAPL,GOOGL,MSFT');
eventSource.addEventListener('quote', (event) => {
const quote = JSON.parse(event.data);
const el = document.getElementById(`stock-${quote.symbol}`);
if (el) {
el.querySelector('.price').textContent = quote.price;
el.querySelector('.change').textContent = quote.change;
el.querySelector('.change').className =
`change ${quote.change >= 0 ? 'up' : 'down'}`;
}
});
</script>
12.7 流式 JSON 响应
// 流式返回大量数据
app.get('/api/large-dataset', (req, res) => {
res.writeHead(200, {
'Content-Type': 'application/json',
'Transfer-Encoding': 'chunked'
});
res.write('[');
let first = true;
const stream = db.queryStream('SELECT * FROM large_table');
stream.on('data', (row) => {
if (!first) res.write(',');
res.write(JSON.stringify(row));
first = false;
});
stream.on('end', () => {
res.end(']');
});
stream.on('error', (err) => {
res.end();
});
});
⚠️ 注意事项
- SSE 不能发送二进制数据:需要先 Base64 编码
- 代理缓冲:确保代理(Nginx)禁用缓冲
X-Accel-Buffering: no - 连接数限制:浏览器对同一域名的 SSE 连接数有限制(通常 6 个)
- 心跳保活:定期发送注释行
:heartbeat\n\n防止连接超时 - 错误处理:客户端需要处理
onerror事件,浏览器默认 3 秒后重连
🔗 扩展阅读
下一章:第 13 章:WebSocket 协议 — 握手流程、帧格式、心跳保活、断线重连