Socket.IO

12003

Socket.IO 是一个基于 Node.js 的实时应用程序框架。在即时通讯、通知与消息推送,实时分析等场景中有较为广泛的应用。

WebSocket 的产生源于 Web 开发中日益增长的实时通信需求。对比传统的基于 http 的轮询方式,它大大节约了网络带宽,同时也降低了服务器的性能消耗。socket.io 支持 websocket 和 polling 两种数据传输方式,以兼容不支持 WebSocket 的浏览器。

框架提供了 egg-socket.io 插件,增加了以下开发规约:

  • namespace:通过配置的方式定义 namespace(命名空间)。
  • middleware:对每一次 socket 连接的建立/断开、每一次消息/数据传递进行预处理。
  • controller:响应 socket.io 的 event 事件。
  • router:统一了 socket.io 的 event 与框架路由的处理配置方式。

安装 egg-socket.io

安装

  1. $ npm i egg-socket.io --save

开启插件:

  1. // {app_root}/config/plugin.js
  2. exports.io = {
  3. enable: true,
  4. package: 'egg-socket.io',
  5. };

配置

  1. // {app_root}/config/config.${env}.js
  2. exports.io = {
  3. init: {}, // 传递给 engine.io
  4. namespace: {
  5. '/': {
  6. connectionMiddleware: [],
  7. packetMiddleware: [],
  8. },
  9. '/example': {
  10. connectionMiddleware: [],
  11. packetMiddleware: [],
  12. },
  13. },
  14. };

命名空间为 //example,而不是 example

uws

Egg Socket 内部默认使用 ws 引擎。uws 因为某些原因被废止了。

如果坚持要使用 uws,请按照以下配置:

  1. // {app_root}/config/config.${env}.js
  2. exports.io = {
  3. init: { wsEngine: 'uws' }, // 默认是 ws
  4. };

redis

egg-socket.io 内置了 socket.io-redis。在 cluster 模式下,使用 redis 可以简单地实现 clients/rooms 等信息共享。

  1. // {app_root}/config/config.${env}.js
  2. exports.io = {
  3. redis: {
  4. host: { redis server host },
  5. port: { redis server port },
  6. auth_pass: { redis server password },
  7. db: 0,
  8. },
  9. };

开启 redis 后,程序在启动时会尝试连接到 redis 服务器。此处的 redis 仅用于存储连接实例信息,详见 #server.adapter

注意:
如果项目中同时使用了 egg-redis,请分别配置,不可共用。

部署

由于框架是以 Cluster 方式启动的,而 socket.io 协议实现需要 sticky 特性支持,在多进程模式下才能正常工作。

由于 socket.io 的设计,多进程服务器必须在 sticky 模式下工作。因此,需要给 startCluster 传递 sticky 参数。

修改 package.json 中的 npm scripts 脚本:

  1. {
  2. "scripts": {
  3. "dev": "egg-bin dev --sticky",
  4. "start": "egg-scripts start --sticky"
  5. }
  6. }

Nginx 配置

  1. location / {
  2. proxy_set_header Upgrade $http_upgrade;
  3. proxy_set_header Connection "upgrade";
  4. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  5. proxy_set_header Host $host;
  6. proxy_pass http://127.0.0.1:7001;
  7. # http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_bind

proxy_bind $remote_addr transparent;

  1. ## 使用 `egg-socket.io`
  2. 开启 [egg-socket.io] 的项目目录结构如下:

chat
├── app
│ ├── extend
│ │ └── helper.js
│ ├── io
│ │ ├── controller
│ │ │ └── default.js
│ │ └── middleware
│ │ ├── connection.js
│ │ └── packet.js
│ └── router.js
├── config
└── package.json

  1. > 注意:对应的文件都在 `app/io` 目录下。
  2. ### Middleware
  3. 中间件有以下两种场景:
  4. - Connection
  5. - Packet
  6. 它们的配置位于各个命名空间下,根据上述两种场景分别起作用。
  7. **注意:**
  8. 如果启用了框架中间件,会在项目中发现以下目录:
  9. - `app/middleware`:框架中间件。
  10. - `app/io/middleware`:插件中间件。
  11. 两者的区别:
  12. - 框架中间件基于 HTTP 模型设计,处理 HTTP 请求。
  13. - 插件中间件基于 socket 模型设计,处理 `socket.io` 请求。
  14. 虽然框架通过插件尽量统一了它们的风格,但必须注意,它们的使用场景是不同的。详情请参见 issue [#1416](https://github.com/eggjs/egg/issues/1416)。
  15. #### Connection
  16. 每个客户端连接或退出时起作用。因此,通常在这一步进行授权认证,并对认证失败的客户端进行处理。
  17. ```js
  18. // {app_root}/app/io/middleware/connection.js
  19. module.exports = app => {
  20. return async (ctx, next) => {
  21. ctx.socket.emit('res', 'connected!');
  22. await next();
  23. // execute when disconnect.
  24. console.log('disconnection!');
  25. };
  26. };

踢出用户示例:

  1. const tick = (id, msg) => {
  2. logger.debug('#tick', id, msg);
  3. socket.emit(id, msg);
  4. app.io.of('/').adapter.remoteDisconnect(id, true, err => {
  5. logger.error(err);
  6. });
  7. };

针对当前连接的简单处理示例:

  1. // {app_root}/app/io/middleware/connection.js
  2. module.exports = app => {
  3. return async (ctx, next) => {
  4. if (true) {
  5. ctx.socket.disconnect();
  6. return;
  7. }
  8. await next();
  9. console.log('disconnection!');
  10. };
  11. };

Packet

每条数据包(消息)都会执行该中间件。在生产环境中,通常用于对消息做预处理,或者对加密消息进行解密等操作。

  1. // {app_root}/app/io/middleware/packet.js
  2. module.exports = app => {
  3. return async (ctx, next) => {
  4. ctx.socket.emit('res', 'packet received!');
  5. console.log('packet:', ctx.packet);
  6. await next();
  7. };
  8. };

Controller

Controller 对客户端发送的 event 进行处理;由于其继承自 egg.Controller,拥有以下成员对象:

  • ctx
  • app
  • service
  • config
  • logger

详情参考 Controller 文档。

  1. // {app_root}/app/io/controller/default.js
  2. 'use strict';
  3. const Controller = require('egg').Controller;
  4. class DefaultController extends Controller {
  5. async ping() {
  6. const { ctx } = this;
  7. const message = ctx.args[0];
  8. await ctx.socket.emit('res', `Hi! I've got your message: ${message}`);
  9. }
  10. }
  11. module.exports = DefaultController;

Router

路由负责将 socket 连接的不同 events 分发到对应的 controller,框架统一了其使用方式。

  1. // {app_root}/app/router.js
  2. module.exports = app => {
  3. const { router, controller, io } = app;
  4. // default
  5. router.get('/', controller.home.index);
  6. // socket.io
  7. io.of('/').route('server', io.controller.home.server);
  8. };

注意:

nsp 有如下的系统事件:

  • disconnecting:正在断开连接。
  • disconnect:连接已断开。
  • error:发生错误。

Namespace/Room

Namespace (nsp)

namespace 通常意味着分配到不同的接入点或者路径,如果客户端没有指定 nsp,则默认分配到 "/" 这个默认的命名空间。

在 socket.io 中我们通过 of 来划分命名空间;鉴于 nsp 通常是预定义且相对固定的存在,框架将其进行了封装,采用配置的方式来划分不同的命名空间。

  1. // socket.io
  2. const nsp = io.of('/my-namespace');
  3. nsp.on('connection', socket => {
  4. console.log('someone connected');
  5. });
  6. nsp.emit('hi', 'everyone!');
  7. // egg
  8. exports.io = {
  9. namespace: {
  10. '/': {
  11. connectionMiddleware: [],
  12. packetMiddleware: [],
  13. },
  14. },
  15. };

Room

room 存在于 nsp 中,通过 join/leave 方法来加入或者离开;框架中使用方法相同。

  1. const room = 'default_room';
  2. module.exports = app => {
  3. return async (ctx, next) => {
  4. ctx.socket.join(room);
  5. ctx.app.io
  6. .of('/')
  7. .to(room)
  8. .emit('online', { msg: 'welcome', id: ctx.socket.id });
  9. await next();
  10. console.log('disconnection!');
  11. };
  12. };

注意: 每一个 socket 连接都会拥有一个随机且不可预测的唯一 id Socket#id,并且会自动加入到以这个 id 命名的 room 中。

实例

这里我们使用 egg-socket.io 来做一个支持 P2P 聊天的小例子。

客户端

UI 相关的内容不重复编写,通过 window.socket 调用即可。

  1. // 浏览器
  2. const log = console.log;
  3. window.onload = function () {
  4. // 初始化
  5. const socket = io('/', {
  6. // 实际使用中可以在这里传递参数
  7. query: {
  8. room: 'demo',
  9. userId: `client_${Math.random()}`, // 传递了 room 和 userId 两个参数
  10. },
  11. transports: ['websocket'],
  12. });
  13. socket.on('connect', () => {
  14. const id = socket.id;
  15. log('#connect,', id, socket);
  16. // 监听自身 id,以实现 P2P 通讯
  17. socket.on(id, (msg) => {
  18. log('#receive,', msg);
  19. });
  20. });
  21. // 接收在线用户信息
  22. socket.on('online', (msg) => {
  23. log('#online,', msg);
  24. });
  25. // 系统事件
  26. socket.on('disconnect', (msg) => {
  27. log('#disconnect', msg);
  28. });
  29. socket.on('disconnecting', () => {
  30. log('#disconnecting');
  31. });
  32. socket.on('error', () => {
  33. log('#error');
  34. });
  35. window.socket = socket;
  36. };

微信小程序

微信小程序提供的 API 为 WebSocket ,因为 socket.ioWebSocket 的上层封装,所以我们无法直接使用小程序的 API 连接。可以使用类似 weapp.socket.io 的库适配。

示例代码如下:

  1. // 小程序端示例代码
  2. const io = require('./your_path/weapp.socket.io.js'); // 请替换成实际路径
  3. const socket = io('http://localhost:8000');
  4. socket.on('connect', function () {
  5. console.log('connected');
  6. });
  7. socket.on('news', (d) => {
  8. console.log('received news:', d);
  9. });
  10. socket.emit('news', {
  11. title: 'this is a news',
  12. });

server

以下是 demo 的部分代码,并解释了各个方法的作用。

config

  1. // {app_root}/config/config.${env}.js
  2. exports.io = {
  3. namespace: {
  4. '/': {
  5. connectionMiddleware: ['auth'],
  6. packetMiddleware: [] // 针对消息的处理暂时不实现
  7. }
  8. },
  9. // cluster 模式下,通过 redis 实现数据共享
  10. redis: {
  11. host: '127.0.0.1',
  12. port: 6379
  13. }
  14. };
  15. // 可选
  16. exports.redis = {
  17. client: {
  18. port: 6379,
  19. host: '127.0.0.1',
  20. password: '',
  21. db: 0
  22. }
  23. };

helper

框架扩展用于封装数据格式。

  1. // {app_root}/app/extend/helper.js
  2. module.exports = {
  3. parseMsg(action, payload = {}, metadata = {}) {
  4. const meta = Object.assign({}, { timestamp: Date.now() }, metadata);
  5. return {
  6. meta,
  7. data: {
  8. action,
  9. payload
  10. }
  11. };
  12. }
  13. };

