教程:使用子协议在 WebSocket 客户端之间发布和订阅消息

构建聊天应用教程中,你已了解如何使用 WebSocket API 通过 Azure Web PubSub 发送和接收数据。 客户端与服务进行通信时不需要协议。 例如,可以使用 WebSocket.send() 发送任何类型的数据,服务器将按原样接收它。 WebSocket API 流程易于使用,但功能有限。 例如,无法在向服务器发送事件时指定事件名称,也不能将消息发布到其他客户端,而只能将其发送到你的服务器。 本教程介绍如何使用子协议来扩展客户端的功能。

本教程介绍如何执行下列操作:

  • 创建 Web PubSub 服务实例
  • 生成完整 URL 以建立 WebSocket 连接
  • 使用子协议在 WebSocket 客户端之间发布消息

如果没有 Azure 试用版订阅,请在开始前创建一个试用版订阅

先决条件

  • 如需在本地运行 CLI 参考命令,请安装 Azure CLI。 如果在 Windows 或 macOS 上运行,请考虑在 Docker 容器中运行 Azure CLI。 有关详细信息,请参阅如何在 Docker 容器中运行 Azure CLI

    • 如果使用的是本地安装,请使用 az login 命令登录到 Azure CLI。 若要完成身份验证过程,请遵循终端中显示的步骤。 有关其他登录选项,请参阅使用 Azure CLI 登录

    • 出现提示时,请在首次使用时安装 Azure CLI 扩展。 有关扩展详细信息,请参阅使用 Azure CLI 的扩展

    • 运行 az version 以查找安装的版本和依赖库。 若要升级到最新版本,请运行 az upgrade

  • 本设置需要 Azure CLI 版本 2.22.0 或更高版本。

创建 Azure Web PubSub 实例

创建资源组

资源组是在其中部署和管理 Azure 资源的逻辑容器。 使用 az group create 命令在 chinaeast 位置创建名为 myResourceGroup 的资源组。

az group create --name myResourceGroup --location ChinaEast

创建 Web PubSub 实例

运行 az extension add,安装 webpubsub 扩展或将其升级到当前版本。

az extension add --upgrade --name webpubsub

使用 Azure CLI az webpubsub create 命令在已创建的资源组中创建 Web PubSub。 以下命令在 ChinaEast 的资源组 myResourceGroup 下创建一个免费的 Web PubSub 资源:

重要

每个 Web PubSub 资源必须具有唯一名称。 在以下示例中,将 <your-unique-resource-name> 替换为 Web PubSub 的名称。

az webpubsub create --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --location "ChinaEast" --sku Free_F1

此命令的输出会显示新建的资源的属性。 请记下下面列出的两个属性:

  • 资源名称:为上面的 --name 参数提供的名称。
  • 主机名:在本例中,主机名为 <your-unique-resource-name>.webpubsub.azure.cn/

目前,只有你的 Azure 帐户才有权对这个新资源执行任何操作。

获取 ConnectionString 以供将来使用

重要

连接字符串包括应用程序访问 Azure Web PubSub 服务所需的授权信息。 连接字符串中的访问密钥类似于服务的根密码。 在生产环境中,请始终小心保护访问密钥。 使用 Azure 密钥保管库安全地管理和轮换密钥。 避免将访问密钥分发给其他用户、对其进行硬编码或将其以纯文本形式保存在其他人可以访问的任何位置。 如果你认为访问密钥可能已泄露,请轮换密钥。

使用 Azure CLI az webpubsub key 命令获取服务的 ConnectionString。 将 <your-unique-resource-name> 占位符替换为 Azure Web PubSub 实例的名称。

az webpubsub key show --resource-group myResourceGroup --name <your-unique-resource-name> --query primaryConnectionString --output tsv

复制主连接字符串以供稍后使用。

复制已提取的 ConnectionString,本教程稍后会将其用作 <connection_string> 的值。

设置项目

先决条件

使用子协议

