使用 MQTT 协议与 IoT 中心通信

IoT 中心允许设备通过以下方式与 IoT 中心设备终结点通信:

  • 在 TCP 端口 8883 上使用 MQTT v3.1.1
  • 在 TCP 端口 443 上使用基于 WebSocket 的 MQTT v3.1.1。

IoT 中心不是功能完备的 MQTT 中转站,不支持 MQTT v3.1.1 标准中指定的所有行为。 本文介绍设备如何使用受支持的 MQTT 行为来与 IoT 中心通信。

注意

本文中提到的某些功能(例如云到设备消息传递、设备孪生、设备管理)仅在 IoT 中心的标准层中提供。 有关 IoT 中心基本层和标准/免费层的详细信息,请参阅选择适合你的解决方案的 IoT 中心层

所有通过 IoT 中心进行的设备通信都必须使用 TLS/SSL 来保护。 因此,IoT 中心不支持通过 TCP 端口 1883 进行的不安全的连接。

连接到 IoT 中心

设备可以通过以下选项之一使用 MQTT 协议连接到 IoT 中心:

许多企业和教育网络环境中都会 MQTT 端口(TCP 端口8883)。 如果无法在防火墙中打开端口 8883,建议使用基于 WebSocket 的 MQTT。 基于 WebSocket 的 MQTT 通过端口 443 进行通信,该端口在网络环境中几乎始终是打开的。 若要了解如何在使用 Azure IoT SDK 时指定 MQTT 和基于 WebSocket 的 MQTT 协议,请参阅使用设备 SDK

使用设备 SDK

支持 MQTT 协议的设备 SDK 可用于 Java、Node.js、C、C# 和 Python。 设备 SDK 使用选定身份验证机制来连接到 IoT 中心。 要使用 MQTT 协议,必须将客户端协议参数设置为 MQTT。 还可以在客户端协议参数中指定基于 WebSocket 的 MQTT。 默认情况下,设备 SDK 在 CleanSession 标志设置为 0 的情况下连接到 IoT 中心,并使用 QoS 1 来与 IoT 中心交换消息。 虽然可以配置 QoS 0 以加快消息交换速度,但你应该注意,这种传递不能得到保证或确认。 出于此原因,QoS 0 通常称为“用后即焚”。

当设备连接到 IoT 中心时,设备 SDK 会提供方法,让设备与 IoT 中心交换消息。

下表包含了每种受支持语言的代码示例链接,并指定了通过 MQTT 或基于 WebSocket 的 MQTT 协议建立到 IoT 中心的连接时要使用的参数。

语言 MQTT 协议参数 基于 WebSocket 的 MQTT 协议参数
Node.js azure-iot-device-mqtt.Mqtt azure-iot-device-mqtt.MqttWs
Java IotHubClientProtocol.MQTT IotHubClientProtocol.MQTT_WS
C MQTT_Protocol MQTT_WebSocket_Protocol
C# TransportType.Mqtt 如果 MQTT 失败,TransportType.Mqtt 将退回到基于 WebSocket 的 MQTT。 若仅指定基于 WebSocket 的 MQTT,请使用 TransportType.Mqtt_WebSocket_Only
Python 默认支持 MQTT 在调用中添加 websockets=True 来创建客户端

以下片段展示如何在使用 Azure IoT Node.js SDK 时指定基于 WebSocket 的 MQTT 协议:

var Client = require('azure-iot-device').Client;
var Protocol = require('azure-iot-device-mqtt').MqttWs;
var client = Client.fromConnectionString(deviceConnectionString, Protocol);

以下片段展示如何在使用 Azure IoT Python SDK 时指定基于 WebSocket 的 MQTT 协议:

from azure.iot.device.aio import IoTHubDeviceClient
device_client = IoTHubDeviceClient.create_from_connection_string(deviceConnectionString, websockets=True)

默认的 keep-alive 超时