Format:

  1. {
  2. data: {
  3. action: 'exchange', // 'deny' || 'exchange' || 'broadcast'
  4. payload: {}
  5. },
  6. meta: {
  7. timestamp: 1512116201597,
  8. client: 'nNx88r1c5WuHf9XuAAAB',
  9. target: 'nNx88r1c5WuHf9XuAAAB'
  10. }
  11. }

中间件

egg-socket.io 中间件负责处理 socket 连接。

  1. // {app_root}/app/io/middleware/auth.js
  2. const PREFIX = 'room';
  3. module.exports = () => {
  4. return async (ctx, next) => {
  5. const { app, socket, logger, helper } = ctx;
  6. const id = socket.id;
  7. const nsp = app.io.of('/');
  8. const query = socket.handshake.query;
  9. // 用户信息
  10. const { room, userId } = query;
  11. const rooms = [room];
  12. logger.debug('#user_info', id, room, userId);
  13. const tick = (id, msg) => {
  14. logger.debug('#tick', id, msg);
  15. // 踢出用户前发送信息
  16. socket.emit(id, helper.parseMsg('deny', msg));
  17. // 调用 adapter 方法踢出用户,客户端会触发 disconnect 事件
  18. nsp.adapter.remoteDisconnect(id, true, (err) => {
  19. logger.error(err);
  20. });
  21. };
  22. // 检查房间是否存在,不存在则踢出用户
  23. // 注:此处 app.redis 与插件无关,可用其他存储替代
  24. const hasRoom = await app.redis.get(`${PREFIX}:${room}`);
  25. logger.debug('#has_exist', hasRoom);
  26. // 若房间不存在
  27. if (!hasRoom) {
  28. tick(id, {
  29. type: 'deleted',
  30. message: 'deleted, room has been deleted.'
  31. });
  32. return;
  33. }
  34. // 用户加入房间
  35. logger.debug('#join', room);
  36. socket.join(room);
  37. // 获取在线列表
  38. nsp.adapter.clients(rooms, (err, clients) => {
  39. logger.debug('#online_join', clients);
  40. // 更新在线用户列表
  41. nsp.to(room).emit('online', {
  42. clients,
  43. action: 'join',
  44. target: 'participator',
  45. message: `User(${id}) joined.`
  46. });
  47. });
  48. await next();
  49. // 用户离开房间
  50. logger.debug('#leave', room);
  51. // 获取在线列表
  52. nsp.adapter.clients(rooms, (err, clients) => {
  53. logger.debug('#online_leave', clients);
  54. // 更新在线用户列表
  55. nsp.to(room).emit('online', {
  56. clients,
  57. action: 'leave',
  58. target: 'participator',
  59. message: `User(${id}) leaved.`
  60. });
  61. });
  62. };
  63. };

控制器

P2P 通信,通过 exchange 方法实现数据交换。

  1. // {app_root}/app/io/controller/nsp.js
  2. const Controller = require('egg').Controller;
  3. class NspController extends Controller {
  4. async exchange() {
  5. const { ctx, app } = this;
  6. const nsp = app.io.of('/');
  7. const message = ctx.args[0] || {};
  8. const socket = ctx.socket;
  9. const client = socket.id;
  10. try {
  11. const { target, payload } = message;
  12. if (!target) return;
  13. const msg = ctx.helper.parseMsg('exchange', payload, { client, target });
  14. nsp.emit(target, msg);
  15. } catch (error) {
  16. app.logger.error(error);
  17. }
  18. }
  19. }
  20. module.exports = NspController;

router

  1. // {app_root}/app/router.js
  2. module.exports = (app) => {
  3. const { router, controller, io } = app;
  4. router.get('/', controller.home.index);
  5. // socket.io
  6. io.of('/').route('exchange', io.controller.nsp.exchange);
  7. };

打开两个 tab 页面,并调出控制台:

  1. socket.emit('exchange', {
  2. target: 'Dkn3UXSu8_jHvKBmAAHW',
  3. payload: {
  4. msg: 'test',
  5. },
  6. });

参考链接