收发云到设备消息

Azure IoT 中心是一项完全托管的服务,可实现双向通信,包括从解决方案后端到数百万台设备的云到设备 (C2D) 消息。

本文介绍如何使用 Azure IoT SDK 生成以下类型的应用程序:

  • 接收和处理来自 IoT 中心消息队列的云到设备消息的设备应用程序。

  • 通过 IoT 中心消息队列将云到设备消息发送到单个设备的后端应用程序。

本文旨在补充本文中引用的可运行 SDK 示例。

注意

本文所述的功能只能用于 IoT 中心的标准层。 有关 IoT 中心基本层和标准/免费层的详细信息,请参阅选择适合你的解决方案的 IoT 中心层

概述

要使设备应用程序接收云到设备消息,它必须连接到 IoT 中心,然后设置消息处理程序来处理传入消息。 Azure IoT 中心设备 SDK 提供类和方法,设备可以使用这些类和方法接收和处理来自服务的消息。 本文讨论接收消息的任何设备应用程序的关键元素,包括:

  • 声明设备客户端对象
  • 连接到 IoT 中心
  • 从 IoT 中心消息队列检索消息
  • 处理消息并将确认发送回 IoT 中心
  • 配置接收消息重试策略

若要使后端应用程序发送云到设备消息,它必须连接到 IoT 中心并通过 IoT 中心消息队列发送消息。 Azure IoT 中心服务 SDK 提供类和方法,应用程序可使用这些类和方法将消息发送到设备。 本文讨论向设备发送消息的任何应用程序的关键元素,包括:

  • 声明服务客户端对象
  • 连接到 IoT 中心
  • 生成并发送消息
  • 接收送达反馈
  • 配置发送消息重试策略

了解消息队列

若要了解云到设备消息传送,请务必了解有关 IoT 中心设备消息队列工作原理的一些基础知识。

系统通过 IoT 中心路由从解决方案后端应用程序发送到 IoT 设备的云到设备消息。 解决方案后端应用程序和目标设备之间没有直接对等消息传送通信。 IoT 中心将传入消息放入其消息队列中,可供目标 IoT 设备下载。

为了保证至少一次消息传递,IoT 中心将云到设备消息保存在每个设备的队列中。 在 IoT 中心从队列中删除消息之前,设备必须显式确认消息的相关操作已完成。 此方法保证了连接失败和设备故障时能够恢复。

当 IoT 中心将消息放入设备消息队列时,它将消息状态设置为“已排队”。 当设备线程从队列获取消息时,IoT 中心通过将消息状态设置为“不可见”来锁定消息。 此状态可防止设备上的其他线程处理同一消息。 当设备线程成功完成消息处理时,它会通知 IoT 中心,然后 IoT 中心会将消息状态设置为“已完成”。

如果设备应用程序已成功接收和处理消息,则称为已完成消息。 但是,如有必要,设备还可以:

  • 拒绝消息,这会使 IoT 中心将此消息设置为“死信”状态。 通过消息队列遥测传输 (MQTT) 协议进行连接的设备无法拒绝云到设备的消息。
  • 放弃消息,这会使 IoT 中心将消息放回队列,并将消息状态设置为“已排队”。 通过 MQTT 协议连接的设备无法放弃云到设备的消息。

若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息

接收云到设备的消息

本部分介绍如何使用适用于 .NET 的 Azure IoT SDK 中的 DeviceClient 类接收云到设备消息。

设备客户端应用程序可以使用两个选项来接收消息:

  • 轮询:设备应用程序使用代码循环(例如,whilefor 循环)检查新的 IoT 中心消息。 循环持续执行,检查消息。
  • 回调:设备应用程序设置一个异步消息处理程序方法,系统在消息到达时立即调用该方法。

声明 DeviceClient 对象

DeviceClient 包括从 IoT 中心接收消息所需的方法和属性。

例如:

static DeviceClient deviceClient;

提供连接参数

使用 CreateFromConnectionString 方法向 DeviceClient 提供 IoT 中心主连接字符串和设备 ID。 除了所需的 IoT 中心主连接字符串外,还可以重载 CreateFromConnectionString 方法以包括以下可选参数:

  • transportType - 传输协议:HTTP 版本 1、AMQP 或 MQTT 的变体。 AMQP 是默认值。 若要查看所有可用值,请参阅 TransportType 枚举
  • transportSettings - 用于定义 DeviceClientModuleClient 的各种传输特定设置的接口。 有关详细信息,请参阅 ITransportSettings 接口
  • ClientOptions - 允许在初始化期间配置设备或模块客户端实例的选项。

此示例调用 CreateFromConnectionString 来定义 DeviceClient 连接 IoT 中心和设备 ID 设置。

static string connectionString = "{your IoT hub connection string}";
static string deviceID = "{your device ID}";
deviceClient = DeviceClient.CreateFromConnectionString(connectionString,deviceID);

轮询

轮询使用 ReceiveAsync 检查消息。

ReceiveAsync 的调用可以采用以下形式:

  • ReceiveAsync() - 等待消息的默认超时期限,然后再继续。
  • ReceiveAsync (Timespan) - 使用特定超时从设备队列接收消息。
  • ReceiveAsync (CancellationToken) - 使用取消令牌从设备队列接收消息。 使用取消令牌时,不使用默认超时期限。