为了确保客户端/IoT 中心连接保持活动状态,服务和客户端会定期向对方发送一个 keep-alive ping。 使用 IoT SDK 的客户端按下表中定义的时间间隔发送 keep-alive:

语言 默认的 keep-alive 时间间隔 可配置性
Node.js 180 秒
Java 230 秒
C 240 秒
C# 300 秒*
Python 60 秒

*C# SDK 将 MQTT KeepAliveInSeconds 属性的默认值定义为 300 秒。 实际上,SDK 会在每次设置的 keep-alive 持续时间发送四次 ping 请求。 换句话说,SDK 每 75 秒发送一次 keep-alive ping。

根据 MQTT v3.1.1 规范,IoT 中心的 keep-alive ping 间隔是客户端 keep-alive 值的 1.5 倍;但 IoT 中心将服务器端超时最大值限制为 29.45 分钟(1767 秒)。 存在此限制是因为所有 Azure 服务都绑定到 Azure 负载均衡器 TCP 空闲超时,即 29.45 分钟。

例如,使用 Java SDK 的设备会发送 keep-alive ping,然后失去网络连接。 230 秒后,设备会由于处于脱机状态而错过 keep-alive ping。 但是,IoT 中心不会立即关闭连接 - 它会再等待 (230 * 1.5) - 230 = 115 秒,然后再断开设备的连接,并显示错误 404104 DeviceConnectionClosedRemotely

可设置的客户端最大 keep-alive 值为 1767 / 1.5 = 1177 秒。 任何流量都会重置 keep-alive。 例如,成功的共享访问签名 (SAS) 令牌刷新会重置 keep-alive。

将设备应用从 AMQP 迁移到 MQTT

如果使用设备 SDK,则从使用 AMQP 切换到 MQTT 需要在客户端初始化中更改协议参数,如前所述。

执行此操作时,请确保检查下列各项:

  • AMQP 针对许多条件返回错误,而 MQTT 会终止连接。 因此异常处理逻辑可能需要进行一些更改。

  • MQTT 在接收云到设备消息时不支持拒绝操作。 如果后端应用需要接收来自设备应用的响应,请考虑使用直接方法

  • Python SDK 不支持 AMQP。

直接使用 MQTT 协议(作为设备)

如果设备不能使用设备 SDK,仍可以使用端口 8883 上的 MQTT 协议连接到公共设备端点。

在 CONNECT 数据包中,设备应使用以下值:

  • ClientId 字段使用 deviceId

  • “用户名”字段使用 {iotHub-hostname}/{device-id}/?api-version=2021-04-12,其中 {iotHub-hostname} 是 IoT 中心的完整 CName

    例如,如果 IoT 中心的名称为 contoso.azure-devices.net,设备的名称为 MyDevice01,则完整“用户名”字段应包含:

    contoso.azure-devices.net/MyDevice01/?api-version=2021-04-12

    建议在字段中包含 api-version。 否则,可能会导致意外行为。

  • 密码”字段使用 SAS 令牌。 对于 HTTPS 和 AMQP 协议,SAS 令牌的格式是相同的:

    SharedAccessSignature sig={signature-string}&se={expiry}&sr={URL-encoded-resourceURI}

    注意

    如果使用 X.509 证书身份验证,则不需要使用 SAS 令牌密码。 有关详细信息,请参阅教程:创建和上传用于测试的证书并按照 TLS/SSL 配置部分中的代码说明进行操作。

    有关如何生成 SAS 令牌的详细信息,请参阅使用共享访问签名控制对 IoT 中心的访问将 SAS 令牌用作设备部分。

    还可以使用跨平台的适用于 Visual Studio Code 的 Azure IoT 中心扩展或 CLI 扩展命令 az iot hub generate-sas-token 快速生成一个 SAS 令牌。 随后,你可以将此 SAS 令牌复制并粘贴到自己的代码中,以便进行测试。

