使用 AMQP 协议来与 IoT 中心通信Communicate with your IoT hub by using the AMQP Protocol

Azure IoT 中心支持使用 OASIS 高级消息队列协议 (AMQP) 版本 1.0 通过面向设备和面向服务的终结点来交付各种功能。Azure IoT Hub supports OASIS Advanced Message Queuing Protocol (AMQP) version 1.0 to deliver a variety of functionalities through device-facing and service-facing endpoints. 本文档介绍如何使用 AMQP 客户端连接到 IoT 中心,以使用 IoT 中心功能。This document describes the use of AMQP clients to connect to an IoT hub to use IoT Hub functionality.

服务客户端Service client

建立连接并在 IoT 中心(服务客户端)进行身份验证Connect and authenticate to an IoT hub (service client)

若要使用 AMQP 连接到 IoT 中心,客户端可以使用基于声明的安全性 (CBS)简单身份验证和安全层 (SASL) 身份验证To connect to an IoT hub by using AMQP, a client can use the claims-based security (CBS) or Simple Authentication and Security Layer (SASL) authentication.

服务客户端需要以下信息:The following information is required for the service client:

信息Information ValueValue
IoT 中心主机名IoT hub hostname <iot-hub-name>.azure-devices.cn
密钥名称Key name service
访问密钥Access key 与服务关联的主要密钥或辅助密钥A primary or secondary key that's associated with the service
共享访问签名Shared access signature 采用以下格式的短生存期共享访问签名:SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}A short-lived shared access signature in the following format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. 若要获取用于生成此签名的代码,请参阅控制对 IoT 中心的访问To get the code for generating this signature, see Control access to IoT Hub.

以下代码片段使用 Python 中的 uAMQP 库通过发送方链接连接到 IoT 中心。The following code snippet uses the uAMQP library in Python to connect to an IoT hub via a sender link.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here: 
# https://docs.azure.cn/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.cn'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

调用云到设备的消息(服务客户端)Invoke cloud-to-device messages (service client)

若要了解服务与 IoT 中心之间,以及设备与 IoT 中心之间的云到设备消息交换,请参阅从 IoT 中心发送云到设备的消息To learn about the cloud-to-device message exchange between the service and the IoT hub and between the device and the IoT hub, see Send cloud-to-device messages from your IoT hub. 服务客户端使用两个链接发送消息,以及接收先前从设备发送的消息的反馈,如下表中所述:The service client uses two links to send messages and receive feedback for previously sent messages from devices, as described in the following table:

创建者Created by 链接类型Link type 链接路径Link path 说明Description
服务Service 发送方链接Sender link /messages/devicebound 服务会将发往设备的云到设备消息发送到此链接。Cloud-to-device messages that are destined for devices are sent to this link by the service. 通过此链接发送的消息的 To 属性设置为目标设备的接收方链接路径 /devices/<deviceID>/messages/deviceboundMessages sent over this link have their To property set to the target device's receiver link path, /devices/<deviceID>/messages/devicebound.
服务Service 接收方链接Receiver link /messages/serviceBound/feedback 服务在此链接上收到的、来自设备的完成、拒绝和丢弃反馈消息。Completion, rejection, and abandonment feedback messages that come from devices received on this link by service. 有关反馈消息的详细信息,请参阅从 IoT 中心发送云到设备的消息For more information about feedback messages, see Send cloud-to-device messages from an IoT hub.

以下代码片段演示如何使用 Python 中的 uAMQP 库创建一条云到设备的消息并将其发送到设备。The following code snippet demonstrates how to create a cloud-to-device message and send it to a device by using the uAMQP library in Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = { 'iothub-ack': ack }
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props, application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

若要接收反馈,服务客户端需创建一个接收方链接。To receive feedback, the service client creates a receiver link. 以下代码片段演示如何使用 Python 中的 uAMQP 库创建链接。The following code snippet demonstrates how to create a link by using the uAMQP library in Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
  print('received a message')
  # Check content_type in message property to identify feedback messages coming from device
  if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
    msg_body_raw = msg.get_data()
    msg_body_str = ''.join(msg_body_raw)
    msg_body = json.loads(msg_body_str)
    print(json.dumps(msg_body, indent=2))
    print('******************')
    for feedback in msg_body:
      print('feedback received')
      print('\tstatusCode: ' + str(feedback['statusCode']))
      print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
      print('\tdeviceId: ' + str(feedback['deviceId']))
      print
  else:
    print('unknown message:', msg.properties.content_type)