客户端可以使用特定的子协议启动 WebSocket 连接。 Azure Web PubSub 服务支持名为 json.webpubsub.azure.v1 的子协议,使客户端能够通过 Web PubSub 服务直接进行发布/订阅,而不是往返于上游服务器。 查看 Azure Web PubSub 支持的 JSON WebSocket 子协议,了解该子协议的详细信息。

如果使用其他协议名称,这些名称将被服务忽略,并在连接事件处理程序中传递到服务器,因此你可以构建自己的协议。

使用 json.webpubsub.azure.v1 子协议创建 Web 应用程序。

  1. 安装依赖项

    mkdir logstream
    cd logstream
    dotnet new web
    dotnet add package Microsoft.Extensions.Azure
    dotnet add package Azure.Messaging.WebPubSub
    
  2. 创建服务器端来托管 /negotiate API 和 Web 页面。

    使用以下代码更新 Program.cs

    • 使用 AddAzureClients 添加服务客户端,并从配置中读取连接字符串。
    • app.Run(); 前面添加 app.UseStaticFiles();,以支持静态文件。
    • 并使用 /negotiate 请求更新 app.MapGet,以生成客户端访问令牌。
    using Azure.Messaging.WebPubSub;
    using Microsoft.Extensions.Azure;
    
    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddAzureClients(s =>
    {
        s.AddWebPubSubServiceClient(builder.Configuration["Azure:WebPubSub:ConnectionString"], "stream");
    });
    
    var app = builder.Build();
    app.UseStaticFiles();
    app.MapGet("/negotiate", async context =>
    {
        var service = context.RequestServices.GetRequiredService<WebPubSubServiceClient>();
        var response = new
        {
            url = service.GetClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" }).AbsoluteUri
        };
        await context.Response.WriteAsJsonAsync(response);
    });
    
    app.Run();
    
  3. 创建网页

    使用以下内容创建一个 HTML 页面,并将其另存为 wwwroot/index.html

    <html>
      <body>
        <div id="output"></div>
        <script>
          (async function () {
            let res = await fetch('/negotiate')
            let data = await res.json();
            let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
            ws.onopen = () => {
              console.log('connected');
            };
    
            let output = document.querySelector('#output');
            ws.onmessage = event => {
              let d = document.createElement('p');
              d.innerText = event.data;
              output.appendChild(d);
            };
          })();
        </script>
      </body>
    </html>                                                                
    

    上面的代码会连接到服务,并将收到的任何消息打印到页面上。 主要的变化是在创建 WebSocket 连接时指定了子协议。

  4. 运行服务器

    使用适用于 .NET Core 的机密管理器工具来设置连接字符串。 运行以下命令,将 <connection_string> 替换为在上一步中提取的连接字符串,并在浏览器中打开 http://localhost:5000/index.html :

    dotnet user-secrets init
    dotnet user-secrets set Azure:WebPubSub:ConnectionString "<connection-string>"
    dotnet run
    

    如果你使用的是 Chrome,可以按 F12,或右键单击,选择 ->“检查”->“开发人员工具”,并选择“网络”选项卡。加载网页,可以看到已建立 WebSocket 连接。 选择以检查 WebSocket 连接,可以看到客户端中收到以下 connected 事件消息。 可以发现,你能够获得为该客户端生成的 connectionId

    {"type":"system","event":"connected","userId":null,"connectionId":"<the_connection_id>"}
    

可以发现,借助子协议,可在连接为 connected 时获取连接的某些元数据。

客户端现在接收到 JSON 消息,而不是纯文本。 JSON 消息包含更多信息,例如消息的类型和来源。 因此,你可以使用此信息对消息进行更多的处理(例如,如果消息来自不同的来源,则以不同的样式显示消息),可以在后面的部分中了解这些信息。

从客户端发布消息

构建聊天应用教程中,当客户端通过 WebSocket 连接将消息发送到 Web PubSub 服务时,该服务将在服务器端触发用户事件。 使用子协议,客户端通过发送 JSON 消息获得更多的功能。 例如,可以通过 Web PubSub 服务将消息从一个客户端直接发布到其他客户端。

