使用 MQTT 协议与 IoT 中心通信
IoT 中心允许设备通过以下方式与 IoT 中心设备终结点通信:
- 在 TCP 端口 8883 上使用 MQTT v3.1.1
- 在 TCP 端口 443 上使用基于 WebSocket 的 MQTT v3.1.1。
IoT 中心不是功能完备的 MQTT 中转站,不支持 MQTT v3.1.1 标准中指定的所有行为。 本文介绍设备如何使用受支持的 MQTT 行为来与 IoT 中心通信。
注意
本文中提到的某些功能(例如云到设备消息传递、设备孪生、设备管理)仅在 IoT 中心的标准层中提供。 有关 IoT 中心基本层和标准/免费层的详细信息,请参阅选择适合你的解决方案的 IoT 中心层。
所有通过 IoT 中心进行的设备通信都必须使用 TLS/SSL 来保护。 因此,IoT 中心不支持通过 TCP 端口 1883 进行的不安全的连接。
连接到 IoT 中心
设备可以通过以下选项之一使用 MQTT 协议连接到 IoT 中心:
- 通过 Azure IoT SDK 中的库。
- 直接通过 MQTT 协议。
许多企业和教育网络环境中都会 MQTT 端口(TCP 端口8883)。 如果无法在防火墙中打开端口 8883,建议使用基于 WebSocket 的 MQTT。 基于 WebSocket 的 MQTT 通过端口 443 进行通信,该端口在网络环境中几乎始终是打开的。 若要了解如何在使用 Azure IoT SDK 时指定 MQTT 和基于 WebSocket 的 MQTT 协议,请参阅使用设备 SDK。
使用设备 SDK
支持 MQTT 协议的设备 SDK 可用于 Java、Node.js、C、C# 和 Python。 设备 SDK 使用选定身份验证机制来连接到 IoT 中心。 要使用 MQTT 协议,必须将客户端协议参数设置为 MQTT。 还可以在客户端协议参数中指定基于 WebSocket 的 MQTT。 默认情况下,设备 SDK 在 CleanSession 标志设置为 0 的情况下连接到 IoT 中心,并使用 QoS 1 来与 IoT 中心交换消息。 虽然可以配置 QoS 0 以加快消息交换速度,但你应该注意,这种传递不能得到保证或确认。 出于此原因,QoS 0 通常称为“用后即焚”。
当设备连接到 IoT 中心时,设备 SDK 会提供方法,让设备与 IoT 中心交换消息。
下表包含了每种受支持语言的代码示例链接,并指定了通过 MQTT 或基于 WebSocket 的 MQTT 协议建立到 IoT 中心的连接时要使用的参数。
语言 | MQTT 协议参数 | 基于 WebSocket 的 MQTT 协议参数 |
---|---|---|
Node.js | azure-iot-device-mqtt.Mqtt | azure-iot-device-mqtt.MqttWs |
Java | IotHubClientProtocol.MQTT | IotHubClientProtocol.MQTT_WS |
C | MQTT_Protocol | MQTT_WebSocket_Protocol |
C# | TransportType.Mqtt | 如果 MQTT 失败,TransportType.Mqtt 将退回到基于 WebSocket 的 MQTT。 若仅指定基于 WebSocket 的 MQTT,请使用 TransportType.Mqtt_WebSocket_Only |
Python | 默认支持 MQTT | 在调用中添加 websockets=True 来创建客户端 |
以下片段展示如何在使用 Azure IoT Node.js SDK 时指定基于 WebSocket 的 MQTT 协议:
var Client = require('azure-iot-device').Client;
var Protocol = require('azure-iot-device-mqtt').MqttWs;
var client = Client.fromConnectionString(deviceConnectionString, Protocol);
以下片段展示如何在使用 Azure IoT Python SDK 时指定基于 WebSocket 的 MQTT 协议:
from azure.iot.device.aio import IoTHubDeviceClient
device_client = IoTHubDeviceClient.create_from_connection_string(deviceConnectionString, websockets=True)
默认的 keep-alive 超时
为了确保客户端/IoT 中心连接保持活动状态,服务和客户端会定期向对方发送一个 keep-alive ping。 使用 IoT SDK 的客户端按下表中定义的时间间隔发送 keep-alive:
语言 | 默认的 keep-alive 时间间隔 | 可配置性 |
---|---|---|
Node.js | 180 秒 | 否 |
Java | 230 秒 | 是 |
C | 240 秒 | 是 |
C# | 300 秒* | 是 |
Python | 60 秒 | 是 |
*C# SDK 将 MQTT KeepAliveInSeconds 属性的默认值定义为 300 秒。 实际上,SDK 会在每次设置的 keep-alive 持续时间发送四次 ping 请求。 换句话说,SDK 每 75 秒发送一次 keep-alive ping。
根据 MQTT v3.1.1 规范,IoT 中心的 keep-alive ping 间隔是客户端 keep-alive 值的 1.5 倍;但 IoT 中心将服务器端超时最大值限制为 29.45 分钟(1767 秒)。 存在此限制是因为所有 Azure 服务都绑定到 Azure 负载均衡器 TCP 空闲超时,即 29.45 分钟。
例如,使用 Java SDK 的设备会发送 keep-alive ping,然后失去网络连接。 230 秒后,设备会由于处于脱机状态而错过 keep-alive ping。 但是,IoT 中心不会立即关闭连接 - 它会再等待 (230 * 1.5) - 230 = 115
秒,然后再断开设备的连接,并显示错误 404104 DeviceConnectionClosedRemotely。
可设置的客户端最大 keep-alive 值为 1767 / 1.5 = 1177
秒。 任何流量都会重置 keep-alive。 例如,成功的共享访问签名 (SAS) 令牌刷新会重置 keep-alive。
将设备应用从 AMQP 迁移到 MQTT
如果使用设备 SDK,则从使用 AMQP 切换到 MQTT 需要在客户端初始化中更改协议参数,如前所述。
执行此操作时,请确保检查下列各项:
AMQP 针对许多条件返回错误,而 MQTT 会终止连接。 因此异常处理逻辑可能需要进行一些更改。
Python SDK 不支持 AMQP。
直接使用 MQTT 协议(作为设备)
如果设备不能使用设备 SDK,仍可以使用端口 8883 上的 MQTT 协议连接到公共设备端点。
在 CONNECT 数据包中,设备应使用以下值:
ClientId 字段使用 deviceId。
“用户名”字段使用
{iotHub-hostname}/{device-id}/?api-version=2021-04-12
,其中{iotHub-hostname}
是 IoT 中心的完整CName
。例如,如果 IoT 中心的名称为 contoso.azure-devices.net,设备的名称为 MyDevice01,则完整“用户名”字段应包含:
contoso.azure-devices.net/MyDevice01/?api-version=2021-04-12
建议在字段中包含 api-version。 否则,可能会导致意外行为。
“密码”字段使用 SAS 令牌。 对于 HTTPS 和 AMQP 协议,SAS 令牌的格式是相同的:
SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}
注意
如果使用 X.509 证书身份验证,则不需要使用 SAS 令牌密码。 有关详细信息,请参阅教程:创建和上传用于测试的证书并按照 TLS/SSL 配置部分中的代码说明进行操作。
有关如何生成 SAS 令牌的详细信息,请参阅使用共享访问签名控制对 IoT 中心的访问的将 SAS 令牌用作设备部分。
还可以使用跨平台的适用于 Visual Studio Code 的 Azure IoT 中心扩展或 CLI 扩展命令 az iot hub generate-sas-token 快速生成一个 SAS 令牌。 随后,你可以将此 SAS 令牌复制并粘贴到自己的代码中,以便进行测试。
使用适用于 Visual Studio Code 的 Azure IoT 中心扩展
在侧栏中,展开“Azure IoT 中心”部分下的“设备”节点。
右键单击 IoT 设备,然后从上下文菜单中选择“为设备生成 SAS 令牌”。
在输入框中输入 SAS 令牌的过期时间(以小时为单位),然后选择 Enter 键。
将创建 SAS 令牌并将其复制到剪贴板。
所生成的 SAS 令牌具有以下结构:
HostName={iotHub-hostname};DeviceId=javadevice;SharedAccessSignature=SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802
此令牌中要用作“密码”字段以便使用 MQTT 进行连接的部分是:
SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802
设备应用可以在 CONNECT 数据包中指定 Will 消息 。 设备应用应该使用 devices/{device-id}/messages/events/
或 devices/{device-id}/messages/events/{property-bag}
作为 Will 主题名称,用于定义要作为遥测消息转发的 Will 消息 。 在此情况下,如果关闭网络连接,但之前未从设备中接收到 DISCONNECT 数据包,IoT 中心将 CONNECT 数据包中提供的 Will 消息发送到遥测通道。 遥测通道可以是默认事件终结点或由 IoT 中心路由定义的自定义终结点。 消息具有 iothub-MessageType 属性,其中包含分配给它的 Will 的值 。
直接使用 MQTT 协议(作为模块)
可使用模块标识通过 MQTT 连接到IoT 中心,类似于作为设备的形式连接到 IoT 中心。 有关通过 MQTT 作为设备连接到IoT 中心的详细信息,请参阅直接使用 MQTT 协议(用作设备)。 但需要使用以下值:
将客户端 ID 设置为
{device-id}/{module-id}
。如果使用用户名和密码进行身份验证,请将用户名设置为
<hubname>.azure-devices.net/{device_id}/{module_id}/?api-version=2021-04-12
,并使用与模块标识关联的 SAS 令牌作为密码。使用
devices/{device-id}/modules/{module-id}/messages/events/
作为主题,用于发布遥测。使用
devices/{device-id}/modules/{module-id}/messages/events/
作为 WILL 主题。使用
devices/{device-id}/modules/{module-id}/#
作为主题,用于接收消息。模块和设备的孪生 GET 和 PATCH 主题是相同的。
模块和设备的孪生状态主题是相同的。
有关将 MQTT 用于模块的详细信息,请参阅 IoT Edge 中心 MQTT 终结点。
使用 MQTT 而无 Azure IoT SDK 的示例
IoT MQTT 示例存储库包含 C/C++、Python 和 CLI 示例,演示了如何在不使用 Azure 设备 SDK 的情况下发送遥测消息、接收云到设备的消息以及使用设备孪生。
C/C++ 示例使用 Eclipse Mosquitto 库,Python 示例使用 Eclipse Paho,CLI 示例使用 mosquitto_pub
。
TLS/SSL 配置
若要直接使用 MQTT 协议,客户端必须通过 TLS/SSL 连接。 尝试跳过此步骤失败并显示连接错误。
若要建立 TLS 连接,可能需要下载并引用 DigiCert Baltimore 根证书。 此证书是 Azure 用来保护连接安全的。 可以在 Azure-iot-sdk-c 存储库中找到此证书。 可以在 Digicert 网站上找到有关这些证书的详细信息。
下面的示例演示如何使用 Eclipse Foundation 提供的 Python 版本的 Paho MQTT 库实现此配置。
首先,从命令行环境安装 Paho 库:
pip install paho-mqtt
然后,在 Python 脚本中实现客户端。 替换以下代码片段中的这些占位符:
<local path to digicert.cer>
是包含 DigiCert Baltimore Root 证书的本地文件的路径。 创建此文件时,可以在用于 C 的 Azure IoT SDK 中复制 certs.c 中的证书信息。包括-----BEGIN CERTIFICATE-----
行和-----END CERTIFICATE-----
行,删除每行开头和结尾的"
标记,并删除每行结尾的\r\n
字符。<device id from device registry>
是添加到 IoT 中心的设备的 ID。<generated SAS token>
是已创建设备的 SAS 令牌,如本文前面所述。<iot hub name>
:IoT 中心的名称。
from paho.mqtt import client as mqtt
import ssl
path_to_root_cert = "<local path to digicert.cer file>"
device_id = "<device id from device registry>"
sas_token = "<generated SAS token>"
iot_hub_name = "<iot hub name>"
def on_connect(client, userdata, flags, rc):
print("Device connected with result code: " + str(rc))
def on_disconnect(client, userdata, rc):
print("Device disconnected with result code: " + str(rc))
def on_publish(client, userdata, mid):
print("Device sent message")
client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
device_id + "/?api-version=2021-04-12", password=sas_token)
client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None,
cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)
client.connect(iot_hub_name+".azure-devices.net", port=8883)
client.publish("devices/" + device_id + "/messages/events/", '{"id":123}', qos=1)
client.loop_forever()
若要使用设备证书进行身份验证,请使用以下代码片段中指定的更改更新以前的代码片段。 有关如何准备基于证书的身份验证的详细信息,请参阅使用 X.509 CA 证书验证设备的获取 X.509 CA 证书部分。
# Create the client as before
# ...
# Set the username but not the password on your client
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
device_id + "/?api-version=2021-04-12", password=None)
# Set the certificate and key paths on your client
cert_file = "<local path to your certificate file>"
key_file = "<local path to your device key file>"
client.tls_set(ca_certs=path_to_root_cert, certfile=cert_file, keyfile=key_file,
cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
# Connect as before
client.connect(iot_hub_name+".azure-devices.net", port=8883)
发送“设备到云”消息
设备连接后,设备可以使用 devices/{device-id}/messages/events/
或 devices/{device-id}/messages/events/{property-bag}
作为“主题名称”将消息发送到 IoT 中心。 {property-bag}
元素可让设备使用 URL 编码格式发送包含其他属性的消息。 例如:
RFC 2396-encoded(<PropertyName1>)=RFC 2396-encoded(<PropertyValue1>)&RFC 2396-encoded(<PropertyName2>)=RFC 2396-encoded(<PropertyValue2>)…
注意
此 {property_bag}
元素使用与 HTTPS 协议中的查询字符串相同的编码。
注意
如果要将 D2C 消息路由到 Azure 存储帐户,并且想利用 JSON 编码,则必须指定内容类型和内容编码信息,包含 $.ct=application%2Fjson&$.ce=utf-8
作为上述 {property_bag}
的一部分。
这些属性的格式取决于协议。 IoT 中心将这些属性转换为其相应的系统属性。 有关详细信息,请参阅 IoT 中心消息路由查询语法的系统属性部分。
下面的列表描述了 IoT 中心特定于实现的行为:
IoT 中心不支持 QoS 2 消息。 如果设备应用使用 QoS 2 发布消息,IoT 中心将断开网络连接。
IoT 中心不保存 Retain 消息。 如果设备在 RETAIN 标志设置为 1 的情况下发送消息,则 IoT 中心会在消息中添加 mqtt-retain 应用程序属性 。 在此情况下,IoT 中心不会存储保留消息,而将其传递到后端应用。
IoT 中心仅支持每个设备一个活动 MQTT 连接。 代表相同设备 ID 的任何新 MQTT 连接都会导致 IoT 中心删除现有连接,系统会将“400027 ConnectionForcefullyClosedOnNewConnection”记录到 IoT 中心日志中
若要基于消息正文路由消息,必须首先将属性“contentType”(
ct
) 添加到 MQTT 主题的末尾,并将其值设置为application/json;charset=utf-8
,如下例所示。 若要详细了解基于消息属性或消息正文的路由消息,请参阅 IoT 中心消息路由查询语法文档。devices/{device-id}/messages/events/$.ct=application%2Fjson%3Bcharset%3Dutf-8
有关详细信息,请参阅使用 IoT 中心发送设备到云和云到设备的消息。
接收“云到设备”消息
若要从 IoT 中心接收消息,设备应使用 devices/{device-id}/messages/devicebound/#
作为主题筛选器来进行订阅。 主题筛选器中的多级通配符 #
仅用于允许设备接收主题名称中的其他属性。 IoT 中心不允许使用 #
或 ?
通配符筛选子主题。 由于 IoT Hub 不是通用的发布-订阅消息传送中转站,它只支持记录的主题名称和主题筛选器。
在设备成功订阅由 devices/{device-id}/messages/devicebound/#
主题筛选器表示的设备特定终结点前,不会从 IoT 中心收到任何消息。 建立订阅后,设备会接收建立订阅后发送给它的云到设备消息。 如果设备在 CleanSession 标志设置为 0 的情况下进行连接,则订阅在经历不同的会话后仍然持久存在。 在此情况下,下次使用 CleanSession 0 进行连接时,设备会收到断开连接时发送给它的未处理消息。 但是,如果设备使用设置为 1 的 CleanSession 标志,在订阅其设备终结点前,它不会从 IoT 中心收到任何消息。
如有消息属性,IoT 中心会传送包含主题名称 devices/{device-id}/messages/devicebound/
或 devices/{device-id}/messages/devicebound/{property-bag}
的消息。 {property-bag}
包含 URL 编码的消息属性键/值对。 属性包中只包含应用程序属性和用户可设置的系统属性(例如 messageId 或 correlationId)。 系统属性名称具有前缀 $ ,但应用程序属性使用没有前缀的原始属性名称。 有关属性包格式的其他详细信息,请参阅发送设备到云的消息。
在云到设备消息中,属性包中的值的表示形式如下表所示:
属性值 | 表示形式 | 说明 |
---|---|---|
null |
key |
属性包中只显示密钥 |
空字符串 | key= |
密钥后跟一个等号,无值 |
不可为 null、非空值 | key=value |
密钥后跟一个等号和值 |
以下示例显示包含三个应用程序属性的属性包:值为 null
的 prop1、为空字符串 ("") 的 prop2 和值为“string”的 prop3 。
/?prop1&prop2=&prop3=a%20string
当设备应用使用 QoS 2 订阅主题时,IoT 中心会在 SUBACK 包中授予最高 QoS 级别 1。 之后,IoT 中心会使用 QoS 1 将消息传送到设备。
检索设备克隆的属性
首先,设备订阅 $iothub/twin/res/#
,接收操作的响应。 然后,它向主题 $iothub/twin/GET/?$rid={request id}
发送一条空消息,其中包含请求 ID 的填充值。 服务随后会发送一条响应消息,其中包含关于主题 $iothub/twin/res/{status}/?$rid={request-id}
的设备孪生数据,并且使用与请求相同的请求 ID。
请求 ID 可以是消息属性值的任何有效值,且需要验证确保状态是整数。 有关详细信息,请参阅使用 IoT 中心发送设备到云和云到设备的消息。
响应正文包含设备孪生的 properties 节,如以下响应示例所示:
{
"desired": {
"telemetrySendFrequency": "5m",
"$version": 12
},
"reported": {
"telemetrySendFrequency": "5m",
"batteryLevel": 55,
"$version": 123
}
}
可能的状态代码为:
状态 | 说明 |
---|---|
200 | Success |
429 | 请求过多(受限制)。 有关详细信息,请参阅 IoT 中心限制 |
5** | 服务器错误 |
有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生。
更新设备孪生的报告属性
若要更新已报告的属性,设备通过指定的 MQTT 主题上的发布向 IoT 中心发出请求。 处理完请求后,IoT 中心会通过发布到其他主题来响应更新操作的成功或失败状态。 设备可以订阅此主题,以便通知它有关其孪生更新请求的结果。 为了在 MQTT 中实现此类型的请求/响应交互,我们使用设备最初在其更新请求中提供的请求 ID ($rid
) 的概念。 此请求 ID 也包含在 IoT 中心的响应中,以允许设备将响应与其特定的早期请求相关联。
以下序列描述了设备如何在 IoT 中心中更新设备孪生中报告的属性:
设备必须首先订阅
$iothub/twin/res/#
主题才能从 IoT 中心接收操作的响应。设备将包含设备孪生更新的消息发送到
$iothub/twin/PATCH/properties/reported/?$rid={request-id}
主题。 此消息包含请求 ID 值。然后,服务发送一个响应消息,其中包含
$iothub/twin/res/{status}/?$rid={request-id}
主题上报告的属性集合的新 ETag 值。 此响应消息使用与请求相同的请求 ID。
请求消息正文包含 JSON 文档,其中包含已报告属性的新值。 JSON 文档中的每个成员都会在设备克隆文档中更新或添加相应成员。 如果某个成员设置为 null
,则会从包含方对象中删除该成员。 例如:
{
"telemetrySendFrequency": "35m",
"batteryLevel": 60
}
可能的状态代码为:
状态 | 说明 |
---|---|
204 | 成功(不返回任何内容) |
400 | 错误的请求。 格式不正确的 JSON |
429 | 请求过多(受限),如 IoT 中心限制中所述 |
5** | 服务器错误 |
下面的 Python 代码片段演示了通过 MQTT(使用 Paho MQTT 客户端)进行的孪生体报告属性更新过程:
from paho.mqtt import client as mqtt
# authenticate the client with IoT Hub (not shown here)
client.subscribe("$iothub/twin/res/#")
rid = "1"
twin_reported_property_patch = "{\"firmware_version\": \"v1.1\"}"
client.publish("$iothub/twin/PATCH/properties/reported/?$rid=" +
rid, twin_reported_property_patch, qos=0)
上述孪生体报告前面代码片段中的属性更新过程之后,来自 IoT 中心的发布消息将具有以下主题:$iothub/twin/res/204/?$rid=1&$version=6
,其中 204
是表示成功的状态代码,$rid=1
对应于代码中设备提供的请求 ID,$version
对应于更新后设备孪生已报告属性部分的版本。
有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生。
接收所需属性更新通知
设备连接时,IoT 中心会向主题 $iothub/twin/PATCH/properties/desired/?$version={new-version}
发送通知,内附解决方案后端执行的更新内容。 例如:
{
"telemetrySendFrequency": "5m",
"route": null,
"$version": 8
}
对于属性更新,null
值表示正在删除 JSON 对象成员。 另外,$version
指示孪生的所需属性部分的新版本。
重要
IoT 中心仅在连接设备时才会生成更改通知。 请确保实现设备重新连接流,让 IoT 中心和设备应用之间的所需属性保持同步。
有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生。
响应直接方法
首先,设备需要订阅 $iothub/methods/POST/#
。 IoT 中心向主题 $iothub/methods/POST/{method-name}/?$rid={request-id}
发送方法请求,其中包含有效的 JSON 或空正文。
进行响应时,设备向主题 $iothub/methods/res/{status}/?$rid={request-id}
发送带有有效 JSON 或空正文的消息。 在此消息中,request ID 必须与请求消息中的相符,status 必须为整数。
有关详细信息,请参阅了解直接方法并从 IoT 中心进行调用。
后续步骤
若要了解有关 MQTT 协议的详细信息,请参阅 MQTT 文档。
若要详细了解如何使用 IoT 设备 SDK,请参阅:
若要深入了解如何规划 IoT 中心部署,请参阅: