强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

HTTP 协议详解教程 / 第 13 章:WebSocket 协议

第 13 章:WebSocket 协议

WebSocket 提供全双工通信能力,是实现实时应用(聊天、游戏、协同编辑)的首选协议。


13.1 WebSocket 概述

HTTP vs WebSocket

特性HTTPWebSocket
通信模式请求-响应全双工
连接方式短连接/长轮询持久连接
服务端推送不支持(需要 SSE)原生支持
数据格式文本文本 + 二进制
协议开销每次请求头部首次握手后极小

适用场景

场景说明
即时通讯聊天室、私信
实时协作多人编辑、白板
在线游戏实时状态同步
金融行情实时股价推送
通知系统实时通知推送
监控面板实时数据展示

13.2 WebSocket 握手

握手流程

WebSocket 使用 HTTP 升级机制建立连接:

# 客户端请求(HTTP Upgrade)
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: https://example.com

# 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

握手关键头部

头部说明
Upgrade: websocket请求升级到 WebSocket
Connection: Upgrade表示连接升级
Sec-WebSocket-Key客户端随机 Base64 key
Sec-WebSocket-Accept服务器确认(key + 魔法字符串的 SHA1)
Sec-WebSocket-Version协议版本(始终为 13)
Sec-WebSocket-Protocol子协议协商
Sec-WebSocket-Extensions扩展协商

Node.js 手动实现握手

const http = require('http');
const crypto = require('crypto');

const MAGIC_STRING = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

const server = http.createServer();

server.on('upgrade', (req, socket, head) => {
    const key = req.headers['sec-websocket-key'];
    const acceptKey = crypto
        .createHash('sha1')
        .update(key + MAGIC_STRING)
        .digest('base64');
    
    socket.write(
        'HTTP/1.1 101 Switching Protocols\r\n' +
        'Upgrade: websocket\r\n' +
        'Connection: Upgrade\r\n' +
        `Sec-WebSocket-Accept: ${acceptKey}\r\n` +
        '\r\n'
    );
    
    // 握手完成,可以开始 WebSocket 通信
    console.log('WebSocket 连接已建立');
});

server.listen(3000);

13.3 WebSocket 帧格式

帧结构

 0               1               2               3
 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+-------------------------------+
|     Extended payload length continued, if payload len == 127  |
+-------------------------------+-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------+-------------------------------+

操作码(Opcode)

含义说明
0x0Continuation延续帧
0x1Text文本帧(UTF-8)
0x2Binary二进制帧
0x8Close关闭连接
0x9Ping心跳请求
0xAPong心跳响应

13.4 使用 ws 库

服务端

const WebSocket = require('ws');
const http = require('http');

const server = http.createServer();
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws, req) => {
    console.log('新连接:', req.socket.remoteAddress);
    
    // 接收消息
    ws.on('message', (data, isBinary) => {
        if (isBinary) {
            console.log('收到二进制数据:', data.length, '字节');
        } else {
            const message = data.toString();
            console.log('收到消息:', message);
            
            // 广播给所有客户端
            wss.clients.forEach(client => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(message);
                }
            });
        }
    });
    
    // 发送欢迎消息
    ws.send(JSON.stringify({
        type: 'welcome',
        message: '连接成功',
        timestamp: Date.now()
    }));
    
    // 连接关闭
    ws.on('close', (code, reason) => {
        console.log('连接关闭:', code, reason.toString());
    });
    
    // 错误处理
    ws.on('error', (error) => {
        console.error('WebSocket 错误:', error);
    });
});

server.listen(3000, () => {
    console.log('WebSocket 服务器运行在 ws://localhost:3000');
});

客户端

// 浏览器端
const ws = new WebSocket('ws://localhost:3000');

// 连接打开
ws.onopen = () => {
    console.log('连接已建立');
    ws.send(JSON.stringify({ type: 'greeting', data: '你好' }));
};