如果要将大量数据实时流式传输到其他客户端,这很有用。 使用此功能来构建日志流式处理应用程序,该应用程序可实时将控制台日志流式传输到浏览器。

  1. 创建流式处理程序

    创建 stream 程序:

    mkdir stream
    cd stream
    dotnet new console
    

    使用以下内容更新 Program.cs

    using System;
    using System.Net.Http;
    using System.Net.WebSockets;
    using System.Text;
    using System.Text.Json;
    using System.Threading.Tasks;
    
    namespace stream
    {
        class Program
        {
            private static readonly HttpClient http = new HttpClient();
            static async Task Main(string[] args)
            {
                // Get client url from remote
                var stream = await http.GetStreamAsync("http://localhost:5000/negotiate");
                var url = (await JsonSerializer.DeserializeAsync<ClientToken>(stream)).url;
                var client = new ClientWebSocket();
                client.Options.AddSubProtocol("json.webpubsub.azure.v1");
    
                await client.ConnectAsync(new Uri(url), default);
    
                Console.WriteLine("Connected.");
                var streaming = Console.ReadLine();
                while (streaming != null)
                {
                    if (!string.IsNullOrEmpty(streaming))
                    {
                        var message = JsonSerializer.Serialize(new
                        {
                            type = "sendToGroup",
                            group = "stream",
                            data = streaming + Environment.NewLine,
                        });
                        Console.WriteLine("Sending " + message);
                        await client.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, default);
                    }
    
                    streaming = Console.ReadLine();
                }
    
                await client.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
            }
    
            private sealed class ClientToken
            {
                public string url { get; set; }
            }
        }
    }
    
    

    可以看到此处有一个新的概念“组”。 组是中心中的逻辑概念,你可以在中心内将消息发布到一组连接。 在中心中,可以有多个组,一个客户端可以同时订阅多个组。 使用子协议时,只能发布到组,而不是广播到整个中心。 有关术语的详细信息,请查看基本概念

  2. 由于我们在此处使用组,因此在 ws.onopen 回调内建立 WebSocket 连接时,还需要更新 index.html 网页以加入组。

    let ackId = 0;
    ws.onopen = () => {
      console.log('connected');
      ws.send(JSON.stringify({
        type: 'joinGroup',
        group: 'stream',
        ackId: ++ackId
      }));
    };
    

    可以发现,客户端通过发送 joinGroup 类型的消息来加入该组。

  3. 此外,稍微更新 ws.onmessage 回调逻辑,以分析 JSON 响应并只打印来自 stream 组中的消息,使其充当实时流打印机。

    ws.onmessage = event => {
      let message = JSON.parse(event.data);
      if (message.type === 'message' && message.group === 'stream') {
        let d = document.createElement('span');
        d.innerText = message.data;
        output.appendChild(d);
        window.scrollTo(0, document.body.scrollHeight);
      }
    };
    
  4. 出于安全考虑,默认情况下,客户端不能自行发布或订阅组。 因此,你会注意到,我们在生成令牌时为客户端设置了 roles

    Startup.cs 中执行 GenerateClientAccessUri 时,按如下所示设置 roles

    service.GenerateClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" })
    
  5. 最后,还要对 index.html 应用一些样式,使它得到更好的显示。

    <html>
    
      <head>
        <style>
          #output {
            white-space: pre;
            font-family: monospace;
          }
        </style>
      </head>
    

现在,运行以下代码,键入任意文本,这些文本会在浏览器中实时显示:

ls -R | dotnet run

# Or call `dir /s /b | dotnet run` when you are using CMD under Windows

或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:

for i in $(ls -R); do echo $i; sleep 0.1; done | dotnet run

可以在此处找到本教程的完整代码示例。

后续步骤

本教程提供有关如何连接到 Web PubSub 服务以及如何使用子协议将消息发布到连接的客户端的基本概念。

查看其他教程,进一步深入了解如何使用该服务。