教程:使用 Azure 资源管理器模板配置 IoT 中心消息路由

消息路由 允许将遥测数据从 IoT 设备发送到与事件中心兼容的内置终结点或自定义终结点,例如 Blob 存储、服务总线队列、服务总线主题和事件中心。 若要配置自定义消息路由,请创建 路由查询 以自定义与特定条件匹配的路由。 设置后,传入数据会自动由 IoT 中心路由到终结点。 如果消息与任何定义的路由查询不匹配,则会将其路由到默认终结点。

本 2 部分教程介绍如何在 IoT 中心设置和使用这些自定义路由查询。 将消息从 IoT 设备路由到多个终结点之一,包括 Blob 存储和服务总线队列。 发送到服务总线队列的消息由逻辑应用选取,并通过电子邮件发送。 未定义自定义消息路由的消息将发送到默认终结点,然后由 Azure 流分析选取并在 Power BI 可视化效果中查看。

若要完成本教程的第 1 部分和第 2 部分,请执行以下任务:

第一部分:创建资源,设置消息路由

  • 创建资源 - IoT 中心、存储帐户、服务总线队列和模拟设备。 可以使用 Azure 门户、Azure 资源管理器模板、Azure CLI 或 Azure PowerShell 来完成此作。
  • 在 IoT 中心为存储帐户和服务总线队列配置终结点和消息路由。

第 II 部分:将消息发送到中心,查看路由结果

  • 创建在将消息添加到服务总线队列时触发并发送电子邮件的逻辑应用。
  • 下载并运行模拟 IoT 设备向中心发送消息的应用,以获取不同的路由选项。
  • 为发送到默认终结点的数据创建 Power BI 可视化效果。
  • 查看结果...
  • ...在服务总线队列和电子邮件中。
  • ...在存储帐户中。
  • ...在 Power BI 可视化效果中。

先决条件

  • 本教程的第 1 部分:

    • 必须拥有 Azure 订阅。 如果没有 Azure 订阅,请在开始前创建一个试用版订阅
  • 对于本教程的第 2 部分:

    • 必须已完成本教程的第 1 部分,并且仍有可用资源。

    • 安装 Visual Studio

    • 有权访问 Power BI 帐户以分析默认终结点的流分析。 (免费试用 Power BI

    • 拥有用于发送通知电子邮件的工作或学校帐户。

    • 确保已在防火墙中打开端口 8883。 本教程中的示例使用 MQTT 协议,该协议通过端口 8883 进行通信。 某些企业和教育网络环境中可能会阻止此端口。 有关解决此问题的详细信息和方法,请参阅“连接到 IoT 中心”(MQTT)。

创建基本资源

在配置消息路由之前,需要创建 IoT 中心、存储帐户和服务总线队列。 可以使用本教程第 1 部分的四篇文章之一创建这些资源:Azure 门户、Azure 资源管理器模板、Azure CLI 或 Azure PowerShell。

为所有资源使用相同的资源组和位置。 最后,您可以通过删除资源组来一键清除所有资源。

下面是以下部分中要执行的步骤的摘要:

  1. 创建资源组

  2. 在 S1 层中创建 IoT 中心。 将使用者组添加到 IoT 中心。 检索数据时,Azure 流分析将使用使用者组。

    注释

    必须使用付费层中的 Iot 中心来完成本教程。 免费层仅允许设置一个终结点,本教程需要多个终结点。

  3. 创建具有Standard_LRS复制的标准 V1 存储帐户。

  4. 创建服务总线命名空间和队列。

  5. 为向中心发送消息的模拟设备创建设备标识。 保存测试阶段的密钥。 (如果创建资源管理器模板,则部署模板后完成此作。

消息路由

根据模拟设备附加到消息的属性,将消息路由到不同的资源。 未自定义路由的消息将发送到默认终结点(消息/事件)。 在下一教程中,将消息发送到 IoT 中心,并看到消息路由到不同的目标。

价值 结果
level=“storage” 写入 Azure 存储。
level=“critical” 写入服务总线队列。 逻辑应用从队列中检索消息,并使用 Office 365 发送电子邮件。
默认 使用 Power BI 显示此数据。

第一步是设置将数据路由到的终结点。 第二步是设置使用该终结点的消息路由。 设置路由后,可以在门户中查看终结点和消息路由。

下载模板和参数文件

在本教程的第二部分,下载并运行 Visual Studio 应用程序以将消息发送到 IoT 中心。 该下载中有一个文件夹,其中包含 Azure 资源管理器模板和参数文件,以及 Azure CLI 和 PowerShell 脚本。

继续下载 Azure IoT C# 示例 。 解压缩 master.zip 文件。 资源管理模板和参数文件位于 /iot-hub/Tutorials/Routing/SimulatedDevice/resources/ 中,文件名为 template_iothub.jsontemplate_iothub_parameters.json

创建资源

你将使用 Azure 资源管理器 (RM) 模板创建所有资源。 Azure CLI 和 PowerShell 脚本一次可以运行几行。 RM 模板可以一步部署。 本文分别介绍各节,以帮助你了解每个部分。 然后,它将演示如何部署模板,并创建用于测试的虚拟设备。 部署模板后,可以在门户中查看消息路由配置。

有多个资源名称必须全局唯一,例如 IoT 中心名称和存储帐户名称。 为了简化资源的命名,这些资源名称设置为追加从当前日期/时间生成的随机字母数字值。

如果查看模板,你将看到为这些资源设置变量的步骤,这些变量将传入的参数与 randomValue 连接起来。

以下部分介绍使用的参数。

参数

其中大多数参数都具有默认值。 以 _in 结尾的字符串与 randomValue 拼接,以使其在全局范围内唯一。

randomValue:此值是从部署模板的当前日期/时间生成的。 此字段不在参数文件中,因为它在模板本身中生成。

subscriptionId:此字段设置为要在其中部署模板的订阅。 此字段不在参数文件中,因为它已为你设置。

IoTHubName_in:此字段是基本 IoT 中心名称,该名称与 randomValue 连接,使其全局唯一。

位置:此字段是要在其中部署的 Azure 区域,例如“chinaeast”。

consumer_group:此字段是为通过路由终结点传递的消息设置的使用者组。 它用于筛选 Azure 流分析中的结果。 例如,您可以获取所有内容的整个流;或者,如果数据通过设置为 Contoso 的 consumer_group 传送过来,您可以设置一个 Azure 流分析流(和 Power BI 报表)来仅显示这些条目。 本教程的第 2 部分使用此字段。

sku_name:此字段是 IoT 中心的缩放。 此值必须为 S1 或更高版本;免费层不适用于本教程,因为它不允许多个终结点。

sku_units:此字段与 sku_name一起,并且是可以使用的 IoT 中心单元数。

d2c_partitions:此字段是用于事件流的分区数。

storageAccountName_in:此字段是要创建的存储帐户的名称。 消息将路由到存储帐户中的容器。 此字段与 randomValue 连接,使其全局唯一。

storageContainerName:此字段是存储路由到存储帐户的消息的容器的名称。

storage_endpoint:此字段是消息路由使用的存储帐户终结点的名称。

service_bus_namespace_in:此字段是要创建的服务总线命名空间的名称。 此值与 randomValue 连接,使其全局唯一。

service_bus_queue_in:此字段是用于路由消息的服务总线队列的名称。 此值与 randomValue 连接,使其全局唯一。

AuthRules_sb_queue:此字段是服务总线队列的授权规则,用于检索队列的连接字符串。

Variables

这些值在模板中使用,主要派生自参数。

queueAuthorizationRuleResourceId:此字段是用于服务总线队列授权规则的资源 ID。 ResourceId 又用于检索队列的连接字符串。

iotHubName:此字段是连接 randomValue 后 IoT 中心的名称。

storageAccountName:此字段是随机值连接后存储帐户的名称。

service_bus_namespace:此字段是串联了randomValue后的命名空间。

service_bus_queue:此字段是将随机值拼接后的服务总线队列名称。

sbVersion:要使用的服务总线 API 的版本。 在本例中,它是“2017-04-01”。

资源:存储帐户和容器

创建的第一个资源是存储帐户,以及消息路由到的容器。 容器是存储帐户下的资源。 dependsOn它有一个关于存储帐户的条款,要求在容器创建之前先创建存储帐户。

下面是此部分的外观:

{
    "type": "Microsoft.Storage/storageAccounts",
    "name": "[variables('storageAccountName')]",
    "apiVersion": "2018-07-01",
    "location": "[parameters('location')]",
    "sku": {
        "name": "Standard_LRS",
        "tier": "Standard"
    },
    "kind": "Storage",
    "properties": {},
    "resources": [
        {
        "type": "blobServices/containers",
        "apiVersion": "2018-07-01",
        "name": "[concat('default/', parameters('storageContainerName'))]",
        "properties": {
            "publicAccess": "None"
            } ,
        "dependsOn": [
            "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]"
            ]
        }
    ]
}

资源:服务总线命名空间和队列

创建的第二个资源是服务总线命名空间,以及将消息路由到的服务总线队列。 SKU 设置为标准。 从变量中检索 API 版本。 它还设置为在部署本部分(status:Active)时激活服务总线命名空间。

{
    "type": "Microsoft.ServiceBus/namespaces",
    "comments": "The Sku should be 'Standard' for this tutorial.",
    "sku": {
        "name": "Standard",
        "tier": "Standard"
    },
    "name": "[variables('service_bus_namespace')]",
    "apiVersion": "[variables('sbVersion')]",
    "location": "[parameters('location')]",
    "properties": {
        "provisioningState": "Succeeded",
        "metricId": "[concat('a4295411-5eff-4f81-b77e-276ab1ccda12:', variables('service_bus_namespace'))]",
        "serviceBusEndpoint": "[concat('https://', variables('service_bus_namespace'),'.servicebus.windows.net:443/')]",
        "status": "Active"
    },
    "dependsOn": []
}

本部分将创建服务总线队列。 此部分脚本具有一个 dependsOn 子句,可确保在队列之前创建命名空间。

{
    "type": "Microsoft.ServiceBus/namespaces/queues",
    "name": "[concat(variables('service_bus_namespace'), '/', variables('service_bus_queue'))]",
    "apiVersion": "[variables('sbVersion')]",
    "location": "[parameters('location')]",
    "scale": null,
    "properties": {},
    "dependsOn": [
        "[resourceId('Microsoft.ServiceBus/namespaces', variables('service_bus_namespace'))]"
    ]
}

资源:IoT 中心和消息路由

在创建了存储帐户和服务总线队列之后,您可以创建一个将消息路由到它们的 IoT Hub。 RM 模板使用 dependsOn 子句,因此它不会尝试在创建服务总线资源和存储帐户之前创建中心。

下面是 IoT 中心部分的第一部分。 模板的这一部分设置依赖项,并从属性开始。

{
    "apiVersion": "2018-04-01",
    "type": "Microsoft.Devices/IotHubs",
    "name": "[variables('IoTHubName')]",
    "location": "[parameters('location')]",
    "dependsOn": [
        "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]",
        "[resourceId('Microsoft.ServiceBus/namespaces', variables('service_bus_namespace'))]",
        "[resourceId('Microsoft.ServiceBus/namespaces/queues', variables('service_bus_namespace'), variables('service_bus_queue'))]"
    ],
    "properties": {
        "eventHubEndpoints": {}
            "events": {
                "retentionTimeInDays": 1,
                "partitionCount": "[parameters('d2c_partitions')]"
                }
            },

下一部分是 IoT 中心的消息路由配置部分。 首先是终结点的部分。 模板的这一部分为服务总线队列和存储帐户(包括连接字符串)设置路由终结点。

若要为队列创建连接字符串,需要在内联获取 queueAuthorizationRulesResourcedId。 若要为存储帐户创建连接字符串,请检索主存储密钥,然后以连接字符串的格式使用它。

终结点配置也是设定 Blob 格式为 AVROJSON 的位置。

注释

可将数据以 Apache Avro 格式(默认)或 JSON 格式写入 Blob 存储。

编码格式只能在配置 Blob 存储终结点时设置。 无法更改以前配置的终结点的格式。 使用 JSON 编码时,必须在消息系统属性中将 contentType 设置为 JSON,将 contentEncoding 设置为 UTF-8。

有关使用 Blob 存储终结点的更多详细信息,请参阅 Azure 存储作为路由终结点

"routing": {
   "endpoints": {
       "serviceBusQueues": [
       {
           "connectionString": "[Concat('Endpoint=sb://',variables('service_bus_namespace'),'.servicebus.windows.net/;SharedAccessKeyName=',parameters('AuthRules_sb_queue'),';SharedAccessKey=',listkeys(variables('queueAuthorizationRuleResourceId'),variables('sbVersion')).primaryKey,';EntityPath=',variables('service_bus_queue'))]",
           "name": "[parameters('service_bus_queue_endpoint')]",
           "subscriptionId": "[parameters('subscriptionId')]", 
           "resourceGroup": "[resourceGroup().Name]"
       }
       ],
       "serviceBusTopics": [],
       "eventHubs": [],
       "storageContainers": [
           {
               "connectionString": 
               "[Concat('DefaultEndpointsProtocol=https;AccountName=',variables('storageAccountName'),';AccountKey=',listKeys(resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName')), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value)]",
               "containerName": "[parameters('storageContainerName')]",
               "fileNameFormat": "{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}",
               "batchFrequencyInSeconds": 100,
               "maxChunkSizeInBytes": 104857600,
               "encoding": "avro",
               "name": "[parameters('storage_endpoint')]",
               "subscriptionId": "[parameters('subscriptionId')]",
               "resourceGroup": "[resourceGroup().Name]"
           }
       ]
   },

下一部分用于消息路由到终结点。 为每个终结点设置了一个,因此服务总线队列设置了一个,存储帐户容器也设置了一个。

请记住,要路由到存储的消息的查询条件是 level="storage",要路由到服务总线队列的消息的查询条件是 level="critical"

"routes": [
    {
        "name": "contosoStorageRoute",
        "source": "DeviceMessages",
        "condition": "level=\"storage\"",
        "endpointNames": [
            "[parameters('storage_endpoint')]"
            ],
        "isEnabled": true
    },
    {
        "name": "contosoSBQueueRoute",
        "source": "DeviceMessages",
        "condition": "level=\"critical\"",
        "endpointNames": [
            "[parameters('service_bus_queue_endpoint')]"
            ],
        "isEnabled": true
    }
],

此 json 显示 IoT 中心部分的其余部分,其中包含中心的默认信息和 SKU。

            "fallbackRoute": {
                "name": "$fallback",
                "source": "DeviceMessages",
                "condition": "true",
                "endpointNames": [
                    "events"
                ],
                "isEnabled": true
            }
        },
        "storageEndpoints": {
            "$default": {
                "sasTtlAsIso8601": "PT1H",
                "connectionString": "",
                "containerName": ""
            }
        },
        "messagingEndpoints": {
            "fileNotifications": {
                "lockDurationAsIso8601": "PT1M",
                "ttlAsIso8601": "PT1H",
                "maxDeliveryCount": 10
            }
        },
        "enableFileUploadNotifications": false,
        "cloudToDevice": {
            "maxDeliveryCount": 10,
            "defaultTtlAsIso8601": "PT1H",
            "feedback": {
                "lockDurationAsIso8601": "PT1M",
                "ttlAsIso8601": "PT1H",
                "maxDeliveryCount": 10
            }
        }
    },
    "sku": {
        "name": "[parameters('sku_name')]",
        "capacity": "[parameters('sku_units')]"
    }
}

资源:服务总线队列授权规则

服务总线队列授权规则用于检索服务总线队列的连接字符串。 它使用 dependsOn 子句来确保它不会在服务总线命名空间和服务总线队列之前被创建。

{
    "type": "Microsoft.ServiceBus/namespaces/queues/authorizationRules",
    "name": "[concat(variables('service_bus_namespace'), '/', variables('service_bus_queue'), '/', parameters('AuthRules_sb_queue'))]",
    "apiVersion": "[variables('sbVersion')]",
    "location": "[parameters('location')]",
    "scale": null,
    "properties": {
        "rights": [
            "Send"
        ]
    },
    "dependsOn": [
        "[resourceId('Microsoft.ServiceBus/namespaces', variables('service_bus_namespace'))]",
        "[resourceId('Microsoft.ServiceBus/namespaces/queues', variables('service_bus_namespace'), variables('service_bus_queue'))]"
    ]
},

资源:消费者组

在本部分,你将在本教程的第二部分为 Azure 流分析使用的 IoT 中心数据创建使用者组。

{
    "type": "Microsoft.Devices/IotHubs/eventHubEndpoints/ConsumerGroups",
    "name": "[concat(variables('iotHubName'), '/events/',parameters('consumer_group'))]",
    "apiVersion": "2018-04-01",
    "dependsOn": [
        "[concat('Microsoft.Devices/IotHubs/', variables('iotHubName'))]"
    ]
}

资源:输出

如果要将值发送回要显示的部署脚本,请使用输出部分。 模板的此部分返回服务总线队列的连接字符串。 返回值不是必需的,它作为如何将结果返回到调用脚本的示例包含在内。

"outputs": {
    "sbq_connectionString": {
      "type": "string",
      "value": "[Concat('Endpoint=sb://',variables('service_bus_namespace'),'.servicebus.windows.net/;SharedAccessKeyName=',parameters('AuthRules_sb_queue'),';SharedAccessKey=',listkeys(variables('queueAuthorizationRuleResourceId'),variables('sbVersion')).primaryKey,';EntityPath=',variables('service_bus_queue'))]"
    }
  }

部署 RM 模板

若要将模板部署到 Azure,请将模板和参数文件上传到 Azure Cloud Shell,然后执行脚本来部署模板。 打开 Azure Cloud Shell 并登录。 此示例使用 PowerShell。

若要上传文件,请在菜单栏中选择 “上传/下载文件 ”图标,然后选择“上传”。

突出显示“上传/下载文件”的 Cloud Shell 菜单栏

使用弹出的文件资源管理器查找本地磁盘上的文件,然后选择“ 打开”。

上传文件后,结果对话框会显示如下图所示的内容。

突出显示“上传/下载文件”的 Cloud Shell 菜单栏

这些文件被上传到由您的 Cloud Shell 实例使用的共享。

运行脚本以执行部署。 此脚本的最后一行检索用于返回服务总线队列连接字符串的变量。

脚本设置和使用以下变量:

$RGName 是要向其部署模板的资源组名称。 部署模板之前会创建此字段。

$location 是用于模板的 Azure 位置,例如“chinaeast”。

deploymentname 是分配给部署的名称,用于检索返回的变量值。

下面是 PowerShell 脚本。 复制此 PowerShell 脚本并将其粘贴到 Cloud Shell 窗口中,然后按 Enter 运行它。

$RGName="ContosoResources"
$location = "chinaeast"
$deploymentname="contoso-routing"

# Remove the resource group if it already exists. 
#Remove-AzResourceGroup -name $RGName 
# Create the resource group.
New-AzResourceGroup -name $RGName -Location $location 

# Set a path to the parameter file. 
$parameterFile = "$HOME/template_iothub_parameters.json"
$templateFile = "$HOME/template_iothub.json"

# Deploy the template.
New-AzResourceGroupDeployment `
    -Name $deploymentname `
    -ResourceGroupName $RGName `
    -TemplateParameterFile $parameterFile `
    -TemplateFile $templateFile `
    -verbose

# Get the returning value of the connection string.
(Get-AzResourceGroupDeployment -ResourceGroupName $RGName -Name $deploymentname).Outputs.sbq_connectionString.value

如果有脚本错误,则可以在本地编辑脚本,再次将其上传到 Cloud Shell,然后再次运行该脚本。 脚本成功完成运行后,继续执行下一步。

创建模拟设备

接下来,创建设备标识并保存其密钥供以后使用。 模拟应用程序使用此设备标识将消息发送到 IoT 中心。 此功能在 PowerShell 中或使用 Azure 资源管理器模板时不可用。 以下步骤介绍如何使用 Azure 门户创建模拟设备。

  1. 打开 Azure 门户 并登录到 Azure 帐户。

  2. 选择 “资源组 ”,然后选择你的资源组。 本教程使用 ContosoResources

  3. 在资源列表中,选择 IoT 中心。 本教程使用 ContosoTestHub。 从“中心”窗格中选择 “IoT 设备 ”。

  4. “IoT 设备”窗格中选择“+添加设备”。 在“添加设备”窗格中,填写设备 ID。 本教程使用 Contoso-Test-Device。 将密钥留空,并选中 “自动生成密钥”。 确保已启用 连接设备到 IoT 中心。 选择“保存”

    “添加设备”屏幕

  5. 创建后,请选择设备以查看生成的密钥。 选择主键上的“复制”图标,并将其保存到某个位置,例如记事本,以用于本教程的测试阶段。

    设备详细信息,包括密钥

在门户中查看消息路由

设置终结点和消息路由后,可以在门户中查看其配置。 登录到 Azure 门户 并转到 资源组。 接下来,选择资源组,然后选择中心(在本教程中,中心名称以 ContosoTestHub 开头)。 可以看到“IoT 中心”窗格。

IoT 中心属性屏幕

在 IoT 中心的选项中,选择 消息路由。 将显示已成功设置的路由。

你设置的路由

“消息路由 ”屏幕上,选择 “自定义终结点 ”以查看为路由定义的终结点。

为路由设置的终结点

后续步骤

设置所有资源并配置消息路由后,请转到下一教程,了解如何处理和显示有关路由消息的信息。