// 接收消息
ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('收到:', data);
};

// 连接关闭
ws.onclose = (event) => {
    console.log('连接关闭:', event.code, event.reason);
};

// 错误处理
ws.onerror = (error) => {
    console.error('WebSocket 错误:', error);
};

// 发送消息
ws.send('Hello WebSocket');
ws.send(JSON.stringify({ type: 'message', content: '你好' }));
ws.send(arrayBuffer);  // 发送二进制数据

13.5 心跳保活

Ping/Pong 机制

// 服务端心跳
class HeartbeatManager {
    constructor(wss, interval = 30000) {
        this.wss = wss;
        this.interval = interval;
        this.start();
    }
    
    start() {
        this.timer = setInterval(() => {
            this.wss.clients.forEach(ws => {
                if (ws.isAlive === false) {
                    console.log('连接超时,断开');
                    return ws.terminate();
                }
                
                ws.isAlive = false;
                ws.ping();  // 发送 Ping
            });
        }, this.interval);
    }
    
    stop() {
        clearInterval(this.timer);
    }
}

// 使用
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws) => {
    ws.isAlive = true;
    
    ws.on('pong', () => {
        ws.isAlive = true;  // 收到 Pong,连接正常
    });
});

const heartbeat = new HeartbeatManager(wss, 30000);

应用层心跳

// 应用层心跳(兼容性更好)
class AppHeartbeat {
    constructor(ws, interval = 25000) {
        this.ws = ws;
        this.interval = interval;
        this.timeout = null;
        this.start();
    }
    
    start() {
        // 定期发送 ping
        this.pingTimer = setInterval(() => {
            if (this.ws.readyState === WebSocket.OPEN) {
                this.ws.send(JSON.stringify({ type: 'ping', ts: Date.now() }));
            }
        }, this.interval);
        
        // 等待 pong 响应
        this.ws.on('message', (data) => {
            const msg = JSON.parse(data.toString());
            if (msg.type === 'pong') {
                clearTimeout(this.timeout);
            }
        });
    }
    
    stop() {
        clearInterval(this.pingTimer);
        clearTimeout(this.timeout);
    }
}

13.6 断线重连策略

指数退避重连

class ReconnectingWebSocket {
    constructor(url, options = {}) {
        this.url = url;
        this.maxRetries = options.maxRetries || 10;
        this.baseDelay = options.baseDelay || 1000;
        this.maxDelay = options.maxDelay || 30000;
        this.retryCount = 0;
        
        this.connect();
    }
    
    connect() {
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            console.log('连接成功');
            this.retryCount = 0;  // 重置重试计数
            this.onopen?.();
        };
        
        this.ws.onmessage = (event) => {
            this.onmessage?.(event);
        };
        
        this.ws.onclose = (event) => {
            console.log('连接关闭:', event.code);
            this.onclose?.(event);
            
            if (!event.wasClean && this.retryCount < this.maxRetries) {
                this.scheduleReconnect();
            }
        };
        
        this.ws.onerror = (error) => {
            console.error('连接错误:', error);
            this.onerror?.(error);
        };
    }
    
    scheduleReconnect() {
        this.retryCount++;
        
        // 指数退避 + 随机抖动
        const delay = Math.min(
            this.baseDelay * Math.pow(2, this.retryCount - 1) + Math.random() * 1000,
            this.maxDelay
        );
        
        console.log(`将在 ${delay}ms 后重连 (第 ${this.retryCount} 次)`);
        
        setTimeout(() => this.connect(), delay);
    }
    
    send(data) {
        if (this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(data);
        } else {
            console.warn('连接未就绪,消息未发送');
        }
    }
    
    close() {
        this.maxRetries = 0;  // 禁止自动重连
        this.ws.close();
    }
}

// 使用
const ws = new ReconnectingWebSocket('ws://localhost:3000');
ws.onmessage = (event) => console.log('收到:', event.data);
ws.send('Hello');

