强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Node.js 开发指南 / 第 16 章 · WebSocket 实时通信

第 16 章 · WebSocket 实时通信

16.1 HTTP vs WebSocket

特性HTTPWebSocket
通信模式请求-响应(单向)全双工(双向)
连接短连接(每次新建)长连接(持久)
服务端推送不支持(需要轮询)原生支持
协议http:// / https://ws:// / wss://
开销每次请求带完整头部数据帧开销小
适用场景REST API、静态资源聊天、实时数据、游戏
HTTP 轮询:
客户端 ──请求──> 服务器
客户端 <──响应── 服务器
(等待 N 秒)
客户端 ──请求──> 服务器
客户端 <──响应── 服务器

WebSocket:
客户端 ──握手──> 服务器    (HTTP 升级)
客户端 <──确认── 服务器
客户端 <──> 服务器          (双向通信)
客户端 <──> 服务器

16.2 原生 WebSocket(ws 模块)

npm install ws

服务端

const { WebSocketServer } = require('ws');
const http = require('http');

const server = http.createServer();
const wss = new WebSocketServer({ server });

wss.on('connection', (ws, req) => {
  const ip = req.socket.remoteAddress;
  console.log(`客户端已连接: ${ip}`);

  // 发送欢迎消息
  ws.send(JSON.stringify({ type: 'welcome', message: '连接成功' }));

  // 接收消息
  ws.on('message', (data) => {
    const message = JSON.parse(data.toString());
    console.log('收到:', message);

    // 回复
    ws.send(JSON.stringify({ type: 'echo', data: message }));

    // 广播给所有客户端
    wss.clients.forEach((client) => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(JSON.stringify({ type: 'broadcast', data: message }));
      }
    });
  });

  ws.on('close', () => console.log('客户端断开'));
  ws.on('error', (err) => console.error('WebSocket 错误:', err));
});

server.listen(8080, () => console.log('WebSocket 服务运行在 ws://localhost:8080'));

客户端

// 浏览器或 Node.js 客户端
const WebSocket = require('ws'); // Node.js 端
// const ws = new WebSocket('ws://localhost:8080'); // 浏览器端直接使用

ws.on('open', () => {
  console.log('已连接');
  ws.send(JSON.stringify({ type: 'greeting', text: '你好' }));
});

ws.on('message', (data) => {
  console.log('收到:', JSON.parse(data.toString()));
});

ws.on('close', () => console.log('已断开'));

16.3 Socket.io

npm install socket.io

服务端

const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

const app = express();
const server = createServer(app);
const io = new Server(server, {
  cors: { origin: '*', methods: ['GET', 'POST'] },
  pingTimeout: 60000,
});

// 连接事件
io.on('connection', (socket) => {
  console.log(`用户连接: ${socket.id}`);

  // 基本消息
  socket.on('message', (data) => {
    console.log('收到:', data);
    socket.emit('message', { from: 'server', text: `收到: ${data.text}` });
  });

  // 加入房间
  socket.on('join-room', (roomId) => {
    socket.join(roomId);
    console.log(`${socket.id} 加入房间 ${roomId}`);
    
    // 通知房间内其他人
    socket.to(roomId).emit('user-joined', {
      userId: socket.id,
      message: '新用户加入',
    });
  });

  // 离开房间
  socket.on('leave-room', (roomId) => {
    socket.leave(roomId);
    socket.to(roomId).emit('user-left', { userId: socket.id });
  });

  // 房间内广播
  socket.on('room-message', ({ roomId, message }) => {
    io.to(roomId).emit('room-message', {
      from: socket.id,
      message,
      timestamp: Date.now(),
    });
  });

  // 私信
  socket.on('private-message', ({ to, message }) => {
    io.to(to).emit('private-message', {
      from: socket.id,
      message,
    });
  });

  // 断开连接
  socket.on('disconnect', (reason) => {
    console.log(`用户断开: ${socket.id}, 原因: ${reason}`);
  });
});

// 广播给所有客户端
function broadcastToAll(event, data) {
  io.emit(event, data);
}