当使用传输类型 HTTP 1(而不是 MQTT 或 AMQP)时,ReceiveAsync 方法会立即返回。 对于使用 HTTP 1 的云到设备消息,其支持模式是间歇连接到设备,且不常检查消息(至少每 25 分钟检查一次)。 发出更多 HTTP 1 接收会导致 IoT 中心限制请求。 有关 MQTT、AMQP 和 HTTP 1 支持之间的差异的详细信息,请参阅云到设备通信指南选择通信协议

CompleteAsync 方法

设备收到消息后,设备应用程序调用 CompleteAsync 方法以通知 IoT 中心该消息已成功处理,并且可以安全地从 IoT 中心设备队列中删除该消息。 无论设备使用何种传输协议,其都应在处理成功完成后调用此方法。

放弃、拒绝消息或消息超时

使用 AMQP 和 HTTP 版本 1 协议(但不是 MQTT 协议),设备还可以:

  • 通过调用 AbandonAsync 放弃消息。 这会使 IoT 中心将消息保留在设备队列中供将来使用。
  • 通过调用 RejectAsync 拒绝消息。 这会永久删除设备队列中的消息。

如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。

若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息

轮询循环

使用轮询,应用程序使用代码循环反复调用 ReceiveAsync 方法来检查新消息,直到停止为止。

如果使用带有超时值或默认超时的 ReceiveAsync,则在循环中对 ReceiveAsync 的每次调用都会等待指定的超时期限。 如果 ReceiveAsync 超时,则返回 null 值并继续循环。

收到消息时,ReceiveAsync 将返回一个 Task 对象,该对象应传递给 CompleteAsync。 调用 CompleteAsync 会根据 Task 参数通知 IoT 中心从消息队列中删除指定的消息。

在此示例中,循环会在收到消息或轮询循环停止之前调用 ReceiveAsync

static bool stopPolling = false;

while (!stopPolling)
{
   // Check for a message. Wait for the default DeviceClient timeout period.
   using Message receivedMessage = await _deviceClient.ReceiveAsync();

   // Continue if no message was received
   if (receivedMessage == null)
   {
      continue;
   }
   else  // A message was received
   {
      // Print the message received
      Console.WriteLine($"{DateTime.Now}> Polling using ReceiveAsync() - received message with Id={receivedMessage.MessageId}");
      PrintMessage(receivedMessage);

      // Notify IoT Hub that the message was received. IoT Hub will delete the message from the message queue.
      await _deviceClient.CompleteAsync(receivedMessage);
      Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");
   }

   // Check to see if polling loop should end
   stopPolling = ShouldPollingstop ();
}

回调

若要在设备应用程序中接收回调云到设备消息,应用程序必须连接到 IoT 中心并设置回调侦听器来处理传入消息。 系统从 IoT 中心消息队列接收设备的传入消息。

通过回调,设备应用程序可使用 SetReceiveMessageHandlerAsync 设置消息处理程序方法。 调用消息处理程序,然后接收消息。 创建回调方法来接收消息,无需不断轮询接收的消息。

回调仅适用于以下协议:

  • Mqtt
  • Mqtt_WebSocket_Only
  • Mqtt_Tcp_Only
  • Amqp
  • Amqp_WebSocket_Only
  • Amqp_Tcp_only

Http1 协议选项不支持回调,因为 SDK 方法无论如何都需要轮询接收到的消息,这违反了回调原则。

在此示例中,SetReceiveMessageHandlerAsync 设置了一个名为 OnC2dMessageReceivedAsync 的回调处理程序方法,每次收到消息时都会调用该方法。

// Subscribe to receive C2D messages through a callback (which isn't supported over HTTP).
await deviceClient.SetReceiveMessageHandlerAsync(OnC2dMessageReceivedAsync, deviceClient);
Console.WriteLine($"\n{DateTime.Now}> Subscribed to receive C2D messages over callback.");

接收消息重试策略

可以使用 DeviceClient.SetRetryPolicy 定义设备客户端消息重试策略。

消息重试超时存储在 DeviceClient.OperationTimeoutInMilliseconds 属性中。

SDK 接收消息示例

.NET/C# SDK 包含一个消息接收示例,其中包含本节中描述的接收消息方法。

发送“云到设备”消息

本部分介绍使用适用于 .NET 的 Azure IoT SDK 中的 ServiceClient 类将消息从解决方案后端应用程序发送到 IoT 设备的基本代码。 如前所述,解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。

解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。

声明 ServiceClient 对象

ServiceClient 包括通过 IoT 中心从应用程序向设备发送消息所需的方法和属性。

static ServiceClient serviceClient;

提供连接字符串

使用 CreateFromConnectionString 方法向 ServiceClient 提供 IoT 中心主连接字符串。 除了所需的 IoT 中心主连接字符串外,还可以重载 CreateFromConnectionString 方法以包括以下可选参数:

  • transportType - AmqpAmqp_WebSocket_Only
  • transportSettings - 服务客户端的 AMQP 和 HTTP 代理设置。
  • ServiceClientOptions - 允许在初始化期间配置服务客户端实例的选项。 有关详细信息,请参阅 ServiceClientOptions

