使用 Socket.IO 生成实时代码流应用并将其托管在 Azure 上

生成类似于 Microsoft Word 中的共同创作功能的实时体验可能颇具挑战性。

凭借其易用的 API,Socket.IO 已证实它是一个可用于在客户端与服务器之间进行实时通信的库。 但是,Socket.IO 用户经常报告 Socket.IO 连接的缩放比较困难。 借助 Web PubSub for Socket.IO,开发人员不再需要担心如何管理持久连接。

概述

本文介绍如何生成一个应用,使代码编写人员能够将代码编写活动流式传输给受众。 可以通过以下方法生成此应用程序:

  • Monaco Editor:为 Visual Studio Code 助力的代码编辑器。
  • Express:一个 Node.js Web 框架。
  • Socket.IO 库为实时通信提供的 API。
  • 使用 Web PubSub for Socket.IO 的主机 Socket.IO 连接。

完成的应用

已完成的应用允许代码编辑器用户共享一个 Web 链接,受众可以通过该链接观看键入内容。

Screenshot of the finished code-streaming app.

为使过程在 15 分钟左右的时间内突出重点并易于理解,本文定义了两个用户角色以及他们可以在编辑器中执行的操作:

  • 作家,可以在联机编辑器中键入并流式传输内容
  • 观众,接收作家键入的实时内容,但无法编辑内容

体系结构

目的 好处
Socket.IO 库 在后端应用程序与客户端之间提供低延迟的双向数据交换机制 易用的 API,涵盖大多数实时通信方案
Web PubSub for Socket.IO 与 Socket.IO 客户端建立主机 WebSocket 或基于轮询的持久连接 支持 10 万个并发连接;简化的应用程序体系结构

Diagram that shows how the Web PubSub for Socket.IO service connects clients with a server.

先决条件

若要执行本文中的所有步骤,需要:

  • 一个 Azure 帐户。 如果没有 Azure 订阅,请在开始前创建 Azure 试用版订阅
  • 安装 Azure CLI(2.29.0 或更高版本)或 Azure Power Shell,用于管理 Azure 资源。

创建 Web PubSub for Socket.IO 资源

使用 Azure CLI 创建资源:

az webpubsub create -n <resource-name> \
                    -l <resource-location> \
                    -g <resource-group> \
                    --kind SocketIO \
                    --sku Free_F1

获取连接字符串

使用连接字符串可以连接 Web PubSub for Socket.IO。

运行以下命令。 请将返回的连接字符串保存在某个位置,因为稍后在本文中运行应用程序时需要用到。

az webpubsub key show -n <resource-name> \ 
                      -g <resource-group> \ 
                      --query primaryKey \
                      -o tsv

编写应用程序的服务器端代码

在服务器端开始编写应用程序的代码。

生成 HTTP 服务器

  1. 创建 Node.js 项目:

    mkdir codestream
    cd codestream
    npm init
    
  2. 安装服务器 SDK 和 Express:

    npm install @azure/web-pubsub-socket.io
    npm install express
    
  3. 导入所需的包并创建一个 HTTP 服务器来提供静态文件:

    /*server.js*/
    
    // Import required packages
    const express = require('express');
    const path = require('path');
    
    // Create an HTTP server based on Express
    const app = express();
    const server = require('http').createServer(app);
    
    app.use(express.static(path.join(__dirname, 'public')));
    
  4. 定义一个名为 /negotiate 的终结点。 作家客户端首先进入此终结点。 此终结点返回一条 HTTP 响应。 该响应包含客户端用来建立持久连接的终结点。 它还返回客户端所分配到的 room 值。

    /*server.js*/
    app.get('/negotiate', async (req, res) => {
        res.json({
            url: endpoint
            room_id: Math.random().toString(36).slice(2, 7),
        });
    });
    
    // Make the Socket.IO server listen on port 3000
    io.httpServer.listen(3000, () => {
        console.log('Visit http://localhost:%d', 3000);
    });
    