13.7 房间/频道管理

// 多房间聊天室
const WebSocket = require('ws');

class ChatServer {
    constructor(server) {
        this.wss = new WebSocket.Server({ server });
        this.rooms = new Map();  // room -> Set<ws>
        this.clients = new Map(); // ws -> { rooms: Set, username: string }
        
        this.wss.on('connection', (ws) => this.handleConnection(ws));
    }
    
    handleConnection(ws) {
        this.clients.set(ws, { rooms: new Set(), username: null });
        
        ws.on('message', (data) => {
            const msg = JSON.parse(data.toString());
            this.handleMessage(ws, msg);
        });
        
        ws.on('close', () => {
            const client = this.clients.get(ws);
            client.rooms.forEach(room => this.leaveRoom(ws, room));
            this.clients.delete(ws);
        });
    }
    
    handleMessage(ws, msg) {
        switch (msg.type) {
            case 'join':
                this.joinRoom(ws, msg.room);
                break;
            case 'leave':
                this.leaveRoom(ws, msg.room);
                break;
            case 'message':
                this.broadcastToRoom(msg.room, {
                    type: 'message',
                    from: this.clients.get(ws).username,
                    content: msg.content,
                    timestamp: Date.now()
                }, ws);
                break;
            case 'set_username':
                this.clients.get(ws).username = msg.username;
                break;
        }
    }
    
    joinRoom(ws, room) {
        if (!this.rooms.has(room)) {
            this.rooms.set(room, new Set());
        }
        this.rooms.get(room).add(ws);
        this.clients.get(ws).rooms.add(room);
        
        this.broadcastToRoom(room, {
            type: 'system',
            message: `${this.clients.get(ws).username} 加入了房间`
        });
    }
    
    leaveRoom(ws, room) {
        this.rooms.get(room)?.delete(ws);
        this.clients.get(ws).rooms.delete(room);
        
        this.broadcastToRoom(room, {
            type: 'system',
            message: `${this.clients.get(ws).username} 离开了房间`
        });
    }
    
    broadcastToRoom(room, message, excludeWs = null) {
        const clients = this.rooms.get(room);
        if (!clients) return;
        
        const data = JSON.stringify(message);
        clients.forEach(client => {
            if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
                client.send(data);
            }
        });
    }
}

module.exports = ChatServer;

13.8 Nginx WebSocket 代理

# WebSocket 代理配置
location /ws/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "Upgrade";
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    
    # 超时设置
    proxy_read_timeout 3600s;
    proxy_send_timeout 3600s;
}

13.9 安全考虑

安全措施说明
WSS 协议生产环境必须使用 wss://(加密)
Origin 验证检查 Origin 头防止跨站 WebSocket 劫持
认证握手时验证 Token 或 Cookie
限流限制消息频率和连接数
输入验证验证所有消息格式和内容
// 认证中间件
wss.on('connection', (ws, req) => {
    // 验证 Origin
    const origin = req.headers.origin;
    if (!allowedOrigins.includes(origin)) {
        ws.close(1008, '不允许的来源');
        return;
    }
    
    // 验证 Token
    const token = new URL(req.url, 'http://localhost').searchParams.get('token');
    try {
        const payload = jwt.verify(token, SECRET);
        ws.user = payload;
    } catch (err) {
        ws.close(1008, '认证失败');
        return;
    }
    
    // 限流
    ws.messageCount = 0;
    ws.lastMessageTime = Date.now();
});

⚠️ 注意事项

  1. 使用 WSS:生产环境必须加密
  2. 心跳保活:防止连接被中间设备断开
  3. 指数退避重连:避免重连风暴
  4. 消息大小限制:限制消息最大大小防止内存溢出
  5. 连接数限制:限制每个用户的连接数

🔗 扩展阅读


下一章第 14 章:HTTP/2 — 多路复用、头部压缩、服务器推送、帧结构