使用 Azure 流分析处理实时 IoT 数据流

本文介绍如何创建流处理逻辑,以从物联网 (IoT) 设备收集数据。 使用真实的物联网 (IoT) 用例来演示如何经济实惠地快速生成解决方案。

必备条件

方案

Contoso 是一家工业自动化领域的公司,该公司已将其制造过程自动化。 这家工厂中的设备配有可实时发送数据流的传感器。 在此方案中,产品车间经理希望通过传感器数据获得实时见解,从而找到规律并采取措施。 可以对传感器数据使用流分析查询语言 (SAQL),查找传入数据流的有趣规律。

在本示例中,数据是 Texas Instruments SensorTag 设备生成的。 数据的有效负载采用 JSON 格式,如以下示例片段所示:

{
    "time": "2016-01-26T20:47:53.0000000",  
    "dspl": "sensorE",  
    "temp": 123,  
    "hmdt": 34  
}  

在实际情况下,其中可能有数百个传感器以流的形式生成事件。 理想情况下,网关设备会运行代码,将这些事件推送到 Azure 事件中心Azure IoT 中心。 流分析作业将从事件中心或 IoT 中心引入这些事件,并针对流运行实时分析查询。 然后,可以将结果发送到支持的输出之一。

为了方便使用,本入门指南提供从实际 SensorTag 设备中捕获的示例数据文件。 可以对示例数据运行查询并查看结果。 在后续教程中,你将学习如何将作业连接到输入和输出并将其部署到 Azure 服务。

创建流分析作业

  1. 导航到 Azure 门户

  2. 在左侧导航菜单中选择“所有服务”,选择“分析”,将鼠标悬停在“流分析作业”上,然后选择“创建”。

    显示选择流分析作业对应的“创建”按钮的屏幕截图。

  3. 在“新建流分析作业”页中,执行以下步骤:

    1. 对于“订阅”,请选择你的 Azure 订阅。

    2. 对于“资源组”,请选择现有的资源组,或创建一个资源组。

    3. 对于“名称”,请输入流分析作业的唯一名称。

    4. 选择要将流分析作业部署到的区域。 为资源组和所有资源使用同一位置,以提高处理速度并降低成本。

    5. 选择“查看 + 创建”。

      显示“新建流分析作业”页的屏幕截图。

  4. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  5. 部署成功后,选择“转到资源”以导航到你的流分析作业的“流分析作业”页。

创建 Azure 流分析查询

创建作业后,编写查询。 可以使用示例数据测试查询,无需将输入或输出连接到作业。

  1. 从 GitHub 下载 HelloWorldASA-InputStream.json

  2. 在 Azure 门户中的“Azure 流分析作业”页上,从左侧菜单中选择“作业拓扑”下的“查询”。

  3. 选择“上传示例输入”,选择下载的 HelloWorldASA-InputStream.json 文件,然后选择“确定”。

    显示“查询”页的屏幕截图,其中选择了“上传示例输入”。

  4. 请注意,系统将在“输入预览”表中自动填充数据预览 。

    显示“输入预览”选项卡中的示例输入数据的屏幕截图。

查询:存档原始数据

查询的最简单形式是传递查询,该查询会将所有输入数据存档到其指定的输出。 此查询是在新的 Azure 流分析作业中填充的默认查询。

  1. 在“查询”窗口中输入以下查询,然后选择工具栏上的“测试查询”。

    SELECT
        *
    INTO
        youroutputalias
    FROM
        yourinputalias
    
  2. 在底部窗格中的“测试结果”选项卡中查看结果。

    显示示例查询及其结果的屏幕截图。

查询:根据条件筛选数据

让我们更新查询以根据条件筛选结果。 例如,以下查询显示来自 sensorA 的事件。

  1. 使用以下示例更新查询:

    SELECT 
        time,
        dspl AS SensorName,
        temp AS Temperature,
        hmdt AS Humidity
    INTO
       youroutputalias
    FROM
        yourinputalias
    WHERE dspl='sensorA'
    
  2. 选择“测试查询”以查看查询结果。

    显示带筛选器的查询结果的屏幕截图。

查询:触发业务工作流的警报

让我们创建更详细的查询。 对于每个类型的传感器,我们想要每 30 秒监视一次平均温度,且仅在平均温度高于 100 度的情况下显示结果。

  1. 更新查询以:

    SELECT 
        System.Timestamp AS OutputTime,
        dspl AS SensorName,
        Avg(temp) AS AvgTemperature
    INTO
       youroutputalias
    FROM
        yourinputalias TIMESTAMP BY time
    GROUP BY TumblingWindow(second,30),dspl
    HAVING Avg(temp)>100
    
  2. 选择“测试查询”以查看查询结果。

    显示带滚动窗口的查询的屏幕截图。

    会看到结果只有 245 行,以及平均温度高于 100 度的传感器的名称。 此查询按 dspl(传感器名称)以 30 秒的轮转窗口对事件流进行分组。 临时查询必须声明你希望的时间进展方式。 通过使用 TIMESTAMP BY 子句,你已指定 OUTPUTTIME 列用于将时间与所有临时计算关联 。 有关详细信息,请阅读有关 Time Management(时间管理)和 Windowing functions(窗口化函数)的文章。

查询:检测事件缺失

如何编写查询来确定是否缺少输入事件? 让我们找出传感器最后一次发送数据且下 5 秒未发送事件的情况。

  1. 更新查询以:

    SELECT 
        t1.time,
        t1.dspl AS SensorName
    INTO
       youroutputalias
    FROM
        yourinputalias t1 TIMESTAMP BY time
    LEFT OUTER JOIN yourinputalias t2 TIMESTAMP BY time
    ON
        t1.dspl=t2.dspl AND
        DATEDIFF(second,t1,t2) BETWEEN 1 and 5
    WHERE t2.dspl IS NULL
    
  2. 选择“测试查询”以查看查询结果。

    显示检测事件缺失的查询的屏幕截图。

    此时使用 LEFT OUTER 联接到相同的数据流(自联接)。 对于 INNER 联接,仅当找到匹配项时才返回结果。 对于 LEFT OUTER 联接,如果联接左侧的事件不匹配,则返回右侧所有列中带 NULL 的行。 这种方法对于查找事件缺失很有用。 有关详细信息,请参阅 JOIN

结束语

本文旨在演示如何编写不同的流分析查询语言查询,并在浏览器中查看结果。 但是,本文只是为了帮助你入门。 流分析支持多种输入和输出,甚至可以使用 Azure 机器学习中的函数,因而是用于分析数据流的可靠工具。 有关如何编写查询的详细信息,请阅读有关常用查询模式的文章。