此示例使用 IoT 中心连接字符串创建 ServiceClient 对象。

static string connectionString = "{your iot hub connection string}";
serviceClient = ServiceClient.CreateFromConnectionString(connectionString);

发送异步云到设备消息

在使用 sendAsync 的情况下,通过云(IoT 中心)将异步消息从应用程序发送到设备。 系统使用 AMQP 协议执行调用。

sendAsync 使用以下参数:

  • deviceID - 目标设备的字符串标识符。
  • message - 云到设备消息。 该消息属于 Message 类型,可以进行相应的格式化。
  • timeout - 可选的超时值。 如果未指定,则默认值为 1 分钟。

此示例向目标设备发送一条具有 10 秒超时值的测试消息。

string targetDevice = "Device-1";
static readonly TimeSpan operationTimeout = TimeSpan.FromSeconds(10);
var commandMessage = new
Message(Encoding.ASCII.GetBytes("Cloud to device message."));
await serviceClient.SendAsync(targetDevice, commandMessage, operationTimeout);

接收送达反馈

发送程序可以向 IoT 中心请求每条云到设备消息的送达(或过期)确认。 此选项使发送程序能够使用通知、重试或补偿逻辑。 若要了解消息反馈操作和属性的完整描述,请参阅消息反馈

若要接收邮件传递反馈,请执行以下操作:

  • 创建 feedbackReceiver 对象
  • 使用 Ack 参数发送消息
  • 等待接收反馈

创建 feedbackReceiver 对象

调用 GetFeedbackReceiver 以创建 FeedbackReceiver 对象。 FeedbackReceiver 包含服务可以用来执行反馈接收操作的方法。

var feedbackReceiver = serviceClient.GetFeedbackReceiver();

使用 Ack 参数发送消息

每条消息都必须包含一个发送确认 Ack 属性的值,以便接收发送反馈。 Ack 属性可以是下列值之一:

  • none(默认):不生成反馈消息。

  • Positive:如果消息已完成,则会收到反馈消息。

  • Negative:如果消息过期(或达到发送次数上限)且设备尚未完成,则接收反馈消息。

  • Full:对 PositiveNegative 结果的反馈。

在此示例中,已将 Ack 属性设置为 Full,请求对一条消息的正面或负面的消息发送反馈。

var commandMessage = new
Message(Encoding.ASCII.GetBytes("Cloud to device message."));
commandMessage.Ack = DeliveryAcknowledgement.Full;
await serviceClient.SendAsync(targetDevice, commandMessage);

等待接收反馈

定义 CancellationToken。 然后在循环中重复调用 ReceiveAsync,检查发送反馈消息。 每次调用 ReceiveAsync 都会等待为 ServiceClient 对象定义的超时期限。

  • 如果 ReceiveAsync 超时已到期且未收到任何消息,则 ReceiveAsync 会返回 null 并且循环继续。
  • 如果收到反馈消息,则 ReceiveAsync 将返回一个 Task 对象,该对象应与取消令牌一起传递给 CompleteAsync。 对 CompleteAsync 的调用会根据 Task 参数从消息队列中删除指定的已发送消息。
  • 如有需要,接收代码可以调用 AbandonAsync 将发送消息放回队列。
var feedbackReceiver = serviceClient.GetFeedbackReceiver();
// Define the cancellation token.
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken token = source.Token;
// Call ReceiveAsync, passing the token. Wait for the timout period.
var feedbackBatch = await feedbackReceiver.ReceiveAsync(token);
if (feedbackBatch == null) continue;

此示例显示了包含这些步骤的方法。

private async static void ReceiveFeedbackAsync()
{
      var feedbackReceiver = serviceClient.GetFeedbackReceiver();

      Console.WriteLine("\nReceiving c2d feedback from service");
      while (true)
      {
         // Check for messages, wait for the timeout period.
         var feedbackBatch = await feedbackReceiver.ReceiveAsync();
         // Continue the loop if null is received after a timeout.
         if (feedbackBatch == null) continue;

         Console.ForegroundColor = ConsoleColor.Yellow;
         Console.WriteLine("Received feedback: {0}",
            string.Join(", ", feedbackBatch.Records.Select(f => f.StatusCode)));
         Console.ResetColor();

         await feedbackReceiver.CompleteAsync(feedbackBatch);
      }
   }

请注意,此反馈接收模式与设备应用程序中接收云到设备消息所使用的模式类似。

服务客户端重新连接

遇到异常时,服务客户端会将该信息传递给调用应用程序。 此时,建议检查异常详细信息并采取必要的操作。

例如:

  • 如果是网络异常,可以重试该操作。
  • 如果是安全异常(未经授权的异常),请检查凭据并确保它们是最新的。
  • 如果是超出限制/配额异常,请监视和/或修改发送请求的频率,或更新中心实例缩放单元。 有关详细信息,请参阅 IoT 中心配额和限制

发送消息重试策略

可以使用 ServiceClient.SetRetryPolicy 定义 ServiceClient 消息重试策略。

SDK 发送消息示例

.NET/C# SDK 包含一个服务客户端示例,其中包含本节中描述的发送消息方法。

接收云到设备的消息

本部分介绍如何使用适用于 Java 的 Azure IoT SDK 中的 DeviceClient 类接收云到设备消息。

对于基于 Java 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调侦听器和消息处理程序来处理来自 IoT 中心的传入消息。 如果设备到 IoT 中心的消息连接断开,设备应用程序还应能够检测和处理断开连接。

导入 Azure IoT Java SDK 库

本文引用的代码使用了这些 SDK 库。

import com.microsoft.azure.sdk.iot.device.*;
import com.microsoft.azure.sdk.iot.device.exceptions.IotHubClientException;
import com.microsoft.azure.sdk.iot.device.transport.IotHubConnectionStatus;

声明 DeviceClient 对象

DeviceClient 对象实例化需要以下参数:

  • connString - IoT 设备连接字符串。 连接字符串是一组以“;”分隔的键值对,其中键和值以“=”分隔。 它应包含这些键的值:HostName, DeviceId, and SharedAccessKey
  • 传输协议 - DeviceClient 连接可以使用以下 IoTHubClientProtocol 传输协议之一。 AMQP 功能最丰富,允许频繁检查消息,并允许拒绝和取消消息。 MQTT 不支持拒绝或放弃消息的方法:
    • AMQPS
    • AMQPS_WS
    • HTTPS
    • MQTT
    • MQTT_WS

例如:

static string connectionString = "{a device connection string}";
static protocol = IotHubClientProtocol.AMQPS;
DeviceClient client = new DeviceClient(connectionString, protocol);

设置消息回调方法

使用 setMessageCallback 方法定义一个消息处理程序方法,通过使用该方法,可在从 IoT 中心收到消息时收到通知。

setMessageCallback 包括这些参数:

  • callback - 回调方法名称。 可以为 null
  • context - 类型为 object可选上下文。 如果未指定,则使用 null

在此示例中,名为 MessageCallback 且没有上下文参数的 callback 方法被传递给 setMessageCallback

client.setMessageCallback(new MessageCallback(), null);

创建消息回调处理程序

回调消息处理程序接收并处理从 IoT 中心消息队列传递的传入消息。

在此示例中,消息处理程序处理传入消息,然后返回 IotHubMessageResult.COMPLETEIotHubMessageResult.COMPLETE 返回值会通知 IoT 中心该消息已成功处理,并且可以安全地从设备队列中删除该消息。 当处理成功完成时,设备应该返回 IotHubMessageResult.COMPLETE,通知 IoT 中心应该从消息队列中删除该消息,无论使用何种协议均应如此。

  protected static class MessageCallback implements com.microsoft.azure.sdk.iot.device.MessageCallback
  {
      public IotHubMessageResult onCloudToDeviceMessageReceived(Message msg, Object context)
      {
          System.out.println(
                  "Received message with content: " + new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET));
          // Notify IoT Hub that the message
          return IotHubMessageResult.COMPLETE;
      }
  }

消息放弃和拒绝选项

尽管设备应该能够成功接收大量传入消息并触发 IotHubMessageResult.COMPLETE,但可能需要放弃或拒绝某条消息。

  • 通过使用 AMQP 和 HTTPS(而不是 MQTT),应用程序可以:
    • IotHubMessageResult.ABANDON 消息。 IoT 中心会重新将消息排队,并在以后再次发送。
    • IotHubMessageResult.REJECT 消息。 IoT 中心不会重新将消息排队,并永久删除消息队列中的消息。
  • 使用 MQTTMQTT_WS 的客户端无法 ABANDONREJECT 消息。

如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。

若要获取有关云到设备消息生命周期以及 IoT 中心如何处理云到设备消息的详细信息,请参阅从 IoT 中心发送云到设备的消息

注意

如果使用 HTTPS(而不使用 MQTT 或 AMQP)作为传输,则 DeviceClient 实例不会频繁检查 IoT 中心发来的消息(频率最低为每 25 分钟一次)。 有关 MQTT、AMQP 和 HTTPS 支持之间的差异的详细信息,请参阅云到设备通信指南选择通信协议

创建消息状态回调方法

应用程序可以使用 registerConnectionStatusChangeCallback 注册一个当设备连接状态改变时执行的回调方法。 这样,应用程序可以检测到中断的消息连接并尝试重新连接。

在此示例中,我们将 IotHubConnectionStatusChangeCallbackLogger 注册为连接状态改变回调方法。

client.registerConnectionStatusChangeCallback(new IotHubConnectionStatusChangeCallbackLogger(), new Object());

系统会触发回调并传递 ConnectionStatusChangeContext 对象。

调用 connectionStatusChangeContext.getNewStatus() 以获取当前连接状态。

IotHubConnectionStatus status = connectionStatusChangeContext.getNewStatus();

返回的连接状态可以是以下值之一:

  • IotHubConnectionStatus.DISCONNECTED
  • IotHubConnectionStatus.DISCONNECTED_RETRYING
  • IotHubConnectionStatus.CONNECTED

调用 connectionStatusChangeContext.getNewStatusReason() 以获取连接状态更改的原因。

IotHubConnectionStatusChangeReason statusChangeReason = connectionStatusChangeContext.getNewStatusReason();

调用 connectionStatusChangeContext.getCause() 以查找连接状态更改的原因。 如果没有可用信息,则 getCause() 可能会返回 null

Throwable throwable = connectionStatusChangeContext.getCause();
if (throwable != null)
    throwable.printStackTrace();

有关介绍如何使用状态改变回调方法提取连接状态更改、设备状态更改的原因以及上下文的完整示例,请参阅本文 SDK 接收消息示例部分中列出的 HandleMessages 示例。

打开设备和 IoT 中心之间的连接

使用 open 在设备和 IoT 中心之间创建连接。 设备现在可以以异步方式向 IoT 中心发送消息或接收来自 IoT 中心的消息。 如果客户端已打开,该方法将不执行任何操作。

client.open(true);

SDK 接收消息示例

HandleMessages:适用于 Java 的 Azure IoT SDK 随附的示例设备应用,可连接到 IoT 中心并接收云到设备的消息。

发送“云到设备”消息

本部分介绍如何使用适用于 Java 的 Azure IoT SDK 中的 ServiceClient 类发送云到设备消息。 解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。

解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。

添加依赖项语句

添加依赖项以使用应用程序中的 iothub-java-service-client 包与 IoT 中心服务通信:

<dependency>
  <groupId>com.microsoft.azure.sdk.iot</groupId>
  <artifactId>iot-service-client</artifactId>
  <version>1.7.23</version>
</dependency>

添加 import 语句

添加这些 import 语句以使用 Azure IoT Java SDK 和异常处理程序。

import com.microsoft.azure.sdk.iot.service.*;
import java.io.IOException;
import java.net.URISyntaxException;

定义连接协议

使用 IotHubServiceClientProtocol 定义服务客户端与 IoT 中心通信所使用的应用层协议。

IotHubServiceClientProtocol 仅接受 AMQPSAMQPS_WS 枚举。

private static final IotHubServiceClientProtocol protocol =    
    IotHubServiceClientProtocol.AMQPS;

创建 ServiceClient 对象

创建 ServiceClient 对象,提供 Iot 中心连接字符串和协议。

private static final String connectionString = "{yourhubconnectionstring}";
private static final ServiceClient serviceClient (connectionString, protocol);

打开应用程序与 IoT 中心之间的连接

使用 open 方法打开 AMQP 发送方连接。 此方法会创建应用程序与 IoT 中心之间的连接。

serviceClient.open();

打开消息发送反馈的反馈接收器

可以使用 FeedbackReceiver 接收有关发送到 IoT 中心反馈的消息的发送状态。 FeedbackReceiver 是一种专门的接收器,其 Receive 方法会返回 FeedbackBatch 而不是 Message

在此示例中,系统将创建 FeedbackReceiver 对象,并调用 open() 语句来等待反馈。

FeedbackReceiver feedbackReceiver = serviceClient
  .getFeedbackReceiver();
if (feedbackReceiver != null) feedbackReceiver.open();

添加消息属性

可以选择使用 setProperties 添加消息属性。 这些属性包含在发送到设备的消息中,设备应用程序在收到相应消息到后可以提取这些属性。

Map<String, String> propertiesToSend = new HashMap<String, String>();
propertiesToSend.put(messagePropertyKey,messagePropertyKey);
messageToSend.setProperties(propertiesToSend);

创建和发送异步消息

Message 对象会存储要发送的消息。 在此示例中,传送了“云到设备消息”。

使用 setDeliveryAcknowledgement 请求已传送/未传送至 IoT 中心的消息队列确认。 在此示例中,无论是已送达还是未送达,请求的确认均是 Full

使用 SendAsync 将异步消息从客户端发送到设备。 或者,可以使用 Send(非异步)方法,但此函数在内部同步,因此一次只允许执行一个发送操作。 将消息从应用程序发送到 IoT 中心。 IoT 中心将消息放入消息队列中,准备发送给目标设备。

Message messageToSend = new Message("Cloud to device message.");
messageToSend.setDeliveryAcknowledgementFinal(DeliveryAcknowledgement.Full);
serviceClient.sendAsync(deviceId, messageToSend);

接收消息发送反馈

消息从应用程序发出后,应用程序可以调用 receive 方法,无论该方法是否带有超时值。 如果未提供超时值,则使用默认超时。 这会传回一个 FeedbackBatch 对象,该对象包含可以检查的消息发送反馈属性。

此示例会创建 FeedbackBatch 接收方,调用 getEnqueuedTimeUtc,并输出消息排队时间。

FeedbackBatch feedbackBatch = feedbackReceiver.receive(10000);
if (feedbackBatch != null) {
  System.out.println("Message feedback received, feedback time: "
    + feedbackBatch.getEnqueuedTimeUtc().toString());
}

SDK 发送消息示例

安装 Azure IoT SDK 库

在调用任何相关代码之前,在开发计算机上安装 azure-iot-device SDK 库:

pip install azure-iot-device

有两个 Python SDK 类用于向 IoT 设备发送消息和从 IoT 设备接收消息。 我们会在本页的各部分中介绍这些类的消息处理方法。

  • IoTHubDeviceClient 类包括用户创建从设备到 Azure IoT 中心的同步连接的方法以及从 IoT 中心接收消息的方法。

  • IoTHubRegistryManager 类包括用于 IoT 中心注册表管理器操作的 API。 在本文中,此类中的方法演示如何连接到 IoT 中心并向设备发送消息。

接收云到设备的消息

本部分介绍如何使用适用于 Python 的 Azure IoT SDK 中的 IoTHubDeviceClient 类接收云到设备消息。

对于基于 Python 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调消息处理程序来处理来自 IoT 中心的传入消息。

导入 IoTHubDeviceClient 对象

添加一行代码以从 azure.iot.device SDK 导入 IoTHubDeviceClient 函数。

from azure.iot.device import IoTHubDeviceClient

连接设备客户端

实例化 IoTHubDeviceClient,将 IoT 中心连接字符串传递给 create_from_connection_string。 这会创建从设备到 IoT 中心的连接。

或者,可以使用以下方法之一将 IoTHubDeviceClient 连接到设备:

deviceConnectionString = "{your IoT hub connection string}";
client = IoTHubDeviceClient.create_from_connection_string ({deviceConnectionString})

处理重新连接

IoTHubDeviceClient 默认会尝试重新建立断开的连接。 重新连接行为由 IoTHubDeviceClient connection_retryconnection_retry_interval 参数控制。

创建消息处理程序

创建消息处理程序函数以处理到设备的传入消息。

在此示例中,系统会在收到消息时调用 message_handler。 系统会使用循环将消息属性 (.items) 输出到控制台。

def message_handler(message):
    global RECEIVED_MESSAGES
    RECEIVED_MESSAGES += 1
    print("")
    print("Message received:")

    # print data from both system and application (custom) properties
    for property in vars(message).items():
        print ("    {}".format(property))

    print("Total calls received: {}".format(RECEIVED_MESSAGES))

分配消息处理程序

使用 on_message_received 方法将消息处理程序方法分配给 IoTHubDeviceClient 对象。

在此示例中,系统会将名为 message_handler 的消息处理程序方法附加到 IoTHubDeviceClient client 对象。 client 对象等待从 IoT 中心接收云到设备消息。 此代码最多等待 300 秒(5 分钟)以接收消息,或者在你按下键盘键时停止执行。

try:
    # Attach the handler to the client
    client.on_message_received = message_handler

    while True:
        time.sleep(300)
except KeyboardInterrupt:
    print("IoT Hub C2D Messaging device sample stopped")
finally:
    # Graceful exit
    print("Shutting down IoT Hub Client")
    client.shutdown()

SDK 接收消息示例

接收消息 - 接收从 Azure IoT 中心发送到设备的云到设备 (C2D) 消息。

发送“云到设备”消息

本部分介绍如何使用适用于 Python 的 Azure IoT SDK 中的 IoTHubRegistryManager 类发送云到设备消息。 解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。

导入 IoTHubRegistryManager 对象

添加以下 import 语句: IoTHubRegistryManager 包括用于 IoT 中心注册表管理器操作的 API。

from azure.iot.hub import IoTHubRegistryManager

连接 IoT 中心注册表管理器

实例化连接到 IoT 中心的 IoTHubRegistryManager 对象,并将 IoT 中心连接字符串传递给 from_connection_string

IoTHubConnectionString = "{Primary connection string to an IoT hub}"
registry_manager = IoTHubRegistryManager.from_connection_string(IoTHubConnectionString)

生成并发送消息

使用 send_c2d_message 通过云(IoT 中心)向设备发送消息。

send_c2d_message 使用以下参数:

  • deviceID - 目标设备的字符串标识符。
  • message - 云到设备消息。 消息的类型为 str(字符串)。
  • properties - dict 类型属性的可选集合。 属性可以包含应用程序属性和系统属性。 默认值为 {}

此示例会将测试消息发送到目标设备。

# define the device ID
deviceID = "Device-1"

# define the message
message = "{\"c2d test message\"}"

# include optional properties
props={}
props.update(messageId = "message1")
props.update(prop1 = "test property-1")
props.update(prop1 = "test property-2")
prop_text = "Test message"
props.update(testProperty = prop_text)

# send the message through the cloud (IoT Hub) to the device
registry_manager.send_c2d_message(deviceID, message, properties=props)

SDK 发送消息示例

send_message.py - 演示如何发送云到设备消息。

安装 Node.js 消息传送包

运行以下命令,在开发计算机上安装 azure-iot-deviceazure-iothub 包:

npm install azure-iot-device --save
npm install azure-iothub --save

azure-iot-device 包包含与 IoT 设备交互的对象。 本文介绍从 IoT 中心接收消息的 Client 类代码。

azure-iothub 包包含与 IoT 中心交互的对象。 本文介绍了通过 IoT 中心将消息从应用程序发送到设备的 Client 类代码。

在设备应用程序中接收消息

本部分介绍如何使用适用于 Node.js 的 Azure IoT SDK 中的 azure-iot-device 类接收云到设备消息。

对于基于 Node.js 的设备应用程序来说,要接收云到设备消息,它必须连接到 IoT 中心,然后设置回调消息处理程序来处理来自 IoT 中心的传入消息。 如果设备到 IoT 中心的消息连接断开,设备应用程序还应能够检测和处理断开连接。

创建客户端模块

azure-iot-device 包中,使用 Client 类创建一个 ClientClient 类包含设备可用于从 IoT 中心接收消息和向 IoT 中心发送消息的方法。