使用适用于 Visual Studio Code 的 Azure IoT 中心扩展

  1. 在侧栏中,展开“Azure IoT 中心”部分下的“设备”节点。

  2. 右键单击 IoT 设备,然后从上下文菜单中选择“为设备生成 SAS 令牌”。

  3. 在输入框中输入 SAS 令牌的过期时间(以小时为单位),然后选择 Enter 键。

  4. 将创建 SAS 令牌并将其复制到剪贴板。

    所生成的 SAS 令牌具有以下结构:

    HostName={iotHub-hostname};DeviceId=javadevice;SharedAccessSignature=SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802

    此令牌中要用作“密码”字段以便使用 MQTT 进行连接的部分是:

    SharedAccessSignature sr={iotHub-hostname}%2Fdevices%2FMyDevice01%2Fapi-version%3D2016-11-14&sig=vSgHBMUG.....Ntg%3d&se=1456481802

设备应用可以在 CONNECT 数据包中指定 Will 消息 。 设备应用应该使用 devices/{device-id}/messages/events/devices/{device-id}/messages/events/{property-bag} 作为 Will 主题名称,用于定义要作为遥测消息转发的 Will 消息 。 在此情况下,如果关闭网络连接,但之前未从设备中接收到 DISCONNECT 数据包,IoT 中心将 CONNECT 数据包中提供的 Will 消息发送到遥测通道。 遥测通道可以是默认事件终结点或由 IoT 中心路由定义的自定义终结点。 消息具有 iothub-MessageType 属性,其中包含分配给它的 Will 的值 。

直接使用 MQTT 协议(作为模块)

可使用模块标识通过 MQTT 连接到IoT 中心,类似于作为设备的形式连接到 IoT 中心。 有关通过 MQTT 作为设备连接到IoT 中心的详细信息,请参阅直接使用 MQTT 协议(用作设备)。 但需要使用以下值:

  • 将客户端 ID 设置为 {device-id}/{module-id}

  • 如果使用用户名和密码进行身份验证,请将用户名设置为 <hubname>.azure-devices.net/{device_id}/{module_id}/?api-version=2021-04-12,并使用与模块标识关联的 SAS 令牌作为密码。

  • 使用 devices/{device-id}/modules/{module-id}/messages/events/ 作为主题,用于发布遥测。

  • 使用 devices/{device-id}/modules/{module-id}/messages/events/ 作为 WILL 主题。

  • 使用 devices/{device-id}/modules/{module-id}/# 作为主题,用于接收消息。

  • 模块和设备的孪生 GET 和 PATCH 主题是相同的。

  • 模块和设备的孪生状态主题是相同的。

有关将 MQTT 用于模块的详细信息,请参阅 IoT Edge 中心 MQTT 终结点

使用 MQTT 而无 Azure IoT SDK 的示例

IoT MQTT 示例存储库包含 C/C++、Python 和 CLI 示例,演示了如何在不使用 Azure 设备 SDK 的情况下发送遥测消息、接收云到设备的消息以及使用设备孪生。

C/C++ 示例使用 Eclipse Mosquitto 库,Python 示例使用 Eclipse Paho,CLI 示例使用 mosquitto_pub

TLS/SSL 配置

若要直接使用 MQTT 协议,客户端必须通过 TLS/SSL 连接。 尝试跳过此步骤失败并显示连接错误。

若要建立 TLS 连接,可能需要下载并引用 DigiCert Baltimore 根证书。 此证书是 Azure 用来保护连接安全的。 可以在 Azure-iot-sdk-c 存储库中找到此证书。 可以在 Digicert 网站上找到有关这些证书的详细信息。

下面的示例演示如何使用 Eclipse Foundation 提供的 Python 版本的 Paho MQTT 库实现此配置。

首先,从命令行环境安装 Paho 库:

pip install paho-mqtt