创建 Web PubSub for Socket.IO 服务器

  1. 导入 Web PubSub for Socket.IO SDK 并定义选项:

    /*server.js*/
    const { useAzureSocketIO } = require("@azure/web-pubsub-socket.io");
    
    const wpsOptions = {
        hub: "codestream",
        connectionString: process.argv[2]
    }
    
  2. 创建 Web PubSub for Socket.IO 服务器:

    /*server.js*/
    
    const io = require("socket.io")();
    useAzureSocketIO(io, wpsOptions);
    

这两个步骤与你在一般情况下创建 Socket.IO 服务器的方式略有不同,具体请参阅此 Socket.IO 文档。 通过这两个步骤,服务器端代码可以将持久连接的管理工作转移到 Azure 服务。 在 Azure 服务的帮助下,你的应用程序服务器仅充当一台轻型 HTTP 服务器。

实现业务逻辑

创建由 Web PubSub 托管的 Socket.IO 服务器后,可以使用 Socket.IO 的 API 定义客户端与服务器通信的方式。 此过程称为实现业务逻辑。

  1. 连接客户端后,应用程序服务器将通过发送名为 login 的自定义事件来向客户端告知它已登录。

    /*server.js*/
    io.on('connection', socket => {
        socket.emit("login");
    });
    
  2. 每个客户端发出服务器可以响应的两个事件:joinRoomsendToRoom。 服务器获取客户端想要加入的 room_id 值后,你可以使用 Socket.IO 的 API 中的 socket.join 将目标客户端加入指定的房间。

    /*server.js*/
    socket.on('joinRoom', async (message) => {
        const room_id = message["room_id"];
        await socket.join(room_id);
    });
    
  3. 客户端加入后,服务器将通过发送 message 事件向客户端告知成功结果。 当客户端收到类型为 ackJoinRoommessage 事件时,客户端可以请求服务器发送最新的编辑器状态。

    /*server.js*/
    socket.on('joinRoom', async (message) => {
        // ...
        socket.emit("message", {
            type: "ackJoinRoom", 
            success: true 
        })
    });
    
    /*client.js*/
    socket.on("message", (message) => {
        let data = message;
        if (data.type === 'ackJoinRoom' && data.success) {
            sendToRoom(socket, `${room_id}-control`, { data: 'sync'});
        }
        // ... 
    });
    
  4. 当客户端向服务器发送 sendToRoom 事件时,服务器会将代码编辑器的状态更改广播到指定的房间。 现在,房间中的所有客户端都可以接收最新更新。

    socket.on('sendToRoom', (message) => {
        const room_id = message["room_id"]
        const data = message["data"]
    
        socket.broadcast.to(room_id).emit("message", {
            type: "editorMessage",
            data: data
        });
    });
    

编写应用程序的客户端代码

完成服务器端过程后,便可以在客户端操作。

初始设置

需要创建一个 Socket.IO 客户端来与服务器通信。 问题在于,客户端应与哪台服务器建立持久连接。 由于使用的是 Web PubSub for Socket.IO,因此服务器是一个 Azure 服务。 回顾前文,你已定义一个 /negotiate 路由来为客户端提供 Web PubSub for Socket.IO 的终结点。

/*client.js*/

async function initialize(url) {
    let data = await fetch(url).json()

    updateStreamId(data.room_id);

    let editor = createEditor(...); // Create an editor component

    var socket = io(data.url, {
        path: "/clients/socketio/hubs/codestream",
    });

    return [socket, editor, data.room_id];
}

initialize(url) 函数将几个设置操作组织在一起:

  • 从 HTTP 服务器提取 Azure 服务的终结点
  • 创建 Monaco Editor 实例
  • 与 Web PubSub for Socket.IO 建立持久连接

作家客户端