如以上代码中所示,云到设备的反馈消息的内容类型为 application/vnd.microsoft.iothub.feedback.jsonAs shown in the preceding code, a cloud-to-device feedback message has a content type of application/vnd.microsoft.iothub.feedback.json. 可以使用消息的 JSON 正文中的属性推断原始消息的传送状态:You can use the properties in the message's JSON body to infer the delivery status of the original message:

  • 反馈正文中的键 statusCode 包含以下值之一:SuccessExpiredDeliveryCountExceededRejectedPurgedKey statusCode in the feedback body has one of the following values: Success, Expired, DeliveryCountExceeded, Rejected, or Purged.

  • 反馈正文中的键 deviceId 包含目标设备的 ID。Key deviceId in the feedback body has the ID of the target device.

  • 反馈正文中的键 originalMessageId 包含服务发送的原始云到设备消息的 ID。Key originalMessageId in the feedback body has the ID of the original cloud-to-device message that was sent by the service. 可以使用此 ID 将反馈关联到云到设备的消息。You can use this delivery status to correlate feedback to cloud-to-device messages.

接收遥测消息(服务客户端)Receive telemetry messages (service client)

默认情况下,IoT 中心将引入的设备遥测消息存储在内置的事件中心内。By default, your IoT hub stores ingested device telemetry messages in a built-in event hub. 服务客户端可以使用 AMQP 协议接收存储的事件。Your service client can use the AMQP Protocol to receive the stored events.

为此,服务客户端首先需要连接到 IoT 中心终结点,并接收内置事件中心的重定向地址。For this purpose, the service client first needs to connect to the IoT hub endpoint and receive a redirection address to the built-in event hubs. 然后,服务客户端使用提供的地址连接到内置的事件中心。The service client then uses the provided address to connect to the built-in event hub.

在每个步骤中,客户端都需要提供以下信息片段:In each step, the client needs to present the following pieces of information:

  • 有效的服务凭据(服务共享访问签名令牌)。Valid service credentials (service shared access signature token).

  • 采用正确格式的使用者组分区路径,客户端将从中检索消息。A well-formatted path to the consumer group partition that it intends to retrieve messages from. 对于给定的使用者组和分区 ID,该路径采用以下格式:/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(默认的使用者组为 $Default)。For a given consumer group and partition ID, the path has the following format: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (the default consumer group is $Default).

  • 一个用于在分区中指定起点的可选筛选谓词。An optional filtering predicate to designate a starting point in the partition. 此谓词可以采用序列号、偏移量或排队时间戳的形式。This predicate can be in the form of a sequence number, offset, or enqueued timestamp.

以下代码片段使用 Python 中的 uAMQP 库来演示上述步骤:The following code snippet uses the uAMQP library in Python to demonstrate the preceding steps:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://docs.azure.cn/zh-cn/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.cn'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
  source_uri = uamqp.address.Source(uri)
  source_uri.set_filter(endpoint_filter)
  return source_uri

receive_client = uamqp.ReceiveClient(set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
  batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
  # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
  receive_client.close()

  sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(redirect.address, policy_name, access_key)
  receive_client = uamqp.ReceiveClient(set_endpoint_filter(redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
  print('*** received a message ***')
  print(''.join(msg.get_data()))
  print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
  print('\t: ' + str(msg.annotations['x-opt-offset']))
  print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

对于给定的设备 ID,IoT 中心使用该设备 ID 的哈希来确定要将其消息存储到哪些分区。For a given device ID, the IoT hub uses a hash of the device ID to determine which partition to store its messages in. 以上代码片段演示如何从单个此类分区接收事件。The preceding code snippet demonstrates how events are received from a single such partition. 但请注意,典型的应用程序通常需要检索存储在所有事件中心分区中的事件。However, note that a typical application often needs to retrieve events that are stored in all event hub partitions.

设备客户端Device client

建立连接并在 IoT 中心(设备客户端)进行身份验证Connect and authenticate to an IoT hub (device client)

若要使用 AMQP 连接到 IoT 中心,设备可以使用基于声明的安全性 (CBS)简单身份验证和安全层 (SASL) 身份验证To connect to an IoT hub by using AMQP, a device can use claims based security (CBS) or Simple Authentication and Security Layer (SASL) authentication.

设备客户端需要以下信息:The following information is required for the device client:

信息Information ValueValue
IoT 中心主机名IoT hub hostname <iot-hub-name>.azure-devices.cn
访问密钥Access key 与设备关联的主要密钥或辅助密钥A primary or secondary key that's associated with the device
共享访问签名Shared access signature 采用以下格式的短生存期共享访问签名:SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}A short-lived shared access signature in the following format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. 若要获取用于生成此签名的代码,请参阅控制对 IoT 中心的访问To get the code for generating this signature, see Control access to IoT Hub.

以下代码片段使用 Python 中的 uAMQP 库通过发送方链接连接到 IoT 中心。The following code snippet uses the uAMQP library in Python to connect to an IoT hub via a sender link.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here: 
# https://docs.azure.cn/zh-cn/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.cn'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(hostname=hostname, device_id=device_id), access_key, None)

operation = '<operation-link-name>' # e.g., '/devices/{device_id}/messages/devicebound'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

在设备操作中支持使用以下链接路径:The following link paths are supported as device operations:

创建者Created by 链接类型Link type 链接路径Link path 说明Description
设备Devices 接收方链接Receiver link /devices/<deviceID>/messages/devicebound 每个目标设备将在此链接上接收发往设备的云到设备消息。Cloud-to-device messages that are destined for devices are received on this link by each destination device.
设备Devices 发送方链接Sender link /devices/<deviceID>/messages/events 设备发送的设备到云消息将通过此链接发送。Device-to-cloud messages that are sent from a device are sent over this link.
设备Devices 发送方链接Sender link /messages/serviceBound/feedback 设备通过此链接将云到设备的消息反馈发送到服务。Cloud-to-device message feedback sent to the service over this link by devices.

接收云到设备的命令(设备客户端)Receive cloud-to-device commands (device client)

发送到设备的云到设备命令抵达 /devices/<deviceID>/messages/devicebound 链接。Cloud-to-device commands that are sent to devices arrive on a /devices/<deviceID>/messages/devicebound link. 设备可以分批接收这些消息,并根据需要在消息中使用消息数据有效负载、消息属性、批注或应用程序属性。Devices can receive these messages in batches, and use the message data payload, message properties, annotations, or application properties in the message as needed.

以下代码片段使用 Python 中的 uAMQP 库接收设备发送的云到设备消息。The following code snippet uses the uAMQP library in Python) to receive cloud-to-device messages by a device.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
  batch = receive_client.receive_message_batch(max_batch_size=5)
  for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))

    # Property 'to' is set to: '/devices/device1/messages/devicebound',
    print('\tto:                     ' + str(msg.properties.to))

    # Property 'message_id' is set to value provided by the service
    print('\tmessage_id:             ' + str(msg.properties.message_id))

    # Other properties are present if they were provided by the service
    print('\tcreation_time:          ' + str(msg.properties.creation_time))
    print('\tcorrelation_id:         ' + str(msg.properties.correlation_id))
    print('\tcontent_type:           ' + str(msg.properties.content_type))
    print('\treply_to_group_id:      ' + str(msg.properties.reply_to_group_id))
    print('\tsubject:                ' + str(msg.properties.subject))
    print('\tuser_id:                ' + str(msg.properties.user_id))
    print('\tgroup_sequence:         ' + str(msg.properties.group_sequence))
    print('\tcontent_encoding:       ' + str(msg.properties.content_encoding))
    print('\treply_to:               ' + str(msg.properties.reply_to))
    print('\tabsolute_expiry_time:   ' + str(msg.properties.absolute_expiry_time))
    print('\tgroup_id:               ' + str(msg.properties.group_id))

    # Message sequence number in the built-in Event hub
    print('\tx-opt-sequence-number:  ' + str(msg.annotations['x-opt-sequence-number']))

发送遥测消息(设备客户端)Send telemetry messages (device client)

也可以使用 AMQP 从设备发送遥测消息。You can also send telemetry messages from a device by using AMQP. 设备可以选择性地提供应用程序属性的字典或各种消息属性,例如消息 ID。The device can optionally provide a dictionary of application properties, or various message properties, such as message ID.

以下代码片段使用 Python 中的 uAMQP 库从设备发送设备到云的消息。The following code snippet uses the uAMQP library in Python to send device-to-cloud messages from a device.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = None
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

附加说明Additional notes

  • AMQP 连接可能由于网络波动或身份验证令牌(在代码中生成)过期而中断。The AMQP connections might be disrupted because of a network glitch or the expiration of the authentication token (generated in the code). 服务客户端必须处理这种情况,并根据需要重新建立连接和链接。The service client must handle these circumstances and reestablish the connection and links, if needed. 如果身份验证令牌会过期,客户端还可以在令牌过期之前主动续订令牌,以避免连接断开。If an authentication token expires, the client can avoid a connection drop by proactively renewing the token prior to its expiration.

  • 有时,客户端必须能够正确处理链接重定向。Your client must occasionally be able to handle link redirections correctly. 若要了解此类操作,请参阅 AMQP 客户端文档。To understand such an operation, see your AMQP client documentation.

后续步骤Next steps

若要详细了解 AMQP 协议,请参阅 AMQP v1.0 规范To learn more about the AMQP Protocol, see the AMQP v1.0 specification.

若要详细了解 IoT 中心消息传递,请参阅:To learn more about IoT Hub messaging, see: