Compartir a través de

使用 Azure Data Lake Analytics 查询 Avro 数据

本文介绍如何查询 Avro 数据,以有效地将消息从 Azure IoT 中心路由到 Azure 服务。 通过消息路由 ,可以使用基于消息属性、消息正文、设备孪生标记和设备孪生属性的丰富查询来筛选数据。 若要详细了解消息路由中的查询功能,请参阅 IoT 中心消息路由查询语法

挑战在于,当 Azure IoT 中心将消息路由到 Azure Blob 存储时,IoT 中心默认以 Avro 格式写入内容,该内容具有消息正文属性和消息属性。 Avro 格式不用于任何其他终结点。 尽管 Avro 格式非常适合数据和消息保存,但使用 Avro 格式查询数据是一项挑战。 相比之下,JSON 或 CSV 格式更易于查询数据。 IoT 中心现在支持将数据写入 JSON 和 AVRO 中的 Blob 存储。

有关详细信息,请参阅 Azure 存储作为路由终结点

为了应对非关系型大数据需求和格式并克服这一挑战,可以使用许多大数据模式来转换和缩放数据。 其中一种模式是“按查询付费”,即 Azure Data Lake Analytics,本文重点介绍。 尽管可以轻松地在 Hadoop 或其他解决方案中执行查询,但 Data Lake Analytics 通常更适合这种“按查询付费”方法。

U-SQL 中有一个用于 Avro 的“提取器”。 有关详细信息,请参阅 U-SQL Avro 示例

查询 Avro 数据并将其导出到 CSV 文件