// 获取房间人数
function getRoomSize(roomId) {
  return io.sockets.adapter.rooms.get(roomId)?.size || 0;
}

server.listen(3000, () => console.log('服务器运行在 http://localhost:3000'));

客户端

<!-- 浏览器端 -->
<script src="https://cdn.socket.io/4.7.5/socket.io.min.js"></script>
<script>
const socket = io('http://localhost:3000');

socket.on('connect', () => {
  console.log('已连接:', socket.id);
});

socket.on('message', (data) => {
  console.log('服务器消息:', data);
});

// 发送消息
socket.emit('message', { text: '你好服务器' });

// 加入房间
socket.emit('join-room', 'room-123');

// 房间消息
socket.emit('room-message', { roomId: 'room-123', message: '大家好' });

socket.on('room-message', (data) => {
  console.log(`${data.from}: ${data.message}`);
});

// 断线重连
socket.on('disconnect', () => console.log('已断开'));
socket.on('connect_error', (err) => console.error('连接错误:', err.message));
</script>

16.4 实战:聊天应用

// server.js
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

const app = express();
const server = createServer(app);
const io = new Server(server);

// 用户信息存储
const users = new Map();

io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) return next(new Error('未提供认证令牌'));
  try {
    const user = verifyToken(token);
    socket.user = user;
    next();
  } catch {
    next(new Error('令牌无效'));
  }
});

io.on('connection', (socket) => {
  users.set(socket.id, { ...socket.user, socketId: socket.id });
  
  // 在线用户列表
  io.emit('online-users', Array.from(users.values()).map(u => ({
    id: u.id, name: u.name
  })));

  // 聊天消息
  socket.on('chat-message', ({ room, content }) => {
    const message = {
      id: Date.now().toString(),
      content,
      sender: { id: socket.user.id, name: socket.user.name },
      room,
      timestamp: Date.now(),
    };
    
    if (room) {
      io.to(room).emit('chat-message', message);
    } else {
      io.emit('chat-message', message);
    }
  });

  // 输入状态
  socket.on('typing', ({ room }) => {
    socket.to(room || 'global').emit('typing', {
      userId: socket.user.id,
      name: socket.user.name,
    });
  });

  socket.on('disconnect', () => {
    users.delete(socket.id);
    io.emit('online-users', Array.from(users.values()));
  });
});

server.listen(3000);

16.5 Socket.io 中间件

// 认证中间件
io.use((socket, next) => {
  const token = socket.handshake.auth.token;
  if (!token) return next(new Error('认证失败'));
  try {
    socket.user = jwt.verify(token, JWT_SECRET);
    next();
  } catch {
    next(new Error('令牌无效'));
  }
});

// 速率限制中间件
io.use((socket, next) => {
  const messageCounts = new Map();
  socket.on('chat-message', () => {
    const count = (messageCounts.get(socket.id) || 0) + 1;
    messageCounts.set(socket.id, count);
    if (count > 30) {
      socket.emit('error', { message: '消息发送过于频繁' });
      return;
    }
    setTimeout(() => messageCounts.delete(socket.id), 60000);
  });
  next();
});

注意事项

⚠️ 心跳检测:Socket.io 内置心跳机制,但生产环境应配置合理的 pingTimeoutpingInterval

⚠️ 水平扩展:多实例部署时需要使用 @socket.io/redis-adapter 进行消息广播。

⚠️ 认证:WebSocket 连接建立后不会每次都验证令牌,需要定期检查会话有效性。

⚠️ 消息大小限制:设置 maxHttpBufferSize 防止大消息攻击。

业务场景

  1. 在线聊天:客服系统、社交聊天
  2. 实时协作:协同编辑文档、白板
  3. 实时数据看板:股票行情、监控面板
  4. 多人游戏:实时对战、棋类游戏
  5. 通知推送:订单状态变更、系统通知

扩展阅读


上一章第 15 章 · 认证与授权 下一章第 17 章 · 测试 — Jest、Mocha、Supertest 和代码覆盖率。