const Client = require('azure-iot-device').Client;

选择传输协议

Client 对象支持以下属性:

  • Amqp
  • Http - 使用 Http 时,Client 实例不会频繁地检查来自 IoT 中心的消息(至少每 25 分钟一次)。
  • Mqtt
  • MqttWs
  • AmqpWs

有关 MQTT、AMQP 和 HTTPS 支持之间的差异的详细信息,请参阅云到设备通信指南选择通信协议

此示例将 AMQP 协议分配给 Protocol 变量。 系统会将此协议变量传递给本文“添加连接字符串”部分中的 Client.fromConnectionString 方法

const Protocol = require('azure-iot-device-mqtt').Amqp;

消息完成、拒绝和放弃功能

可以根据所选协议使用消息完成、拒绝和放弃方法。

AMQP 和 HTTP

AMQP 和 HTTP 传输可以完成、拒绝或放弃消息:

  • 完成 - 若要完成消息,发送云到设备消息的服务会收到对方已接收消息的通知。 IoT 中心从消息队列中删除消息。 该方法采用 client.complete(message, callback function) 形式。
  • 拒绝 - 若要拒绝消息,发送云到设备消息的服务会收到通知,通知内容为“设备尚未处理该消息”。 IoT 中心会永久删除设备队列中的消息。 该方法采用 client.reject(message, callback function) 形式。
  • 放弃 - 若要放弃消息,IoT 中心会立即尝试重新发送消息。 IoT 中心会保留设备队列中的消息以供将来使用。 该方法采用 client.abandon(message, callback function) 形式。
MQTT

MQTT 不支持消息完成、拒绝或放弃函数。 相反,MQTT 默认接受消息,并从 IoT 中心消息队列中删除该消息。

重新发送尝试

如果发生阻止设备完成、放弃或拒绝消息的情况,IoT 中心会在固定的超时期限过后再次对消息排队以进行传递。 因此,设备应用中的消息处理逻辑必须是幂等的,这样,多次接收相同的消息才会生成相同的结果。

添加 IoT 中心字符串和传输协议

使用以下参数,调用 fromConnectionString 建立设备到 IoT 中心的连接:

  • connStr - 封装 IoT 中心“设备连接”权限的连接字符串。 连接字符串包含主机名、设备 ID 和共享访问密钥,格式如下:“HostName=<iothub_host_name>;DeviceId=<device_id>;SharedAccessKey=<device_key>”
  • transportCtor - 传输协议。
const Protocol = require('azure-iot-device-mqtt').Amqp;
let client = Client.fromConnectionString(deviceConnectionString, Protocol);

创建传入消息处理程序

为每个传入消息调用消息处理程序。

成功接收消息后,如果使用 AMQP 或 HTTP 传输,则调用 client.complete 方法通知 IoT 中心可以从消息队列中删除该消息。

例如,此消息处理程序将消息 ID 和消息正文输出到控制台,然后调用 client.complete 通知 IoT 中心它已处理该消息并且可以安全地将其从设备队列中删除。 如果使用 MQTT 传输,则不需要调用 complete 并且可以省略它。 AMQP 或 HTTPS 传输需要调用 complete

function messageHandler(msg) {
  console.log('Id: ' + msg.messageId + ' Body: ' + msg.data);
  client.complete(msg, printResultFor('completed'));
}

创建连接中断处理程序

连接中断是调用中断处理程序。 中断处理程序可用于实现重新连接代码。

此示例捕获并显示控制台的断开连接错误消息。

function disconnectHandler() {
  clearInterval(sendInterval);
  sendInterval = null;
  client.open().catch((err) => {
    console.error(err.message);
  });
}

添加事件侦听器

可以使用 .on 方法指定这些事件监听器。

  • 连接处理程序
  • 错误处理程序
  • 中断处理程序
  • 消息处理程序

此示例包括前面定义的消息和断开连接处理程序。

client.on('connect', connectHandler);
client.on('error', errorHandler);
client.on('disconnect', disconnectHandler);
client.on('message', messageHandler);

打开到 IoT 中心的连接

使用 open 方法打开 IoT 设备和 IoT 中心之间的连接。 使用 .catch(err) 捕获错误和调用处理程序代码。

例如:

client.open()
.catch((err) => {
  console.error('Could not connect: ' + err.message);
});

SDK 接收消息示例

simple_sample_device - 连接到 IoT 中心并接收云到设备消息的设备应用。

发送“云到设备”消息

本部分介绍如何使用适用于 Node.js 的 Azure IoT SDK 中的 azure-iothub 类发送云到设备消息。 如前所述,解决方案后端应用程序会连接到 IoT 中心,并将消息发送到使用目标设备编码的 IoT 中心。 IoT 中心将传入的消息存储到其消息队列中,然后消息从 IoT 中心消息队列传递到目标设备。

解决方案后端应用程序还可以请求并接收发送到 IoT 中心的消息的传递反馈,该消息旨在通过消息队列传送到设备。

加载客户端和消息模块

使用 azure-iothub 包中的 Client 类声明 Client 对象。

使用 azure-iot-common 包中的 Message 类声明 Message 对象。

'use strict';
var Client = require('azure-iothub').Client;
var Message = require('azure-iot-common').Message;

创建 Client 对象

使用以下参数,通过 fromConnectionString 创建 Client

  • IoT 中心连接字符串
  • 传输类型

在此示例中,我们使用 Amqp 传输类型创建 serviceClient 对象。

var connectionString = '{IoT Hub connection string}';
var serviceClient = Client.fromConnectionString(connectionString,`Amqp`);

打开客户端连接

调用 Client open 方法以打开应用程序与 IoT 中心之间的连接。

无论是否有指定在 open 操作完成时调用的回调函数,均可调用 open

在此示例中,open 方法包括一个可选的 err 打开连接回调函数。 如果出现打开错误,则返回一个错误对象。 如果打开连接成功,则返回 null 回调值。

serviceClient.open(function (err)
if (err)
  console.error('Could not connect: ' + err.message);

创建一条消息

消息对象包括异步云到设备消息。 消息功能在 AMQP、MQTT 和 HTTP 上的工作方式相同。

消息对象支持多个属性,包括这些属性。 请参阅消息属性,获取完整列表。

  • ack - 发送反馈。 在下一部分中介绍。
  • properties - 包含用于存储自定义消息属性的字符串键和值的映射。
  • messageId - 用于关联双向通信。

在消息对象实例化时添加消息正文。 在此示例中,将添加一条 'Cloud to device message.' 消息。

var message = new Message('Cloud to device message.');
message.ack = 'full';
message.messageId = "My Message ID";

发送确认

发送程序可以向 IoT 中心请求每条云到设备消息的送达(或过期)确认。 此选项使发送程序能够使用通知、重试或补偿逻辑。 若要了解消息反馈操作和属性的完整描述,请参阅消息反馈

每条消息都必须包含一个发送确认 ack 属性的值,以便接收发送反馈。 ack 属性可以是下列值之一:

  • none(默认):不生成反馈消息。

  • sent:如果消息已完成,则会收到反馈消息。

  • 如果消息过期(或达到发送次数上限)且设备尚未完成,则会收到反馈消息。

  • full:已发送结果和未发送结果的反馈。

在此示例中,已将 ack 属性设置为 full,请求对一条消息同时提供消息已发送和消息未发送的发送反馈。

message.ack = 'full';

消息反馈接收器回调函数使用 getFeedbackReceiver 链接到 Client

消息反馈接收器接收两个参数:

  • 错误对象(可以为 null)
  • AmqpReceiver 对象 - 客户端收到新的反馈消息时发出事件。

此示例函数接收发送反馈消息并将其输出到控制台。

function receiveFeedback(err, receiver){
  receiver.on('message', function (msg) {
    console.log('Feedback message:')
    console.log(msg.getData().toString('utf-8'));
  });
}

此代码使用 getFeedbackReceiverreceiveFeedback 反馈回调函数链接到服务 Client 对象。

serviceClient.getFeedbackReceiver(receiveFeedback);

定义消息完成结果处理程序

每条消息发送完成后,系统都会调用消息发送完成回调函数。

此示例函数将消息 send 操作结果输出到控制台。 在此示例中,printResultFor 函数将作为参数用于下一节中描述的 send 函数。

function printResultFor(op) {
  return function printResult(err, res) {
    if (err) console.log(op + ' error: ' + err.toString());
    if (res) console.log(op + ' status: ' + res.constructor.name);
  };
}

发送邮件

使用 send 函数通过 IoT 中心向设备应用发送异步云到设备消息。

send 支持以下参数:

  • deviceID - 目标设备的设备 ID。
  • message - 要发送到设备的消息正文。
  • done - 操作完成时要调用的可选函数。 使用两个参数调用 Done:
    • 错误对象(可以为 null)。
    • 传输特定的响应对象,用于日志记录或调试。

此代码调用 send,通过 IoT 中心将云到设备消息发送到设备应用。 上一节中定义的回调函数 printResultFor 会接收发送确认信息。

var targetDevice = '{device ID}';
serviceClient.send(targetDevice, message, printResultFor('send'));

本示例介绍在设备确认收到云到设备消息时,如何将消息发送到设备,并处理反馈消息:

serviceClient.open(function (err) {
  if (err) {
    console.error('Could not connect: ' + err.message);
  } else {
    console.log('Service client connected');
    serviceClient.getFeedbackReceiver(receiveFeedback);
    var message = new Message('Cloud to device message.');
    message.ack = 'full';
    message.messageId = "My Message ID";
    console.log('Sending message: ' + message.getData());
    serviceClient.send(targetDevice, message, printResultFor('send'));
  }
});

SDK 发送消息示例

send_c2d_message.js - 通过 IoT 中心向设备发送 C2D 消息。

消息保留时间、重试次数和发送次数上限

从 IoT 中心发送云到设备消息中所述,可以使用门户 IoT 中心配置选项或 Azure CLI 查看和配置以下消息值的默认值。 这些配置选项可能会影响消息发送和反馈。

  • 默认 TTL(生存时间)- 消息在被 IoT 中心判定为到期之前可用于设备的时长。
  • 反馈保留时间 - 对于用于云到设备消息的过期或成功送达的反馈,IoT 中心的保存时长。
  • IoT 中心尝试将云到设备消息发送到设备的次数。