然后,在 Python 脚本中实现客户端。 替换以下代码片段中的这些占位符:

  • <local path to digicert.cer> 是包含 DigiCert Baltimore Root 证书的本地文件的路径。 创建此文件时,可以在用于 C 的 Azure IoT SDK 中复制 certs.c 中的证书信息。包括 -----BEGIN CERTIFICATE----- 行和 -----END CERTIFICATE----- 行,删除每行开头和结尾的 " 标记,并删除每行结尾的 \r\n 字符。

  • <device id from device registry> 是添加到 IoT 中心的设备的 ID。

  • <generated SAS token> 是已创建设备的 SAS 令牌,如本文前面所述。

  • <iot hub name>:IoT 中心的名称。

from paho.mqtt import client as mqtt
import ssl

path_to_root_cert = "<local path to digicert.cer file>"
device_id = "<device id from device registry>"
sas_token = "<generated SAS token>"
iot_hub_name = "<iot hub name>"


def on_connect(client, userdata, flags, rc):
    print("Device connected with result code: " + str(rc))


def on_disconnect(client, userdata, rc):
    print("Device disconnected with result code: " + str(rc))


def on_publish(client, userdata, mid):
    print("Device sent message")


client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish

client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
                       device_id + "/?api-version=2021-04-12", password=sas_token)

client.tls_set(ca_certs=path_to_root_cert, certfile=None, keyfile=None,
               cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)

client.connect(iot_hub_name+".azure-devices.net", port=8883)

client.publish("devices/" + device_id + "/messages/events/", '{"id":123}', qos=1)
client.loop_forever()

若要使用设备证书进行身份验证,请使用以下代码片段中指定的更改更新以前的代码片段。 有关如何准备基于证书的身份验证的详细信息,请参阅使用 X.509 CA 证书验证设备获取 X.509 CA 证书部分。

# Create the client as before
# ...

# Set the username but not the password on your client
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
                       device_id + "/?api-version=2021-04-12", password=None)

# Set the certificate and key paths on your client
cert_file = "<local path to your certificate file>"
key_file = "<local path to your device key file>"
client.tls_set(ca_certs=path_to_root_cert, certfile=cert_file, keyfile=key_file,
               cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)

# Connect as before
client.connect(iot_hub_name+".azure-devices.net", port=8883)

发送“设备到云”消息

设备连接后,设备可以使用 devices/{device-id}/messages/events/devices/{device-id}/messages/events/{property-bag} 作为“主题名称”将消息发送到 IoT 中心。 {property-bag} 元素可让设备使用 URL 编码格式发送包含其他属性的消息。 例如:

RFC 2396-encoded(<PropertyName1>)=RFC 2396-encoded(<PropertyValue1>)&RFC 2396-encoded(<PropertyName2>)=RFC 2396-encoded(<PropertyValue2>)…

注意

{property_bag} 元素使用与 HTTPS 协议中的查询字符串相同的编码。

注意

如果要将 D2C 消息路由到 Azure 存储帐户,并且想利用 JSON 编码,则必须指定内容类型和内容编码信息,包含 $.ct=application%2Fjson&$.ce=utf-8 作为上述 {property_bag} 的一部分。

这些属性的格式取决于协议。 IoT 中心将这些属性转换为其相应的系统属性。 有关详细信息,请参阅 IoT 中心消息路由查询语法系统属性部分。

下面的列表描述了 IoT 中心特定于实现的行为:

  • IoT 中心不支持 QoS 2 消息。 如果设备应用使用 QoS 2 发布消息,IoT 中心将断开网络连接。

  • IoT 中心不保存 Retain 消息。 如果设备在 RETAIN 标志设置为 1 的情况下发送消息,则 IoT 中心会在消息中添加 mqtt-retain 应用程序属性 。 在此情况下,IoT 中心不会存储保留消息,而将其传递到后端应用。

  • IoT 中心仅支持每个设备一个活动 MQTT 连接。 代表相同设备 ID 的任何新 MQTT 连接都会导致 IoT 中心删除现有连接,系统会将“400027 ConnectionForcefullyClosedOnNewConnection”记录到 IoT 中心日志中

  • 若要基于消息正文路由消息,必须首先将属性“contentType”(ct) 添加到 MQTT 主题的末尾,并将其值设置为 application/json;charset=utf-8,如下例所示。 若要详细了解基于消息属性或消息正文的路由消息,请参阅 IoT 中心消息路由查询语法文档

    devices/{device-id}/messages/events/$.ct=application%2Fjson%3Bcharset%3Dutf-8

