使用 Azure Cosmos DB 将来自 Azure IoT 中心的设备连接事件排序Order device connection events from Azure IoT Hub using Azure Cosmos DB

借助 Azure 事件网格,可以基于应用程序生成事件,并轻松地在业务解决方案中集成 IoT 事件。Azure Event Grid helps you build event-based applications and easily integrate IoT events in your business solutions. 本文逐步讲解一种可用于跟踪最新设备连接状态并将其存储在 Cosmos DB 中的设置。This article walks you through a setup which can be used to track and store the latest device connection state in Cosmos DB. 我们将使用“设备已连接”和“设备已断开连接”事件中提供的序号,并在 Cosmos DB 中存储最新状态。We will use the sequence number available in the Device Connected and Device Disconnected events and store the latest state in Cosmos DB. 此外,我们将使用一个存储过程,它是针对 Cosmos DB 中的集合执行的应用程序逻辑。We are going to use a stored procedure, which is an application logic that is executed against a collection in Cosmos DB.

序号是十六进制数的字符串表示形式。The sequence number is a string representation of a hexadecimal number. 可以使用字符串比较来确定更大的编号。You can use string compare to identify the larger number. 如果将字符串转换为十六进制,则编号是一个 256 位数编号。If you are converting the string to hex, then the number will be a 256-bit number. 序号严格递增,最新事件的编号大于其他事件。The sequence number is strictly increasing, and the latest event will have a higher number than other events. 如果经常出现设备连接和断开连接事件,并且你想要确保只使用最新事件来触发下游操作(因为 Azure 事件网格不支持事件排序),则此功能非常有用。This is useful if you have frequent device connects and disconnects, and want to ensure only the latest event is used to trigger a downstream action, as Azure Event Grid doesn’t support ordering of events.

先决条件Prerequisites

  • 有效的 Azure 帐户。An active Azure account. 如果没有,可以创建试用帐户If you don't have one, you can create a trial account.

  • 有效的 Azure Cosmos DB SQL API 帐户。An active Azure Cosmos DB SQL API account. 如果尚未创建此帐户,请参阅创建数据库帐户中的演练。If you haven't created one yet, see Create a database account for a walkthrough.

  • 数据库中的集合。A collection in your database. 请参阅添加集合中的演练。See Add a collection for a walkthrough. 创建集合时,请对分区键使用 /idWhen you create your collection, use /id for the partition key.

  • Azure 中的 IoT 中心。An IoT Hub in Azure. 如果尚未创建 Iot 中心,请参阅 IoT 中心入门中的演练。If you haven't created one yet, see Get started with IoT Hub for a walkthrough.

创建存储过程Create a stored procedure