在本部分中,将查询 Avro 数据并将其导出到 Azure Blob 存储中的 CSV 文件,尽管可以轻松地将数据放置在其他存储库或数据存储中。

  1. 设置 Azure IoT 中心以使用消息正文中的属性将数据路由到 Azure Blob 存储终结点以选择消息。

    显示 Azure 门户中 IoT 中心的消息路由工作窗格中的“自定义终结点”选项卡的屏幕截图,其中突出显示了“Blob 存储”部分。

    显示 Azure 门户中 IoT 中心的“消息路由工作”窗格中的“路由”选项卡的屏幕截图,其中突出显示了路由的路由查询和终结点。

    有关设置路由和自定义终结点的详细信息,请参阅 使用 Azure 门户创建和删除路由和终结点

  2. 确保设备具有属性或消息正文中的编码、内容类型和所需数据,如产品文档中所引用。 在设备资源管理器中查看这些属性时,如此处所示,可以验证它们是否已正确设置。

    设备资源管理器中“数据”选项卡的屏幕截图,其中突出显示了消息的内容类型和内容编码属性。

  3. 设置 Azure Data Lake Store 实例和 Data Lake Analytics 实例。 Azure IoT 中心不会路由到 Data Lake Store 实例,但 Data Lake Analytics 实例需要一个。

    Azure 门户中资源组的工作窗格的屏幕截图,其中突出显示了 Data Lake Analytics 实例和 Data Lake Store 实例。

  4. 在 Data Lake Analytics 中,将 Azure Blob 存储配置为一个附加存储,这是 Azure IoT 中心用于路由数据的同一 Blob 存储。

    {Data Lake Analytics 中“数据源”窗格的屏幕截图,其中突出显示了 Azure 存储实例作为其他数据源。

  5. U-SQL Avro 示例中所述,需要四个 DLL 文件。 将这些文件上传到 Data Lake Store 实例中的某个位置。

    Data Lake Store 中“数据资源管理器”窗格的屏幕截图,其中突出显示了四个上传的 DLL 文件。

  6. 在 Visual Studio 中,创建 U-SQL 项目。

    Visual Studio 中“新建项目”对话框的屏幕截图,其中突出显示了 U-SQL 项目模板。

  7. 将以下脚本的内容粘贴到新创建的文件中。 修改三个突出显示的部分:Data Lake Analytics 帐户、关联的 DLL 文件路径和存储帐户的正确路径。

    Visual Studio 工作窗格中 U-SQL 脚本的 U-SQL 编辑器的屏幕截图,其中突出显示了要修改的三个部分。

    用于将简单输出到 CSV 文件的实际 U-SQL 脚本:

        DROP ASSEMBLY IF EXISTS [Avro];
        CREATE ASSEMBLY [Avro] FROM @"/Assemblies/Avro/Avro.dll";
        DROP ASSEMBLY IF EXISTS [Microsoft.Analytics.Samples.Formats];
        CREATE ASSEMBLY [Microsoft.Analytics.Samples.Formats] FROM @"/Assemblies/Avro/Microsoft.Analytics.Samples.Formats.dll";
        DROP ASSEMBLY IF EXISTS [Newtonsoft.Json];
        CREATE ASSEMBLY [Newtonsoft.Json] FROM @"/Assemblies/Avro/Newtonsoft.Json.dll";
        DROP ASSEMBLY IF EXISTS [log4net];
        CREATE ASSEMBLY [log4net] FROM @"/Assemblies/Avro/log4net.dll";
    
        REFERENCE ASSEMBLY [Newtonsoft.Json];
        REFERENCE ASSEMBLY [log4net];
        REFERENCE ASSEMBLY [Avro];
        REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];
    
        // Blob container storage account filenames, with any path
        DECLARE @input_file string = @"wasb://hottubrawdata@kevinsayazstorage/kevinsayIoT/{*}/{*}/{*}/{*}/{*}/{*}";
        DECLARE @output_file string = @"/output/output.csv";
    
        @rs =
        EXTRACT
        EnqueuedTimeUtc string,
        Body byte[]
        FROM @input_file
    
        USING new Microsoft.Analytics.Samples.Formats.ApacheAvro.AvroExtractor(@"
        {
            ""type"":""record"",
            ""name"":""Message"",
            ""namespace"":""Microsoft.Azure.Devices"",
            ""fields"":
           [{
                ""name"":""EnqueuedTimeUtc"",
                ""type"":""string""
            },
            {
                ""name"":""Properties"",
                ""type"":
                {
                    ""type"":""map"",
                    ""values"":""string""
                }
            },
            {
                ""name"":""SystemProperties"",
                ""type"":
                {
                    ""type"":""map"",
                    ""values"":""string""
                }
            },
            {
                ""name"":""Body"",
                ""type"":[""null"",""bytes""]
            }]
        }"
        );
    
        @cnt =
        SELECT EnqueuedTimeUtc AS time, Encoding.UTF8.GetString(Body) AS jsonmessage
        FROM @rs;
    
        OUTPUT @cnt TO @output_file USING Outputters.Text(); 
    

    Data Lake Analytics 运行以下脚本需要 5 分钟,该脚本限制为 10 个分析单元并处理了 177 个文件。 结果显示在下图中显示的 CSV 文件输出中:

    显示作业图的 Visual Studio 工作窗格中提交的 U-SQL 脚本的“作业视图”选项卡的屏幕截图。

    在 Visual Studio 工作窗格中提交的 U-SQL 脚本的“文件预览”选项卡的屏幕截图,展示了输出被转换为逗号分隔值 (.csv) 文件中的行。

    若要分析 JSON,请继续执行步骤 8。

  8. 大多数 IoT 消息采用 JSON 文件格式。 通过添加以下行,可以将消息分析为 JSON 文件,以便添加 WHERE 子句并仅输出所需的数据。

       @jsonify =
         SELECT Microsoft.Analytics.Samples.Formats.Json.JsonFunctions.JsonTuple(Encoding.UTF8.GetString(Body))
           AS message FROM @rs;
    
        /*
        @cnt =
            SELECT EnqueuedTimeUtc AS time, Encoding.UTF8.GetString(Body) AS jsonmessage
            FROM @rs;
    
        OUTPUT @cnt TO @output_file USING Outputters.Text();
        */
    
        @cnt =
            SELECT message["message"] AS iotmessage,
                message["event"] AS msgevent,
                message["object"] AS msgobject,
                message["status"] AS msgstatus,
                message["host"] AS msghost
            FROM @jsonify;
    
        OUTPUT @cnt TO @output_file USING Outputters.Text();
    

    输出显示命令中每个项的 SELECT 列。

    这是提交的 U-SQL 脚本的工作窗格中“文件预览”选项卡的截图,展示了一个 JSON 文件被转换为逗号分隔值 (.csv) 文件后的查询输出。

后续步骤

本教程介绍了如何查询 Avro 数据,以有效地将消息从 Azure IoT 中心路由到 Azure 服务。