有关详细信息,请参阅使用 IoT 中心发送设备到云和云到设备的消息

接收“云到设备”消息

若要从 IoT 中心接收消息,设备应使用 devices/{device-id}/messages/devicebound/# 作为主题筛选器来进行订阅。 主题筛选器中的多级通配符 # 仅用于允许设备接收主题名称中的其他属性。 IoT 中心不允许使用 #? 通配符筛选子主题。 由于 IoT Hub 不是通用的发布-订阅消息传送中转站,它只支持记录的主题名称和主题筛选器。

在设备成功订阅由 devices/{device-id}/messages/devicebound/# 主题筛选器表示的设备特定终结点前,不会从 IoT 中心收到任何消息。 建立订阅后,设备会接收建立订阅后发送给它的云到设备消息。 如果设备在 CleanSession 标志设置为 0 的情况下进行连接,则订阅在经历不同的会话后仍然持久存在。 在此情况下,下次使用 CleanSession 0 进行连接时,设备会收到断开连接时发送给它的未处理消息。 但是,如果设备使用设置为 1 的 CleanSession 标志,在订阅其设备终结点前,它不会从 IoT 中心收到任何消息。

如有消息属性,IoT 中心会传送包含主题名称devices/{device-id}/messages/devicebound/devices/{device-id}/messages/devicebound/{property-bag} 的消息。 {property-bag} 包含 URL 编码的消息属性键/值对。 属性包中只包含应用程序属性和用户可设置的系统属性(例如 messageIdcorrelationId)。 系统属性名称具有前缀 $ ,但应用程序属性使用没有前缀的原始属性名称。 有关属性包格式的其他详细信息,请参阅发送设备到云的消息

在云到设备消息中,属性包中的值的表示形式如下表所示:

属性值 表示形式 说明
null key 属性包中只显示密钥
空字符串 key= 密钥后跟一个等号,无值
不可为 null、非空值 key=value 密钥后跟一个等号和值

以下示例显示包含三个应用程序属性的属性包:值为 null 的 prop1、为空字符串 ("") 的 prop2 和值为“string”的 prop3 。

/?prop1&prop2=&prop3=a%20string

当设备应用使用 QoS 2 订阅主题时,IoT 中心会在 SUBACK 包中授予最高 QoS 级别 1。 之后,IoT 中心会使用 QoS 1 将消息传送到设备。

检索设备克隆的属性

首先,设备订阅 $iothub/twin/res/#,接收操作的响应。 然后,它向主题 $iothub/twin/GET/?$rid={request id} 发送一条空消息,其中包含请求 ID 的填充值。 服务随后会发送一条响应消息,其中包含关于主题 $iothub/twin/res/{status}/?$rid={request-id} 的设备孪生数据,并且使用与请求相同的请求 ID

请求 ID 可以是消息属性值的任何有效值,且需要验证确保状态是整数。 有关详细信息,请参阅使用 IoT 中心发送设备到云和云到设备的消息

响应正文包含设备孪生的 properties 节,如以下响应示例所示:

{
    "desired": {
        "telemetrySendFrequency": "5m",
        "$version": 12
    },
    "reported": {
        "telemetrySendFrequency": "5m",
        "batteryLevel": 55,
        "$version": 123
    }
}

可能的状态代码为:

状态 说明
200 Success
429 请求过多(受限制)。 有关详细信息,请参阅 IoT 中心限制
5** 服务器错误

有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生

更新设备孪生的报告属性