首先,创建一个存储过程,并将其设置为运行一个逻辑来比较传入事件的序号,并在数据库中记录每个设备的最新事件。First, create a stored procedure and set it up to run a logic that compares sequence numbers of incoming events and records the latest event per device in the database.

  1. 在 Cosmos DB SQL API 中,选择“数据资源管理器” > “项” > “新建存储过程”。 In your Cosmos DB SQL API, select Data Explorer > Items > New Stored Procedure.

    创建存储过程

  2. 输入 LatestDeviceConnectionState 作为存储过程 ID,并将以下内容粘贴到“存储过程正文”中。 Enter LatestDeviceConnectionState for the stored procedure ID and paste the following in the Stored Procedure body. 请注意,此代码应替换存储过程正文中的任何现有代码。Note that this code should replace any existing code in the stored procedure body. 此代码为每个设备 ID 保留一行,并通过标识最高序号来记录该设备 ID 的最新连接状态。This code maintains one row per device ID and records the latest connection state of that device ID by identifying the highest sequence number.

    // SAMPLE STORED PROCEDURE
    function UpdateDevice(deviceId, moduleId, hubName, connectionState, connectionStateUpdatedTime, sequenceNumber) {
      var collection = getContext().getCollection();
      var response = {};
    
      var docLink = getDocumentLink(deviceId, moduleId);
    
      var isAccepted = collection.readDocument(docLink, function(err, doc) {
        if (err) {
          console.log('Cannot find device ' + docLink + ' - ');
          createDocument();
        } else {
          console.log('Document Found - ');
          replaceDocument(doc);
        }
      });
    
      function replaceDocument(document) {
        console.log(
          'Old Seq :' +
            document.sequenceNumber +
            ' New Seq: ' +
            sequenceNumber +
            ' - '
        );
        if (sequenceNumber > document.sequenceNumber) {
          document.connectionState = connectionState;
          document.connectionStateUpdatedTime = connectionStateUpdatedTime;
          document.sequenceNumber = sequenceNumber;
    
          console.log('replace doc - ');
    
          isAccepted = collection.replaceDocument(docLink, document, function(
            err,
            updated
          ) {
            if (err) {
              getContext()
                .getResponse()
                .setBody(err);
            } else {
              getContext()
                .getResponse()
                .setBody(updated);
            }
          });
        } else {
          getContext()
            .getResponse()
            .setBody('Old Event - current: ' + document.sequenceNumber + ' Incoming: ' + sequenceNumber);
        }
      }
      function createDocument() {
        document = {
          id: deviceId + '-' + moduleId,
          deviceId: deviceId,
          moduleId: moduleId,
          hubName: hubName,
          connectionState: connectionState,
          connectionStateUpdatedTime: connectionStateUpdatedTime,
          sequenceNumber: sequenceNumber
        };
        console.log('Add new device - ' + collection.getAltLink());
        isAccepted = collection.createDocument(
          collection.getAltLink(),
          document,
          function(err, doc) {
            if (err) {
              getContext()
                .getResponse()
                .setBody(err);
            } else {
              getContext()
                .getResponse()
                .setBody(doc);
            }
          }
        );
      }
    
      function getDocumentLink(deviceId, moduleId) {
        return collection.getAltLink() + '/docs/' + deviceId + '-' + moduleId;
      }
    }
    
  3. 保存存储过程:Save the stored procedure:

    保存存储过程

创建逻辑应用Create a logic app

首先,创建逻辑应用并添加事件网格触发器,以便监视虚拟机的资源组。First, create a logic app and add an Event grid trigger that monitors the resource group for your virtual machine.

创建逻辑应用资源Create a logic app resource

  1. Azure 门户中,依次选择“+ 创建资源”、“集成”、“逻辑应用”。 In the Azure portal, select +Create a resource, select Integration and then Logic App.

    创建逻辑应用

  2. 为逻辑应用指定一个在订阅中唯一的名称,然后选择 IoT 中心所在的同一订阅、资源组和位置。Give your logic app a name that's unique in your subscription, then select the same subscription, resource group, and location as your IoT hub.

    新建逻辑应用

  3. 选择“创建”以创建逻辑应用。 Select Create to create the logic app.

    你现在已为逻辑应用程序创建 Azure 资源。You've now created an Azure resource for your logic app. 在 Azure 部署逻辑应用后,逻辑应用设计器会显示针对常用模式的模板,以便你可以更快地入门。After Azure deploys your logic app, the Logic Apps Designer shows you templates for common patterns so you can get started faster.

    备注

    若要查找并再次打开你的逻辑应用,请选择“资源组”,然后选择要在本操作指南中使用的资源组。To find and open your logic app again, select Resource groups and select the resource group you are using for this how-to. 然后选择你的新逻辑应用。Then select your new logic app. 此时会打开逻辑应用设计器。This opens the Logic App Designer.

  4. 在逻辑应用设计器中向右滚动,直至看到常用的触发器。In the Logic App Designer, scroll to the right until you see common triggers. 在“模板”下,选择“空白逻辑应用”,以便从头开始生成逻辑应用。 Under Templates , choose Blank Logic App so that you can build your logic app from scratch.

选择触发器Select a trigger

触发器是启动逻辑应用的特定事件。A trigger is a specific event that starts your logic app. 在本教程中,触发工作流的触发器通过 HTTP 接收请求。For this tutorial, the trigger that sets off the workflow is receiving a request over HTTP.

  1. 在连接器和触发器搜索栏中,键入 HTTP 并按 Enter。In the connectors and triggers search bar, type HTTP and hit Enter.

  2. 选择“请求 - 当收到 HTTP 请求时”作为触发器。 Select Request - When an HTTP request is received as the trigger.

    选择 HTTP 请求触发器

  3. 选择“使用示例有效负载生成架构”。 Select Use sample payload to generate schema.

    使用示例有效负载生成架构

  4. 在文本框中粘贴以下示例 JSON 代码,然后选择“完成”: Paste the following sample JSON code into the text box, then select Done:

    [{
     "id": "fbfd8ee1-cf78-74c6-dbcf-e1c58638ccbd",
     "topic":
       "/SUBSCRIPTIONS/DEMO5CDD-8DAB-4CF4-9B2F-C22E8A755472/RESOURCEGROUPS/EGTESTRG/PROVIDERS/MICROSOFT.DEVICES/IOTHUBS/MYIOTHUB",
     "subject": "devices/Demo-Device-1",
     "eventType": "Microsoft.Devices.DeviceConnected",
     "eventTime": "2018-07-03T23:20:11.6921933+00:00",
     "data": {
       "deviceConnectionStateEventInfo": {
         "sequenceNumber":
           "000000000000000001D4132452F67CE200000002000000000000000000000001"
       },
       "hubName": "MYIOTHUB",
       "deviceId": "48e44e11-1437-4907-83b1-4a8d7e89859e",
       "moduleId": ""
     },
     "dataVersion": "1",
     "metadataVersion": "1"
    }]
    

    粘贴示例 JSON 有效负载

  5. 可能会收到一条弹出通知,其中指出,“请记住要在请求中包含设为 application/json 的内容类型标头”。 You may receive a pop-up notification that says, Remember to include a Content-Type header set to application/json in your request. 可以放心忽略此建议,并转到下一部分。You can safely ignore this suggestion, and move on to the next section.

创建条件Create a condition

在满足逻辑应用工作流中的特定条件后,条件可帮助运行特定的操作。In your logic app workflow, conditions help run specific actions after passing that specific condition. 一旦满足条件,即可定义所需的操作。Once the condition is met, a desired action can be defined. 在本教程中,条件是检查 eventType 是“设备已连接”还是“设备已断开连接”。For this tutorial, the condition is to check whether eventType is device connected or device disconnected. 操作是在数据库中执行存储过程。The action will be to execute the stored procedure in your database.

  1. 依次选择“+ 新建步骤”、“内置”,然后找到并选择“条件”。 Select + New step then Built-in , then find and select Condition . 单击“选择值”,此时会弹出一个框,其中显示了“动态内容”-- 可选择的字段。Click in Choose a value and a box will pop up showing the Dynamic content -- the fields that can be selected. 如下所示填写字段,以便仅对“设备已连接”和“设备已断开连接”事件执行此操作:Fill in the fields as shown below to only execute this for Device Connected and Device Disconnected events:

    • 选择值: eventType - 从单击此字段时显示的动态内容中的字段内选择此值。Choose a value: eventType -- select this from the fields in the dynamic content that appear when you click on this field.

    • 将“等于”更改为“结尾为”。Change "is equal to" to ends with .

    • 选择值: nectedChoose a value: nected .

      填写条件

  2. 在“如果为 true”对话框中,单击“添加操作”。 In the if true dialog, click on Add an action .

    为 true 时添加操作

  3. 搜索 Cosmos DB 并选择“Azure Cosmos DB - 执行存储过程”Search for Cosmos DB and select Azure Cosmos DB - Execute stored procedure

    搜索 CosmosDB

  4. 在“连接名称”中填写 cosmosdb-connection ,在表中选择该条目,然后选择“创建”。 Fill in cosmosdb-connection for the Connection Name and select the entry in the table, then select Create . 此时会显示“执行存储过程”面板。You see the Execute stored procedure panel. 为以下字段输入值:Enter the values for the fields:

    数据库 ID :ToDoListDatabase ID : ToDoList

    集合 ID :ItemsCollection ID : Items

    存储过程 ID :LatestDeviceConnectionStateSproc ID : LatestDeviceConnectionState

  5. 选择“添加新参数”。Select Add new parameter . 在显示的下拉列表中,选中“分区键”和“存储过程的参数”旁边的框,然后单击屏幕上的其他任何位置;此时将为分区键值添加一个字段,并为存储过程的参数添加一个字段。 In the dropdown that appears, check the boxes next to Partition key and Parameters for the stored procedure , then click anywhere else on the screen; it adds a field for partition key value and a field for parameters for the stored procedure.

    填充逻辑应用操作

  6. 现在,如下所示输入分区键值和参数。Now enter the partition key value and parameters as shown below. 请务必如图中所示添加方括号和双引号。Be sure to put in the brackets and double-quotes as shown. 你可能必须单击“添加动态内容”来获取可在此处使用的有效值。You may have to click Add dynamic content to get the valid values you can use here.

    填充逻辑应用操作

  7. 在显示了“For Each”的窗格顶部,在“选择前面步骤的输出”下,确保“正文”已选中。 At the top of the pane where it says For Each , under Select an output from previous steps , make sure it Body is selected.

    填充逻辑应用 for-each

  8. 保存逻辑应用。Save your logic app.

复制 HTTP URLCopy the HTTP URL

在退出逻辑应用设计器之前,请复制逻辑应用要侦听的触发器 URL。Before you leave the Logic Apps Designer, copy the URL that your logic app is listening to for a trigger. 稍后要使用此 URL 来配置事件网格。You use this URL to configure Event Grid.

  1. 单击“当收到 HTTP 请求时”触发器配置框将其展开。 Expand the When a HTTP request is received trigger configuration box by clicking on it.

  2. 选择“HTTP POST URL”旁边的复制按钮复制其值。 Copy the value of HTTP POST URL by selecting the copy button next to it.

    复制 HTTP POST URL

  3. 保存此 URL,以便在下一部分可以引用它。Save this URL so that you can refer to it in the next section.

为 IoT 中心事件配置订阅Configure subscription for IoT Hub events

本部分将 IoT 中心配置为在发生事件时发布事件。In this section, you configure your IoT Hub to publish events as they occur.

  1. 在 Azure 门户中导航到 IoT 中心。In the Azure portal, navigate to your IoT hub.

  2. 选择“事件” 。Select Events.

    打开事件网格详细信息

  3. 选择“+ 事件订阅”。 Select + Event subscription.

    创建新的事件订阅

  4. 填写“事件订阅详细信息”: 提供一个说明性的名称,然后选择“事件网格架构” 。Fill in Event Subscription Details: Provide a descriptive name and select Event Grid Schema.

  5. 填写“事件类型”字段。 Fill in the Event Types fields. 在下拉列表中,从菜单中仅选择“设备已连接” 和“设备已断开连接” 。In the dropdown list, select only Device Connected and Device Disconnected from the menu. 单击屏幕上的任何其他位置以关闭列表并保存选择。Click anywhere else on the screen to close the list and save your selections.

    设置要查找的事件类型

  6. 对于“终结点详细信息”,请选择“Webhook”作为“终结点类型”,单击“选择终结点”并粘贴从逻辑应用中复制的 URL,然后确认选择。 For Endpoint Details , select Endpoint Type as Web Hook and click on select endpoint and paste the URL that you copied from your logic app and confirm selection.

    选择终结点 URL

  7. 窗体现在应类似于以下示例:The form should now look similar to the following example:

    示例事件订阅窗体

    选择“创建”保存事件订阅。Select Create to save the event subscription.

观察事件Observe events

设置事件订阅后,让我们通过连接设备进行测试。Now that your event subscription is set up, let's test by connecting a device.

在 IoT 中心注册设备Register a device in IoT Hub

  1. 在 IoT 中心选择“IoT 设备”。From your IoT hub, select IoT Devices .

  2. 在窗格顶部选择“+添加”。Select +Add at the top of the pane.

  3. 对于“设备 ID”,请输入 Demo-Device-1For Device ID , enter Demo-Device-1.

  4. 选择“保存”。Select Save .

  5. 可以添加具有不同设备 ID 的多个设备。You can add multiple devices with different device IDs.

    已添加到中心的设备

  6. 再次单击该设备;现在将会填入连接字符串和密钥。Click on the device again; now the connection strings and keys will be filled in. 复制“连接字符串 - 主键”供稍后使用。Copy the Connection string -- primary key for later use.

    设备的 ConnectionString

启动 Raspberry Pi 模拟器Start Raspberry Pi simulator

让我们使用 Raspberry Pi Web 模拟器来模拟设备连接。Let's use the Raspberry Pi web simulator to simulate device connection.

启动 Raspberry Pi 模拟器Start Raspberry Pi simulator

在 Raspberry Pi Web 模拟器上运行示例应用程序Run a sample application on the Raspberry Pi web simulator

这会触发“设备已连接”事件。This will trigger a device connected event.

  1. 在代码区域中,将第 15 行中的占位符替换为在上一部分结束时保存的 Azure IoT 中心设备连接字符串。In the coding area, replace the placeholder in Line 15 with your Azure IoT Hub device connection string that you saved at the end of the previous section.

    粘贴设备连接字符串

  2. 选择“运行”以运行该应用程序。 Run the application by selecting Run.

此时会显示类似于以下输出的信息,其中显示了传感器数据以及发送到 IoT 中心的消息。You see something similar to the following output that shows the sensor data and the messages that are sent to your IoT hub.

运行应用程序

单击“停止”以停止模拟器,并触发“设备已断开连接”事件。 Click Stop to stop the simulator and trigger a Device Disconnected event.

现已运行示例应用程序来收集传感器数据并将其发送到 IoT 中心。You have now run a sample application to collect sensor data and send it to your IoT hub.

在 Cosmos DB 中观察事件Observe events in Cosmos DB

可在 Cosmos DB 文档中查看已执行的存储过程的结果。You can see results of the executed stored procedure in your Cosmos DB document. 具体形式如下。Here's what it looks like. 每行包含每个设备的最新设备连接状态。Each row contains the latest device connection state per device.

操作方法屏幕截图

使用 Azure CLIUse the Azure CLI

如果不使用 Azure 门户,也可以使用 Azure CLI 来完成 IoT 中心相关的步骤。Instead of using the Azure portal, you can accomplish the IoT Hub steps using the Azure CLI. 有关详细信息,请参阅有关使用 Azure CLI 创建事件订阅创建 IoT 设备的网页。For details, see the Azure CLI pages for creating an event subscription and creating an IoT device.

清理资源Clean up resources

本教程使用的资源会在 Azure 订阅中产生费用。This tutorial used resources that incur charges on your Azure subscription. 学习完本教程并测试结果后,请禁用或删除不再想要保留的资源。When you're finished trying out the tutorial and testing your results, disable or delete resources that you don't want to keep.

如果不希望丢弃针对逻辑应用所执行的操作,可以禁用逻辑应用,但不要将其删除。If you don't want to lose the work on your logic app, disable it instead of deleting it.

  1. 导航到逻辑应用。Navigate to your logic app.

  2. 在“概述”边栏选项卡上,选择“删除”或“禁用”。 On the Overview blade, select Delete or Disable.

每个订阅可以包含一个 IoT 中心。Each subscription can have one IoT hub. 如果在本教程中创建了一个中心,则不需要将其删除,以免产生费用。If you created a hub for this tutorial, then you don't need to delete it to prevent charges.

  1. 导航到 IoT 中心。Navigate to your IoT hub.

  2. 在“概览”边栏选项卡上,选择“删除”。 On the Overview blade, select Delete .

即使保留了 IoT 中心中,你也仍可能想要删除创建的事件订阅。Even if you keep your IoT hub, you may want to delete the event subscription that you created.

  1. 在 IoT 中心,选择“事件网格”。 In your IoT hub, select Event Grid.

  2. 选择要删除的事件订阅。Select the event subscription that you want to remove.

  3. 选择“删除” 。Select Delete.

若要在 Azure 门户中删除 Azure Cosmos DB 帐户,请右键单击该帐户名,然后单击“删除帐户”。To remove an Azure Cosmos DB account from the Azure portal, right-click the account name and click Delete account . 参阅有关删除 Azure Cosmos DB 帐户的详细说明。See detailed instructions for deleting an Azure Cosmos DB account.

后续步骤Next steps