教程:使用子协议在 WebSocket 客户端之间发布和订阅消息
在构建聊天应用教程中,你已了解如何使用 WebSocket API 通过 Azure Web PubSub 发送和接收数据。 客户端与服务进行通信时不需要协议。 例如,可以使用 WebSocket.send()
发送任何类型的数据,服务器将按原样接收它。 WebSocket API 流程易于使用,但功能有限。 例如,无法在向服务器发送事件时指定事件名称,也不能将消息发布到其他客户端,而只能将其发送到你的服务器。 本教程介绍如何使用子协议来扩展客户端的功能。
本教程介绍如何执行下列操作:
- 创建 Web PubSub 服务实例
- 生成完整 URL 以建立 WebSocket 连接
- 使用子协议在 WebSocket 客户端之间发布消息
如果没有 Azure 试用版订阅,请在开始前创建 Azure 试用版订阅。
先决条件
- 本设置需要 Azure CLI 版本 2.22.0 或更高版本。
创建 Azure Web PubSub 实例
创建资源组
资源组是在其中部署和管理 Azure 资源的逻辑容器。 使用 az group create 命令在 chinaeast
位置创建名为 myResourceGroup
的资源组。
az group create --name myResourceGroup --location ChinaEast
创建 Web PubSub 实例
运行 az extension add,安装 webpubsub 扩展或将其升级到当前版本。
az extension add --upgrade --name webpubsub
使用 Azure CLI az webpubsub create 命令在已创建的资源组中创建 Web PubSub。 以下命令在 ChinaEast 的资源组 myResourceGroup 下创建一个免费的 Web PubSub 资源:
重要
每个 Web PubSub 资源必须具有唯一名称。 在以下示例中,将 <your-unique-resource-name> 替换为 Web PubSub 的名称。
az webpubsub create --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --location "ChinaEast" --sku Free_F1
此命令的输出会显示新建的资源的属性。 请记下下面列出的两个属性:
- 资源名称:为上面的
--name
参数提供的名称。
- 主机名:在本例中,主机名为
<your-unique-resource-name>.webpubsub.azure.cn/
。
目前,只有你的 Azure 帐户才有权对这个新资源执行任何操作。
获取 ConnectionString 以供将来使用
本文中出现的原始连接字符串仅用于演示目的。 在生产环境中,请始终保护访问密钥。 使用 Azure Key Vault 安全地管理和轮换密钥,并使用 WebPubSubServiceClient
对连接进行保护。
使用 Azure CLI az webpubsub key 命令获取服务的 ConnectionString。 将 <your-unique-resource-name>
占位符替换为 Azure Web PubSub 实例的名称。
az webpubsub key show --resource-group myResourceGroup --name <your-unique-resource-name> --query primaryConnectionString --output tsv
复制主连接字符串以供稍后使用。
复制已提取的 ConnectionString,本教程稍后会将其用作 <connection_string>
的值。
设置项目
先决条件
使用子协议
客户端可以使用特定的子协议启动 WebSocket 连接。 Azure Web PubSub 服务支持名为 json.webpubsub.azure.v1
的子协议,使客户端能够通过 Web PubSub 服务直接进行发布/订阅,而不是往返于上游服务器。 查看 Azure Web PubSub 支持的 JSON WebSocket 子协议,了解该子协议的详细信息。
如果使用其他协议名称,这些名称将被服务忽略,并在连接事件处理程序中传递到服务器,因此你可以构建自己的协议。
使用 json.webpubsub.azure.v1
子协议创建 Web 应用程序。
安装依赖项
mkdir logstream
cd logstream
dotnet new web
dotnet add package Microsoft.Extensions.Azure
dotnet add package Azure.Messaging.WebPubSub
mkdir logstream
cd logstream
npm init -y
npm install --save express
npm install --save ws
npm install --save node-fetch
npm install --save @azure/web-pubsub
mkdir logstream
cd logstream
# Create venv
python -m venv env
# Active venv
source ./env/bin/activate
pip install azure-messaging-webpubsubservice
你将使用 Javalin Web 框架来托管网页。
首先,使用 Maven 创建一个新应用 logstream-webserver
,并切换到 logstream-webserver 文件夹:
mvn archetype:generate --define interactiveMode=n --define groupId=com.webpubsub.tutorial --define artifactId=logstream-webserver --define archetypeArtifactId=maven-archetype-quickstart --define archetypeVersion=1.4
cd logstream-webserver
让我们将 Azure Web PubSub SDK 和 javalin
Web 框架依赖项添加到 pom.xml
的 dependencies
节点中:
javalin
:适用于 Java 的简单 Web 框架
slf4j-simple
:适用于 Java 的记录器
azure-messaging-webpubsub
:用于使用 Azure Web PubSub 的服务客户端 SDK
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-webpubsub</artifactId>
<version>1.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.javalin/javalin -->
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>3.13.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
创建服务器端来托管 /negotiate
API 和 Web 页面。
使用以下代码更新 Program.cs
。
- 使用
AddAzureClients
添加服务客户端,并从配置中读取连接字符串。
- 在
app.Run();
前面添加 app.UseStaticFiles();
,以支持静态文件。
- 并使用
/negotiate
请求更新 app.MapGet
,以生成客户端访问令牌。
using Azure.Messaging.WebPubSub;
using Microsoft.Extensions.Azure;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddAzureClients(s =>
{
s.AddWebPubSubServiceClient(builder.Configuration["Azure:WebPubSub:ConnectionString"], "stream");
});
var app = builder.Build();
app.UseStaticFiles();
app.MapGet("/negotiate", async context =>
{
var service = context.RequestServices.GetRequiredService<WebPubSubServiceClient>();
var response = new
{
url = service.GetClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" }).AbsoluteUri
};
await context.Response.WriteAsJsonAsync(response);
});
app.Run();
创建 server.js
并添加以下代码:
const express = require('express');
const { WebPubSubServiceClient } = require('@azure/web-pubsub');
let service = new WebPubSubServiceClient(process.env.WebPubSubConnectionString, 'stream');
const app = express();
app.get('/negotiate', async (req, res) => {
let token = await service.getClientAccessToken({
roles: ['webpubsub.sendToGroup.stream', 'webpubsub.joinLeaveGroup.stream']
});
res.send({
url: token.url
});
});
app.use(express.static('public'));
app.listen(8080, () => console.log('server started'));
创建 server.py
以托管 /negotiate
API 和 Web 页面。
import json
import sys
from http.server import HTTPServer, SimpleHTTPRequestHandler
from azure.messaging.webpubsubservice import WebPubSubServiceClient
service = WebPubSubServiceClient.from_connection_string(sys.argv[1], hub='stream')
class Request(SimpleHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.path = 'public/index.html'
return SimpleHTTPRequestHandler.do_GET(self)
elif self.path == '/negotiate':
roles = ['webpubsub.sendToGroup.stream',
'webpubsub.joinLeaveGroup.stream']
token = service.get_client_access_token(roles=roles)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({
'url': token['url']
}).encode())
if __name__ == '__main__':
if len(sys.argv) != 2:
print('Usage: python server.py <connection-string>')
exit(1)
server = HTTPServer(('localhost', 8080), Request)
print('server started')
server.serve_forever()
让我们导航到 /src/main/java/com/webpubsub/tutorial 目录,在编辑器中打开 App.java 文件,使用 Javalin.create
提供静态文件:
package com.webpubsub.tutorial;
import com.azure.messaging.webpubsub.WebPubSubServiceClient;
import com.azure.messaging.webpubsub.WebPubSubServiceClientBuilder;
import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions;
import com.azure.messaging.webpubsub.models.WebPubSubClientAccessToken;
import io.javalin.Javalin;
public class App {
public static void main(String[] args) {
if (args.length != 1) {
System.out.println("Expecting 1 arguments: <connection-string>");
return;
}
// create the service client
WebPubSubServiceClient service = new WebPubSubServiceClientBuilder()
.connectionString(args[0])
.hub("chat")
.buildClient();
// start a server
Javalin app = Javalin.create(config -> {
config.addStaticFiles("public");
}).start(8080);
// Handle the negotiate request and return the token to the client
app.get("/negotiate", ctx -> {
GetClientAccessTokenOptions option = new GetClientAccessTokenOptions();
option.addRole("webpubsub.sendToGroup.stream");
option.addRole("webpubsub.joinLeaveGroup.stream");
WebPubSubClientAccessToken token = service.getClientAccessToken(option);
// return JSON string
ctx.result("{\"url\":\"" + token.getUrl() + "\"}");
return;
});
}
}
根据你的设置,你可能需要在 pom.xml 中将语言级别显式设置为 Java 8。 添加以下代码片段:
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
创建网页
使用以下内容创建一个 HTML 页面,并将其另存为 wwwroot/index.html
:
使用以下内容创建一个 HTML 页面,并将其另存为 public/index.html
:
使用以下内容创建一个 HTML 页面,并将其另存为 public/index.html
:
创建一个具有下列内容的 HTML 页面,并将其保存到 /src/main/resources/public/index.html 中:
<html>
<body>
<div id="output"></div>
<script>
(async function () {
let res = await fetch('/negotiate')
let data = await res.json();
let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
ws.onopen = () => {
console.log('connected');
};
let output = document.querySelector('#output');
ws.onmessage = event => {
let d = document.createElement('p');
d.innerText = event.data;
output.appendChild(d);
};
})();
</script>
</body>
</html>
上面的代码会连接到服务,并将收到的任何消息打印到页面上。 主要的变化是在创建 WebSocket 连接时指定了子协议。
运行服务器
使用适用于 .NET Core 的机密管理器工具来设置连接字符串。 运行以下命令,将 <connection_string>
替换为在上一步中提取的连接字符串,并在浏览器中打开 http://localhost:5000/index.html :
dotnet user-secrets init
dotnet user-secrets set Azure:WebPubSub:ConnectionString "<connection-string>"
dotnet run
运行以下命令,将 <connection-string>
替换为在上一步中提取的 ConnectionString,并在浏览器中打开 http://localhost:8080:
export WebPubSubConnectionString="<connection-string>"
node server
运行以下命令,将 <connection-string>
替换为在上一步中提取的 ConnectionString,并在浏览器中打开 http://localhost:8080:
python server.py "<connection-string>"
运行以下命令,将 <connection-string>
替换为在上一步中提取的 ConnectionString,并在浏览器中打开 http://localhost:8080:
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.tutorial.App" -Dexec.cleanupDaemonThreads=false -Dexec.args="'<connection_string>'"
如果你使用的是 Chrome,可以按 F12,或右键单击,选择 ->“检查”->“开发人员工具”,并选择“网络”选项卡。加载网页,可以看到已建立 WebSocket 连接。 选择以检查 WebSocket 连接,可以看到客户端中收到以下 connected
事件消息。 可以发现,你能够获得为该客户端生成的 connectionId
。
{"type":"system","event":"connected","userId":null,"connectionId":"<the_connection_id>"}
可以发现,借助子协议,可在连接为 connected
时获取连接的某些元数据。
客户端现在接收到 JSON 消息,而不是纯文本。 JSON 消息包含更多信息,例如消息的类型和来源。 因此,你可以使用此信息对消息进行更多的处理(例如,如果消息来自不同的来源,则以不同的样式显示消息),可以在后面的部分中了解这些信息。
从客户端发布消息
在构建聊天应用教程中,当客户端通过 WebSocket 连接将消息发送到 Web PubSub 服务时,该服务将在服务器端触发用户事件。 使用子协议,客户端通过发送 JSON 消息获得更多的功能。 例如,可以通过 Web PubSub 服务将消息从一个客户端直接发布到其他客户端。
如果要将大量数据实时流式传输到其他客户端,这很有用。 使用此功能来构建日志流式处理应用程序,该应用程序可实时将控制台日志流式传输到浏览器。
创建流式处理程序
创建 stream
程序:
mkdir stream
cd stream
dotnet new console
使用以下内容更新 Program.cs
:
using System;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace stream
{
class Program
{
private static readonly HttpClient http = new HttpClient();
static async Task Main(string[] args)
{
// Get client url from remote
var stream = await http.GetStreamAsync("http://localhost:5000/negotiate");
var url = (await JsonSerializer.DeserializeAsync<ClientToken>(stream)).url;
var client = new ClientWebSocket();
client.Options.AddSubProtocol("json.webpubsub.azure.v1");
await client.ConnectAsync(new Uri(url), default);
Console.WriteLine("Connected.");
var streaming = Console.ReadLine();
while (streaming != null)
{
if (!string.IsNullOrEmpty(streaming))
{
var message = JsonSerializer.Serialize(new
{
type = "sendToGroup",
group = "stream",
data = streaming + Environment.NewLine,
});
Console.WriteLine("Sending " + message);
await client.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, default);
}
streaming = Console.ReadLine();
}
await client.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
}
private sealed class ClientToken
{
public string url { get; set; }
}
}
}
使用以下内容创建 stream.js
。
const WebSocket = require('ws');
const fetch = (...args) => import('node-fetch').then(({default: fetch}) => fetch(...args));
async function main() {
let res = await fetch(`http://localhost:8080/negotiate`);
let data = await res.json();
let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
let ackId = 0;
ws.on('open', () => {
process.stdin.on('data', data => {
ws.send(JSON.stringify({
type: 'sendToGroup',
group: 'stream',
ackId: ++ackId,
dataType: 'text',
data: data.toString()
}));
});
});
ws.on('message', data => console.log("Received: %s", data));
process.stdin.on('close', () => ws.close());
}
main();
以上代码创建与服务的 WebSocket 连接,然后每当它接收到某些数据时,就使用 ws.send()
发布数据。 若要发布到其他连接,只需要将 type
设置为 sendToGroup
,并在消息中指定组名。
打开 stream
程序的另一个 bash 窗口,并安装 websockets
依赖项:
mkdir stream
cd stream
# Create venv
python -m venv env
# Active venv
source ./env/bin/activate
pip install websockets
使用以下内容创建 stream.py
。
import asyncio
import sys
import threading
import time
import websockets
import requests
import json
async def connect(url):
async with websockets.connect(url, subprotocols=['json.webpubsub.azure.v1']) as ws:
print('connected')
id = 1
while True:
data = input()
payload = {
'type': 'sendToGroup',
'group': 'stream',
'dataType': 'text',
'data': str(data + '\n'),
'ackId': id
}
id = id + 1
await ws.send(json.dumps(payload))
await ws.recv()
if __name__ == '__main__':
res = requests.get('http://localhost:8080/negotiate').json()
try:
asyncio.get_event_loop().run_until_complete(connect(res['url']))
except KeyboardInterrupt:
pass
以上代码创建与服务的 WebSocket 连接,然后每当它接收到某些数据时,就使用 ws.send()
发布数据。 若要发布到其他连接,只需要将 type
设置为 sendToGroup
,并在消息中指定组名。
使用另一个终端,返回到根文件夹来创建一个流式处理控制台应用 logstream-streaming
,并切换到 logstream-streaming 文件夹:
mvn archetype:generate --define interactiveMode=n --define groupId=com.webpubsub.quickstart --define artifactId=logstream-streaming --define archetypeArtifactId=maven-archetype-quickstart --define archetypeVersion=1.4
cd logstream-streaming
将 HttpClient 依赖项添加到 pom.xml
的 dependencies
节点中:
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
现在,使用 WebSocket 连接到服务。 让我们导航到 /src/main/java/com/webpubsub/quickstart 目录,在编辑器中打开 App.java 文件,并将代码替换为以下内容:
package com.webpubsub.quickstart;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import com.google.gson.Gson;
public class App
{
public static void main( String[] args ) throws IOException, InterruptedException
{
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8080/negotiate"))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
Gson gson = new Gson();
String url = gson.fromJson(response.body(), Entity.class).url;
WebSocket ws = HttpClient.newHttpClient().newWebSocketBuilder().subprotocols("json.webpubsub.azure.v1")
.buildAsync(URI.create(url), new WebSocketClient()).join();
int id = 0;
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String streaming = reader.readLine();
App app = new App();
while (streaming != null && !streaming.isEmpty()){
String frame = gson.toJson(app.new GroupMessage(streaming + "\n", ++id));
System.out.println("Sending: " + frame);
ws.sendText(frame, true);
streaming = reader.readLine();
}
}
private class GroupMessage{
public String data;
public int ackId;
public final String type = "sendToGroup";
public final String group = "stream";
GroupMessage(String data, int ackId){
this.data = data;
this.ackId = ackId;
}
}
private static final class WebSocketClient implements WebSocket.Listener {
private WebSocketClient() {
}
@Override
public void onOpen(WebSocket webSocket) {
System.out.println("onOpen using subprotocol " + webSocket.getSubprotocol());
WebSocket.Listener.super.onOpen(webSocket);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
System.out.println("onText received " + data);
return WebSocket.Listener.super.onText(webSocket, data, last);
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
System.out.println("Bad day! " + webSocket.toString());
WebSocket.Listener.super.onError(webSocket, error);
}
}
private static final class Entity {
public String url;
}
}
- 导航到包含 pom.xml 文件的目录,并使用以下命令运行该项目
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.quickstart.App" -Dexec.cleanupDaemonThreads=false
可以看到此处有一个新的概念“组”。 组是中心中的逻辑概念,你可以在中心内将消息发布到一组连接。 在中心中,可以有多个组,一个客户端可以同时订阅多个组。 使用子协议时,只能发布到组,而不是广播到整个中心。 有关术语的详细信息,请查看基本概念。
由于我们在此处使用组,因此在 ws.onopen
回调内建立 WebSocket 连接时,还需要更新 index.html
网页以加入组。
let ackId = 0;
ws.onopen = () => {
console.log('connected');
ws.send(JSON.stringify({
type: 'joinGroup',
group: 'stream',
ackId: ++ackId
}));
};
可以发现,客户端通过发送 joinGroup
类型的消息来加入该组。
此外,稍微更新 ws.onmessage
回调逻辑,以分析 JSON 响应并只打印来自 stream
组中的消息,使其充当实时流打印机。
ws.onmessage = event => {
let message = JSON.parse(event.data);
if (message.type === 'message' && message.group === 'stream') {
let d = document.createElement('span');
d.innerText = message.data;
output.appendChild(d);
window.scrollTo(0, document.body.scrollHeight);
}
};
出于安全考虑,默认情况下,客户端不能自行发布或订阅组。 因此,你会注意到,我们在生成令牌时为客户端设置了 roles
:
在 Startup.cs
中执行 GenerateClientAccessUri
时,按如下所示设置 roles
:
service.GenerateClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" })
在 server.js
中执行 getClientAccessToken
时,按如下所示添加 roles
:
app.get('/negotiate', async (req, res) => {
let token = await service.getClientAccessToken({
roles: ['webpubsub.sendToGroup.stream', 'webpubsub.joinLeaveGroup.stream']
});
...
});
在访问令牌生成过程中,在 server.py
中为客户端设置正确的角色:
roles = ['webpubsub.sendToGroup.stream',
'webpubsub.joinLeaveGroup.stream']
token = service.get_client_access_token(roles=roles)
在访问令牌生成过程中,在 App.java
中为客户端设置正确的角色:
GetClientAccessTokenOptions option = new GetClientAccessTokenOptions();
option.addRole("webpubsub.sendToGroup.stream");
option.addRole("webpubsub.joinLeaveGroup.stream");
WebPubSubClientAccessToken token = service.getClientAccessToken(option);
最后,还要对 index.html
应用一些样式,使它得到更好的显示。
<html>
<head>
<style>
#output {
white-space: pre;
font-family: monospace;
}
</style>
</head>
现在,运行以下代码,键入任意文本,这些文本会在浏览器中实时显示:
ls -R | dotnet run
# Or call `dir /s /b | dotnet run` when you are using CMD under Windows
或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:
for i in $(ls -R); do echo $i; sleep 0.1; done | dotnet run
可以在此处找到本教程的完整代码示例。
node stream
或者,你也可以使用此应用将来自另一个控制台应用的任何输出通过管道流式传输到浏览器。 例如:
ls -R | node stream
# Or call `dir /s /b | node stream` when you are using CMD under Windows
或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:
for i in $(ls -R); do echo $i; sleep 0.1; done | node stream
可以在此处找到本教程的完整代码示例。
现在,可以运行 python stream.py
,键入任意文本,这些文本会在浏览器中实时显示。
或者,你也可以使用此应用将来自另一个控制台应用的任何输出通过管道流式传输到浏览器。 例如:
ls -R | python stream.py
# Or call `dir /s /b | python stream.py` when you are using CMD under Windows
或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:
for i in $(ls -R); do echo $i; sleep 0.1; done | python stream.py
可以在此处找到本教程的完整代码示例。
现在,可以运行以下代码,键入任意文本,这些文本会在浏览器中实时显示。
mvn compile & mvn package & mvn exec:java -Dexec.mainClass="com.webpubsub.quickstart.App" -Dexec.cleanupDaemonThreads=false
可以在此处找到本教程的完整代码示例。
后续步骤
本教程提供有关如何连接到 Web PubSub 服务以及如何使用子协议将消息发布到连接的客户端的基本概念。
查看其他教程,进一步深入了解如何使用该服务。