本快速入门指南演示了如何
-
连接到 Web PubSub 资源
-
订阅有关特定主题的消息
- 向主题发布消息
先决条件
- Web PubSub 资源。 若要创建,可以按照以下指导操作:创建 Web PubSub 资源
- 代码编辑器,如 Visual Studio Code
- 计划使用的语言的依赖项
注释
除了下面提到的 MQTT 客户端库,可以选择满足以下要求的任何标准 MQTT 客户端库来连接到 Web PubSub:
- 支持 WebSocket 传输。
- 支持 MQTT 协议 3.1.1 或 5.0。
mkdir pubsub_among_clients
cd pubsub_among_clients
npm install mqtt
mkdir pubsub_among_clients
cd pubsub_among_clients
# Create a new .NET console project
dotnet new console
dotnet add package MqttNet
mkdir pubsub_among_clients
cd pubsub_among_clients
pip install paho-mqtt
连接到 Web PubSub
MQTT 使用客户端访问 URL 来连接资源并向其进行身份验证。 此 URL 遵循 wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token> 的模式。
客户端可以通过几种方式获取客户端访问 URL。 最佳做法是不要在代码中对客户端访问 URL 进行硬编码。 在生产环境中,我们通常设置应用服务器按需返回此 URL。
生成客户端访问 URL 详细介绍了相关操作。
对于本快速入门,可以从 Azure 门户复制并粘贴 URL,如下图所示。
如前面的代码所示,客户端有权将消息发送到主题 group1 和订阅主题 group2。
在 MQTT 客户端中,存在一些你应遵循的限制,否则将会拒绝连接。 这些 MQTT 协议参数包括:
- 协议版本:3.1.1 或 5.0。
- 客户端 ID 格式
- 允许使用的字符:0-9、a-Z、A-Z
- 长度介于 1 和 128 之间
- MQTT 3.1.1 的保活动隔:1 - 180 秒
- 遗嘱主题格式:不为空,并且至少包含一个非空白字符。 最大长度为 1024。
- 遗嘱消息大小:最多 2,000 个字节
以下代码演示了如何使用 MQTT 协议版本 5.0、干净启动、30 秒会话过期间隔将 MQTT 客户端连接到 WebPubSub。
创建名为 index.js 的文件并添加以下代码
const mqtt = require('mqtt');
var client = mqtt.connect(`wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token>`,
{
clientId: "client1",
protocolVersion: 5, // Use MQTT 5.0 protocol
clean: true,
properties: {
sessionExpiryInterval: 30,
},
});
编辑 Program.cs 文件并添加以下代码
using MQTTnet;
using MQTTnet.Client;
var mqttFactory = new MqttFactory();
var client = mqttFactory.CreateMqttClient();
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithWebSocketServer((MqttClientWebSocketOptionsBuilder b) =>
b.WithUri("wss://<service_name>.webpubsub.azure.com/clients/mqtt/hubs/<hub_name>?access_token=<token>"))
.WithClientId("client1")
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCleanStart()
.WithSessionExpiryInterval(30)
.Build();
await client.ConnectAsync(mqttClientOptions, CancellationToken.None);
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
def on_connect(client, userdata, flags, reasonCode, properties):
print("Connected with result code "+str(reasonCode))
def on_connect_fail(client, userData):
print("Connection failed")
print(userData)
def on_log(client, userdata, level, buf):
print("log: ", buf)
host = "<service_name>.webpubsub.azure.com"
port = 443
client = mqtt.Client(client_id= client_id, transport="websockets", protocol= mqtt.MQTTv5)
client.ws_set_options(path="/clients/mqtt/hubs/<hub_name>?access_token=<token>")
client.tls_set()
client.on_connect = on_connect
client.on_connect_fail = on_connect_fail
client.on_log = on_log
connect_properties.SessionExpiryInterval = 30
client.connect(host, port, clean_start = True, properties=connect_properties)
Troubleshooting
如果客户端无法连接,则可以使用 Azure Monitor 排除故障。 有关更多详细信息,请参阅监视 Azure Web PubSub。
可以检查连接参数,并从 Azure Monitor 获取更详细的错误消息。 例如,以下 Azure Log Analytics 的屏幕截图显示由于设置的保活间隔无效而拒绝了连接。
订阅主题
要从主题接收消息,客户端
- 必须订阅它希望从中接收消息的主题
- 具有用于处理消息事件的回叫
以下代码显示客户端订阅了名为 group2 的主题。
// ...code from the last step
// Provide callback to the message event.
client.on("message", async (topic, payload, packet) => {
console.log(topic, payload)
});
// Subscribe to a topic.
client.subscribe("group2", { qos: 1 }, (err, granted) => { console.log("subscribe", granted); })
// ...code from the last step
// Provide callback to the message event.
client.ApplicationMessageReceivedAsync += (args) =>
{
Console.WriteLine($"Received message on topic '{args.ApplicationMessage.Topic}': {System.Text.Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment)}");
return Task.CompletedTask;
};
// Subscribe to a topic "topic".
await client.SubscribeAsync("group2", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce);
# ...code from the last step
# Provide callback to the message event.
def subscriber_on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client.on_message = subscriber_on_message
# Subscribe to a topic "topic".
client.subscribe("group2")
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()
将消息发布到组
在上一步中,我们设置了从 group1 接收消息所需的一切,现在我们向该组发送消息。
// ...code from the last step
// Send message "Hello World" in the "text" format to "group1".
client.publish("group1", "Hello World!")
// ...code from the last step
// Send message "Hello World" in the "text" format to "group1".
await client.PublishStringAsync("group1", "Hello World!");
# ...code from the last step
# Send message "Hello World" in the "text" format to "group1".
client.publish("group1", "Hello World!")
通过使用客户端 SDK,你现在已了解如何
-
连接到 Web PubSub 资源
- 订阅主题
- 将消息发布到主题