如前所述,客户端有两个用户角色:作家和观众。 作家键入的任何内容都会传输到观众的屏幕。

  1. 获取 Web PubSub for Socket.IO 的终结点和 room_id 值:

    /*client.js*/
    
    let [socket, editor, room_id] = await initialize('/negotiate');
    
  2. 在作家客户端连接到服务器后,服务器会向作家发送 login 事件。 作家可以通过请求服务器将自己加入指定的房间来做出响应。 作家客户端每隔 200 毫秒向房间发送最新的编辑器状态。 名为 flush 的函数组织发送逻辑。

    /*client.js*/
    
    socket.on("login", () => {
        updateStatus('Connected');
        joinRoom(socket, `${room_id}`);
        setInterval(() => flush(), 200);
        // Update editor content
        // ...
    });
    
  3. 如果作家未进行任何编辑,则 flush() 不会执行任何操作,而只会返回。 否则,会将编辑器的状态更改发送到房间。

    /*client.js*/
    
    function flush() {
        // No changes from editor need to be flushed
        if (changes.length === 0) return;
    
        // Broadcast the changes made to editor content
        sendToRoom(socket, room_id, {
            type: 'delta',
            changes: changes
            version: version++,
        });
    
        changes = [];
        content = editor.getValue();
    }
    
  4. 连接新的观众客户端后,观众需要获取编辑器的最新完整状态。 为此,会将一条包含 sync 数据的消息发送到作家客户端。 该消息请求作家客户端发送完整的编辑器状态。

    /*client.js*/
    
    socket.on("message", (message) => {
        let data = message.data;
        if (data.data === 'sync') {
            // Broadcast the full content of the editor to the room
            sendToRoom(socket, room_id, {
                type: 'full',
                content: content
                version: version,
            });
        }
    });
    

观众客户端

  1. 与作家客户端一样,观众客户端也通过 initialize() 创建其 Socket.IO 客户端。 当观众客户端已连接并从服务器收到 login 事件时,它会请求服务器将自己加入指定的房间。 查询 room_id 指定房间。

    /*client.js*/
    
    let [socket, editor] = await initialize(`/register?room_id=${room_id}`)
    socket.on("login", () => {
        updateStatus('Connected');
        joinRoom(socket, `${room_id}`);
    });
    
  2. 当观众客户端从服务器收到 message 事件并且数据类型为 ackJoinRoom 时,观众客户端将请求房间中的作家客户端发送完整的编辑器状态。

    /*client.js*/
    
    socket.on("message", (message) => {
        let data = message;
        // Ensures the viewer client is connected
        if (data.type === 'ackJoinRoom' && data.success) { 
            sendToRoom(socket, `${id}`, { data: 'sync'});
        } 
        else //...
    });
    
  3. 如果数据类型为 editorMessage,则观众客户端会根据其实际内容更新编辑器。

    /*client.js*/
    
    socket.on("message", (message) => {
        ...
        else 
            if (data.type === 'editorMessage') {
            switch (data.data.type) {
                case 'delta':
                    // ... Let editor component update its status
                    break;
                case 'full':
                    // ... Let editor component update its status
                    break;
            }
        }
    });
    
  4. 使用 Socket.IO 的 API 实现 joinRoom()sendToRoom()

    /*client.js*/
    
    function joinRoom(socket, room_id) {
        socket.emit("joinRoom", {
            room_id: room_id,
        });
    }
    
    function sendToRoom(socket, room_id, data) {
        socket.emit("sendToRoom", {
            room_id: room_id,
            data: data
        });
    }
    

运行应用程序

查找存储库

前面的部分介绍了在观众与作家之间同步编辑器状态的相关核心逻辑。 可以在示例存储库中找到完整代码。

克隆存储库

可以克隆该存储库,并运行 npm install 以安装项目依赖项。

启动服务器

node server.js <web-pubsub-connection-string>

这是在前面的步骤中收到的连接字符串。

随意体验实时代码编辑器

在浏览器选项卡上打开 http://localhost:3000。打开另一个选项卡并访问第一个网页中显示的 URL。

如果你在第一个选项卡上编写代码,另一个选项卡应会实时反映你的键入内容。Web PubSub for Socket.IO 简化了云中的消息传递。 express 服务器仅提供静态 index.html 文件和 /negotiate 终结点。