教程:使用子协议在 WebSocket 客户端之间发布和订阅消息

构建聊天应用教程中,你已了解如何使用 WebSocket API 通过 Azure Web PubSub 发送和接收数据。 客户端与服务进行通信时不需要协议。 例如,可以使用 WebSocket.send() 发送任何类型的数据,服务器将按原样接收它。 WebSocket API 流程易于使用,但功能有限。 例如,无法在向服务器发送事件时指定事件名称,也不能将消息发布到其他客户端,而只能将其发送到你的服务器。 本教程介绍如何使用子协议来扩展客户端的功能。

本教程介绍如何执行下列操作:

  • 创建 Web PubSub 服务实例
  • 生成完整 URL 以建立 WebSocket 连接
  • 使用子协议在 WebSocket 客户端之间发布消息

如果没有 Azure 试用版订阅,请在开始前创建 Azure 试用版订阅

先决条件

  • 如需在本地运行 CLI 参考命令,请安装 Azure CLI。 如果在 Windows 或 macOS 上运行,请考虑在 Docker 容器中运行 Azure CLI。 有关详细信息,请参阅如何在 Docker 容器中运行 Azure CLI

    • 如果使用的是本地安装,请使用 az login 命令登录 Azure CLI。 若要完成身份验证过程,请遵循终端中显示的步骤。 有关其他登录选项,请参阅使用 Azure CLI 登录

    • 出现提示时,请在首次使用时安装 Azure CLI 扩展。 有关扩展详细信息,请参阅使用 Azure CLI 的扩展

    • 运行 az version 以查找安装的版本和依赖库。 若要升级到最新版本,请运行 az upgrade

  • 本设置需要 Azure CLI 版本 2.22.0 或更高版本。

创建 Azure Web PubSub 实例

创建资源组

资源组是在其中部署和管理 Azure 资源的逻辑容器。 使用 az group create 命令在 chinaeast 位置创建名为 myResourceGroup 的资源组。

az group create --name myResourceGroup --location ChinaEast

创建 Web PubSub 实例

运行 az extension add,安装 webpubsub 扩展或将其升级到当前版本。

az extension add --upgrade --name webpubsub

使用 Azure CLI az webpubsub create 命令在已创建的资源组中创建 Web PubSub。 以下命令在 ChinaEast 的资源组 myResourceGroup 下创建一个免费的 Web PubSub 资源:

重要

每个 Web PubSub 资源必须具有唯一名称。 在以下示例中,将 <your-unique-resource-name> 替换为 Web PubSub 的名称。

az webpubsub create --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --location "ChinaEast" --sku Free_F1

此命令的输出会显示新建的资源的属性。 请记下下面列出的两个属性:

  • 资源名称:为上面的 --name 参数提供的名称。
  • 主机名:在本例中,主机名为 <your-unique-resource-name>.webpubsub.azure.cn/

目前,只有你的 Azure 帐户才有权对这个新资源执行任何操作。

获取 ConnectionString 以供将来使用

本文中出现的原始连接字符串仅用于演示目的。 在生产环境中,请始终保护访问密钥。 使用 Azure Key Vault 安全地管理和轮换密钥,并使用 WebPubSubServiceClient 对连接进行保护

使用 Azure CLI az webpubsub key 命令获取服务的 ConnectionString。 将 <your-unique-resource-name> 占位符替换为 Azure Web PubSub 实例的名称。

az webpubsub key show --resource-group myResourceGroup --name <your-unique-resource-name> --query primaryConnectionString --output tsv

复制主连接字符串以供稍后使用。

复制已提取的 ConnectionString,本教程稍后会将其用作 <connection_string> 的值。

设置项目

先决条件

使用子协议

客户端可以使用特定的子协议启动 WebSocket 连接。 Azure Web PubSub 服务支持名为 json.webpubsub.azure.v1 的子协议,使客户端能够通过 Web PubSub 服务直接进行发布/订阅,而不是往返于上游服务器。 查看 Azure Web PubSub 支持的 JSON WebSocket 子协议,了解该子协议的详细信息。

如果使用其他协议名称,这些名称将被服务忽略,并在连接事件处理程序中传递到服务器,因此你可以构建自己的协议。

使用 json.webpubsub.azure.v1 子协议创建 Web 应用程序。

  1. 安装依赖项

    mkdir logstream
    cd logstream
    dotnet new web
    dotnet add package Microsoft.Extensions.Azure
    dotnet add package Azure.Messaging.WebPubSub
    
  2. 创建服务器端来托管 /negotiate API 和 Web 页面。

    使用以下代码更新 Program.cs

    • 使用 AddAzureClients 添加服务客户端,并从配置中读取连接字符串。
    • app.Run(); 前面添加 app.UseStaticFiles();,以支持静态文件。
    • 并使用 /negotiate 请求更新 app.MapGet,以生成客户端访问令牌。
    using Azure.Messaging.WebPubSub;
    using Microsoft.Extensions.Azure;
    
    var builder = WebApplication.CreateBuilder(args);
    builder.Services.AddAzureClients(s =>
    {
        s.AddWebPubSubServiceClient(builder.Configuration["Azure:WebPubSub:ConnectionString"], "stream");
    });
    
    var app = builder.Build();
    app.UseStaticFiles();
    app.MapGet("/negotiate", async context =>
    {
        var service = context.RequestServices.GetRequiredService<WebPubSubServiceClient>();
        var response = new
        {
            url = service.GetClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" }).AbsoluteUri
        };
        await context.Response.WriteAsJsonAsync(response);
    });
    
    app.Run();
    
  3. 创建网页

    使用以下内容创建一个 HTML 页面,并将其另存为 wwwroot/index.html

    <html>
      <body>
        <div id="output"></div>
        <script>
          (async function () {
            let res = await fetch('/negotiate')
            let data = await res.json();
            let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
            ws.onopen = () => {
              console.log('connected');
            };
    
            let output = document.querySelector('#output');
            ws.onmessage = event => {
              let d = document.createElement('p');
              d.innerText = event.data;
              output.appendChild(d);
            };
          })();
        </script>
      </body>
    </html>                                                                
    

    上面的代码会连接到服务,并将收到的任何消息打印到页面上。 主要的变化是在创建 WebSocket 连接时指定了子协议。

  4. 运行服务器

    使用适用于 .NET Core 的机密管理器工具来设置连接字符串。 运行以下命令,将 <connection_string> 替换为在上一步中提取的连接字符串,并在浏览器中打开 http://localhost:5000/index.html :

    dotnet user-secrets init
    dotnet user-secrets set Azure:WebPubSub:ConnectionString "<connection-string>"
    dotnet run
    

    如果你使用的是 Chrome,可以按 F12,或右键单击,选择 ->“检查”->“开发人员工具”,并选择“网络”选项卡。加载网页,可以看到已建立 WebSocket 连接。 选择以检查 WebSocket 连接,可以看到客户端中收到以下 connected 事件消息。 可以发现,你能够获得为该客户端生成的 connectionId

    {"type":"system","event":"connected","userId":null,"connectionId":"<the_connection_id>"}
    

可以发现,借助子协议,可在连接为 connected 时获取连接的某些元数据。

客户端现在接收到 JSON 消息,而不是纯文本。 JSON 消息包含更多信息,例如消息的类型和来源。 因此,你可以使用此信息对消息进行更多的处理(例如,如果消息来自不同的来源,则以不同的样式显示消息),可以在后面的部分中了解这些信息。

从客户端发布消息

构建聊天应用教程中,当客户端通过 WebSocket 连接将消息发送到 Web PubSub 服务时,该服务将在服务器端触发用户事件。 使用子协议,客户端通过发送 JSON 消息获得更多的功能。 例如,可以通过 Web PubSub 服务将消息从一个客户端直接发布到其他客户端。

如果要将大量数据实时流式传输到其他客户端,这很有用。 使用此功能来构建日志流式处理应用程序,该应用程序可实时将控制台日志流式传输到浏览器。

  1. 创建流式处理程序

    创建 stream 程序:

    mkdir stream
    cd stream
    dotnet new console
    

    使用以下内容更新 Program.cs

    using System;
    using System.Net.Http;
    using System.Net.WebSockets;
    using System.Text;
    using System.Text.Json;
    using System.Threading.Tasks;
    
    namespace stream
    {
        class Program
        {
            private static readonly HttpClient http = new HttpClient();
            static async Task Main(string[] args)
            {
                // Get client url from remote
                var stream = await http.GetStreamAsync("http://localhost:5000/negotiate");
                var url = (await JsonSerializer.DeserializeAsync<ClientToken>(stream)).url;
                var client = new ClientWebSocket();
                client.Options.AddSubProtocol("json.webpubsub.azure.v1");
    
                await client.ConnectAsync(new Uri(url), default);
    
                Console.WriteLine("Connected.");
                var streaming = Console.ReadLine();
                while (streaming != null)
                {
                    if (!string.IsNullOrEmpty(streaming))
                    {
                        var message = JsonSerializer.Serialize(new
                        {
                            type = "sendToGroup",
                            group = "stream",
                            data = streaming + Environment.NewLine,
                        });
                        Console.WriteLine("Sending " + message);
                        await client.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, default);
                    }
    
                    streaming = Console.ReadLine();
                }
    
                await client.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
            }
    
            private sealed class ClientToken
            {
                public string url { get; set; }
            }
        }
    }
    
    

    可以看到此处有一个新的概念“组”。 组是中心中的逻辑概念,你可以在中心内将消息发布到一组连接。 在中心中,可以有多个组,一个客户端可以同时订阅多个组。 使用子协议时,只能发布到组,而不是广播到整个中心。 有关术语的详细信息,请查看基本概念

  2. 由于我们在此处使用组,因此在 ws.onopen 回调内建立 WebSocket 连接时,还需要更新 index.html 网页以加入组。

    let ackId = 0;
    ws.onopen = () => {
      console.log('connected');
      ws.send(JSON.stringify({
        type: 'joinGroup',
        group: 'stream',
        ackId: ++ackId
      }));
    };
    

    可以发现,客户端通过发送 joinGroup 类型的消息来加入该组。

  3. 此外,稍微更新 ws.onmessage 回调逻辑,以分析 JSON 响应并只打印来自 stream 组中的消息,使其充当实时流打印机。

    ws.onmessage = event => {
      let message = JSON.parse(event.data);
      if (message.type === 'message' && message.group === 'stream') {
        let d = document.createElement('span');
        d.innerText = message.data;
        output.appendChild(d);
        window.scrollTo(0, document.body.scrollHeight);
      }
    };
    
  4. 出于安全考虑,默认情况下,客户端不能自行发布或订阅组。 因此,你会注意到,我们在生成令牌时为客户端设置了 roles

    Startup.cs 中执行 GenerateClientAccessUri 时,按如下所示设置 roles

    service.GenerateClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" })
    
  5. 最后,还要对 index.html 应用一些样式,使它得到更好的显示。

    <html>
    
      <head>
        <style>
          #output {
            white-space: pre;
            font-family: monospace;
          }
        </style>
      </head>
    

现在,运行以下代码,键入任意文本,这些文本会在浏览器中实时显示:

ls -R | dotnet run

# Or call `dir /s /b | dotnet run` when you are using CMD under Windows

或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:

for i in $(ls -R); do echo $i; sleep 0.1; done | dotnet run

可以在此处找到本教程的完整代码示例。

后续步骤

本教程提供有关如何连接到 Web PubSub 服务以及如何使用子协议将消息发布到连接的客户端的基本概念。

查看其他教程,进一步深入了解如何使用该服务。