若要更新已报告的属性,设备通过指定的 MQTT 主题上的发布向 IoT 中心发出请求。 处理完请求后,IoT 中心会通过发布到其他主题来响应更新操作的成功或失败状态。 设备可以订阅此主题,以便通知它有关其孪生更新请求的结果。 为了在 MQTT 中实现此类型的请求/响应交互,我们使用设备最初在其更新请求中提供的请求 ID ($rid) 的概念。 此请求 ID 也包含在 IoT 中心的响应中,以允许设备将响应与其特定的早期请求相关联。

以下序列描述了设备如何在 IoT 中心中更新设备孪生中报告的属性:

  1. 设备必须首先订阅 $iothub/twin/res/# 主题才能从 IoT 中心接收操作的响应。

  2. 设备将包含设备孪生更新的消息发送到 $iothub/twin/PATCH/properties/reported/?$rid={request-id} 主题。 此消息包含请求 ID 值。

  3. 然后,服务发送一个响应消息,其中包含 $iothub/twin/res/{status}/?$rid={request-id} 主题上报告的属性集合的新 ETag 值。 此响应消息使用与请求相同的请求 ID

请求消息正文包含 JSON 文档,其中包含已报告属性的新值。 JSON 文档中的每个成员都会在设备克隆文档中更新或添加相应成员。 如果某个成员设置为 null,则会从包含方对象中删除该成员。 例如:

{
    "telemetrySendFrequency": "35m",
    "batteryLevel": 60
}

可能的状态代码为:

状态 说明
204 成功(不返回任何内容)
400 错误的请求。 格式不正确的 JSON
429 请求过多(受限),如 IoT 中心限制中所述
5** 服务器错误

下面的 Python 代码片段演示了通过 MQTT(使用 Paho MQTT 客户端)进行的孪生体报告属性更新过程:

from paho.mqtt import client as mqtt

# authenticate the client with IoT Hub (not shown here)

client.subscribe("$iothub/twin/res/#")
rid = "1"
twin_reported_property_patch = "{\"firmware_version\": \"v1.1\"}"
client.publish("$iothub/twin/PATCH/properties/reported/?$rid=" +
               rid, twin_reported_property_patch, qos=0)

上述孪生体报告前面代码片段中的属性更新过程之后,来自 IoT 中心的发布消息将具有以下主题:$iothub/twin/res/204/?$rid=1&$version=6,其中 204 是表示成功的状态代码,$rid=1 对应于代码中设备提供的请求 ID,$version 对应于更新后设备孪生已报告属性部分的版本。

有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生

接收所需属性更新通知

设备连接时,IoT 中心会向主题 $iothub/twin/PATCH/properties/desired/?$version={new-version} 发送通知,内附解决方案后端执行的更新内容。 例如:

{
    "telemetrySendFrequency": "5m",
    "route": null,
    "$version": 8
}

对于属性更新,null 值表示正在删除 JSON 对象成员。 另外,$version 指示孪生的所需属性部分的新版本。

重要

IoT 中心仅在连接设备时才会生成更改通知。 请确保实现设备重新连接流,让 IoT 中心和设备应用之间的所需属性保持同步。

有关详细信息,请参阅了解并在 IoT 中心内使用设备孪生

响应直接方法

首先,设备需要订阅 $iothub/methods/POST/#。 IoT 中心向主题 $iothub/methods/POST/{method-name}/?$rid={request-id} 发送方法请求,其中包含有效的 JSON 或空正文。

进行响应时,设备向主题 $iothub/methods/res/{status}/?$rid={request-id} 发送带有有效 JSON 或空正文的消息。 在此消息中,request ID 必须与请求消息中的相符,status 必须为整数。

有关详细信息,请参阅了解直接方法并从 IoT 中心进行调用

后续步骤

若要了解有关 MQTT 协议的详细信息,请参阅 MQTT 文档

若要详细了解如何使用 IoT 设备 SDK,请参阅:

若要深入了解如何规划 IoT 中心部署,请参阅: