使用 Azure 流分析处理实时 IoT 数据流

本文介绍如何创建流处理逻辑,以从物联网 (IoT) 设备收集数据。 使用真实的物联网 (IoT) 用例来演示如何经济实惠地快速生成解决方案。

必备条件

方案

Contoso 是一家工业自动化领域的公司,该公司已将其制造过程自动化。 这家工厂中的设备配有可实时发送数据流的传感器。 在此方案中,产品车间经理希望通过传感器数据获得实时见解,从而找到规律并采取措施。 可以对传感器数据使用流分析查询语言 (SAQL),查找传入数据流的有趣规律。

在本示例中,数据是 Texas Instruments SensorTag 设备生成的。 数据的有效负载采用 JSON 格式,如以下示例片段所示:

{
    "time": "2016-01-26T20:47:53.0000000",  
    "dspl": "sensorE",  
    "temp": 123,  
    "hmdt": 34  
}  

在实际情况下,其中可能有数百个传感器以流的形式生成事件。 理想情况下,网关设备会运行代码,将这些事件推送到 Azure 事件中心Azure IoT 中心。 流分析作业将从事件中心或 IoT 中心引入这些事件,并针对流运行实时分析查询。 然后,可以将结果发送到支持的输出之一。

为了方便使用,本入门指南提供从实际 SensorTag 设备中捕获的示例数据文件。 可以对示例数据运行查询并查看结果。 在后续教程中,你将学习如何将作业连接到输入和输出并将其部署到 Azure 服务。

创建流分析作业

  1. 导航到 Azure 门户

  2. 在左侧导航菜单中选择“所有服务”,选择“分析”,将鼠标悬停在“流分析作业”上,然后选择“创建”。

    Screenshot that shows the selection of Create button for a Stream Analytics job.

  3. 在“新建流分析作业”页中,执行以下步骤:

    1. 对于“订阅”,请选择你的 Azure 订阅。

    2. 对于“资源组”,请选择现有的资源组,或创建一个资源组。

    3. 对于“名称”,请输入流分析作业的唯一名称。

    4. 选择要将流分析作业部署到的区域。 为资源组和所有资源使用同一位置,以提高处理速度并降低成本。

    5. 选择“查看 + 创建”。

      Screenshot that shows the New Stream Analytics job page.

  4. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  5. 部署成功后,选择“转到资源”以导航到你的流分析作业的“流分析作业”页。

创建 Azure 流分析查询

创建作业后,编写查询。 可以使用示例数据测试查询,无需将输入或输出连接到作业。

  1. 从 GitHub 下载 HelloWorldASA-InputStream.json

  2. 在 Azure 门户中的“Azure 流分析作业”页上,从左侧菜单中选择“作业拓扑”下的“查询”。

  3. 选择“上传示例输入”,选择下载的 HelloWorldASA-InputStream.json 文件,然后选择“确定”。

    Screenshot that shows the **Query** page with **Upload sample input** selected.

  4. 请注意,系统将在“输入预览”表中自动填充数据预览 。

    Screenshot that shows sample input data in the Input preview tab.

查询:存档原始数据

查询的最简单形式是传递查询,该查询会将所有输入数据存档到其指定的输出。 此查询是在新的 Azure 流分析作业中填充的默认查询。

  1. 在“查询”窗口中输入以下查询,然后选择工具栏上的“测试查询”。

    SELECT
        *
    INTO
        youroutputalias
    FROM
        yourinputalias
    
  2. 在底部窗格中的“测试结果”选项卡中查看结果。

    Screenshot that shows the sample query and its results.

查询:根据条件筛选数据

让我们更新查询以根据条件筛选结果。 例如,以下查询显示来自 sensorA 的事件。

  1. 使用以下示例更新查询:

    SELECT 
        time,
        dspl AS SensorName,
        temp AS Temperature,
        hmdt AS Humidity
    INTO
       youroutputalias
    FROM
        yourinputalias
    WHERE dspl='sensorA'
    
  2. 选择“测试查询”以查看查询结果。

    Screenshot that shows the query results with the filter.

查询:触发业务工作流的警报

让我们创建更详细的查询。 对于每个类型的传感器,我们想要每 30 秒监视一次平均温度,且仅在平均温度高于 100 度的情况下显示结果。

  1. 更新查询以:

    SELECT 
        System.Timestamp AS OutputTime,
        dspl AS SensorName,
        Avg(temp) AS AvgTemperature
    INTO
       youroutputalias
    FROM
        yourinputalias TIMESTAMP BY time
    GROUP BY TumblingWindow(second,30),dspl
    HAVING Avg(temp)>100
    
  2. 选择“测试查询”以查看查询结果。

    Screenshot that shows the query with a tumbling window.

    会看到结果只有 245 行,以及平均温度高于 100 度的传感器的名称。 此查询按 dspl(传感器名称)以 30 秒的轮转窗口对事件流进行分组。 临时查询必须声明你希望的时间进展方式。 通过使用 TIMESTAMP BY 子句,你已指定 OUTPUTTIME 列用于将时间与所有临时计算关联 。 有关详细信息,请阅读有关 Time Management(时间管理)和 Windowing functions(窗口化函数)的文章。

查询:检测事件缺失

如何编写查询来确定是否缺少输入事件? 让我们找出传感器最后一次发送数据且下 5 秒未发送事件的情况。

  1. 更新查询以:

    SELECT 
        t1.time,
        t1.dspl AS SensorName
    INTO
       youroutputalias
    FROM
        yourinputalias t1 TIMESTAMP BY time
    LEFT OUTER JOIN yourinputalias t2 TIMESTAMP BY time
    ON
        t1.dspl=t2.dspl AND
        DATEDIFF(second,t1,t2) BETWEEN 1 and 5
    WHERE t2.dspl IS NULL
    
  2. 选择“测试查询”以查看查询结果。

    Screenshot that shows the query that detects absence of events.

    此时使用 LEFT OUTER 联接到相同的数据流(自联接)。 对于 INNER 联接,仅当找到匹配项时才返回结果。 对于 LEFT OUTER 联接,如果联接左侧的事件不匹配,则返回右侧所有列中带 NULL 的行。 这种方法对于查找事件缺失很有用。 有关详细信息,请参阅 JOIN

结束语

本文旨在演示如何编写不同的流分析查询语言查询,并在浏览器中查看结果。 但是,本文只是为了帮助你入门。 流分析支持多种输入和输出,甚至可以使用 Azure 机器学习中的函数,因而是用于分析数据流的可靠工具。 有关如何编写查询的详细信息,请阅读有关常用查询模式的文章。