Ingest blobs into Azure Data Explorer by subscribing to Event Grid notifications

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer offers ingestion (data loading) from Event Hubs, IoT Hubs, and blobs written to blob containers.

In this article, you learn how to ingest blobs from your storage account into Azure Data Explorer using an Event Grid data connection. You'll create an Event Grid data connection that sets up an Azure Event Grid subscription. The Event Grid subscription routes events from your storage account to Azure Data Explorer via an Azure Event Hub. Then you'll see an example of the data flow throughout the system.

For general information about ingesting into Azure Data Explorer from Event Grid, see Connect to Event Grid. To create resources manually in the Azure portal, see Manually create resources for Event Grid ingestion.

Prerequisites

Create a target table in Azure Data Explorer

Create a table in Azure Data Explorer where Event Hubs will send data. Create the table in the cluster and database prepared in the prerequisites.

  1. In the Azure portal, under your cluster, select Query.

  2. Copy the following command into the window and select Run to create the table (TestTable) that will receive the ingested data.

    .create table TestTable (TimeStamp: datetime, Value: string, Source:string)
    


  3. Copy the following command into the window and select Run to map the incoming JSON data to the column names and data types of the table (TestTable).

    .create table TestTable ingestion json mapping 'TestMapping' '[{"column":"TimeStamp","path":"$.TimeStamp"},{"column":"Value","path":"$.Value"},{"column":"Source","path":"$.Source"}]'
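
    To see how the mapping lines up with the data, here's a minimal sketch (the record is the same sample used later in this article) of a JSON object whose $.TimeStamp, $.Value, and $.Source paths match TestMapping:

    ```shell
    #!/bin/sh
    # A sample record in the shape TestMapping expects: the mapping extracts
    # $.TimeStamp, $.Value, and $.Source from each ingested JSON object.
    record='{"TimeStamp": "1987-11-16 12:00","Value": "Hello World","Source": "TestSource"}'
    printf '%s\n' "$record"
    ```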
    

Create an Event Grid data connection in Azure Data Explorer

Now connect the storage account to Azure Data Explorer, so that data flowing into the storage is streamed to the test table.

  1. Under the cluster you created, select Databases > TestDatabase.

  2. Select Data ingestion > Add data connection.

Data connection - Basics tab

  1. Select the connection type: Blob storage.

  2. Fill out the form with the following information:

    Setting | Suggested value | Field description
    Data connection name | test-grid-connection | The name of the connection that you want to create in Azure Data Explorer.
    Storage account subscription | Your subscription ID | The subscription ID where your storage account is.
    Storage account | gridteststorage1 | The name of the storage account that you created previously.
    Event type | Blob created or Blob renamed | The type of event that triggers ingestion. Blob renamed is supported only for ADLSv2 storage. Supported types are: Microsoft.Storage.BlobCreated or Microsoft.Storage.BlobRenamed.
    Resources creation | Automatic | Define whether you want Azure Data Explorer to create an Event Grid subscription, an Event Hub namespace, and an Event Hub for you. To create resources manually, see Manually create resources for Event Grid ingestion.
  3. Select Filter settings if you want to track specific subjects. Set the filters for the notifications as follows:

    • Prefix field is the literal prefix of the subject. As the pattern applied is startswith, it can span multiple containers, folders, or blobs. No wildcards are allowed.
      • To define a filter on the blob container, the field must be set as follows: /blobServices/default/containers/[container prefix].
      • To define a filter on a blob prefix (or a folder in Azure Data Lake Gen2), the field must be set as follows: /blobServices/default/containers/[container name]/blobs/[folder/blob prefix].
    • Suffix field is the literal suffix of the blob. No wildcards are allowed.
    • Case-Sensitive field indicates whether the prefix and suffix filters are case-sensitive.
    • For more information about filtering events, see Blob storage events.
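
    The two filter formats above are plain string templates. As a minimal sketch, assuming the hypothetical names mycontainer and logs/2024 (not taken from this article):

    ```shell
    #!/bin/sh
    # Sketch: building Event Grid subject filters in the two formats described
    # above. "mycontainer" and "logs/2024" are hypothetical example names.
    container="mycontainer"
    folder_prefix="logs/2024"

    # Filter that matches an entire blob container:
    container_filter="/blobServices/default/containers/${container}"

    # Filter that matches a folder (ADLS Gen2) or blob-name prefix inside it:
    folder_filter="/blobServices/default/containers/${container}/blobs/${folder_prefix}"

    echo "$container_filter"
    echo "$folder_filter"
    ```

    Because the filter is a startswith match on the full subject, the folder form also matches any blob whose name merely begins with the given prefix.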


  4. Select Next: Ingest properties.

Data connection - Ingest properties tab

  1. Fill out the form with the following information. Table and mapping names are case-sensitive:

    Ingest properties:

    Setting | Suggested value | Field description
    Table name | TestTable | The table you created in TestDatabase.
    Data format | JSON | Supported formats are Avro, CSV, JSON, MULTILINE JSON, ORC, PARQUET, PSV, SCSV, SOHSV, TSV, TXT, TSVE, APACHEAVRO, RAW, and W3CLOG. Supported compression options are Zip and GZip.
    Mapping | TestMapping | The mapping you created in TestDatabase, which maps incoming JSON data to the column names and data types of TestTable.
    Advanced settings | My data has headers | Ignores headers. Supported for *SV type files.

    Note

    You don't have to specify all Default routing settings. Partial settings are also accepted.

  2. Select Next: Review + Create.

Data connection - Review + Create tab

  1. Review the resources that were auto-created for you and select Create.

Deployment

Wait until the deployment is completed. If your deployment failed, select Operation details next to the failed stage to get more information about the failure reason. Select Redeploy to try to deploy the resources again. You can alter the parameters before deployment.


Generate sample data

Now that Azure Data Explorer and the storage account are connected, you can create sample data.

Upload blob to the storage container

We'll work with a small shell script that issues a few basic Azure CLI commands to interact with Azure Storage resources. This script does the following actions:

  1. Creates a new container in your storage account.
  2. Uploads an existing file (as a blob) to that container.
  3. Lists the blobs in the container.

Save the data into a file and upload it with this script:

    {"TimeStamp": "1987-11-16 12:00","Value": "Hello World","Source": "TestSource"}

    #!/bin/bash
    ### A simple Azure Storage example script

    export AZURE_STORAGE_ACCOUNT=<storage_account_name>
    export AZURE_STORAGE_KEY=<storage_account_key>

    export container_name=<container_name>
    export blob_name=<blob_name>
    export file_to_upload=<file_to_upload>
    export destination_file=<destination_file>

    echo "Creating the container..."
    az storage container create --name $container_name

    echo "Uploading the file..."
    az storage blob upload --container-name $container_name --file $file_to_upload --name $blob_name --metadata "rawSizeBytes=1024"

    echo "Listing the blobs..."
    az storage blob list --container-name $container_name --output table

    echo "Done"

Note

To achieve the best ingestion performance, the uncompressed size of the compressed blobs submitted for ingestion must be known. Because Event Grid notifications contain only basic details, the size information must be explicitly communicated. You can provide the uncompressed size by setting the rawSizeBytes property on the blob metadata to the uncompressed data size in bytes.
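
As a sketch of how you might compute and attach that value, the following compresses the sample record from this article and records its uncompressed size; the az upload line is shown as a comment, and the file names are example values:

```shell
#!/bin/sh
# Sketch: capture the uncompressed size before compressing, then pass it
# as rawSizeBytes blob metadata. File names here are example values.
printf '{"TimeStamp": "1987-11-16 12:00","Value": "Hello World","Source": "TestSource"}\n' > sample.json
raw_size=$(wc -c < sample.json | tr -d ' ')
gzip -f sample.json                       # produces sample.json.gz
# az storage blob upload --container-name <container_name> \
#   --file sample.json.gz --name sample.json.gz \
#   --metadata "rawSizeBytes=${raw_size}"
echo "rawSizeBytes=${raw_size}"
```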

Rename blob

If you are ingesting data from ADLSv2 storage and defined Blob renamed as the event type for the data connection, the trigger for blob ingestion is blob renaming. To rename a blob, navigate to the blob in the Azure portal, right-click the blob, and select Rename.

Ingestion properties

You can specify the ingestion properties of the blob ingestion via the blob metadata.
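
For example, here is a sketch of attaching per-blob ingestion properties as metadata at upload time; the kustoTable, kustoDataFormat, and kustoIngestionMappingReference keys are the metadata names Azure Data Explorer reads for these properties, the az command is shown as a comment, and the values reuse the names from this article:

```shell
#!/bin/sh
# Sketch: per-blob ingestion properties passed as blob metadata.
# kustoTable, kustoDataFormat, and kustoIngestionMappingReference route the
# blob to a table, declare its format, and pick an ingestion mapping.
metadata="kustoTable=TestTable kustoDataFormat=json kustoIngestionMappingReference=TestMapping rawSizeBytes=1024"
# az storage blob upload --container-name <container_name> \
#   --file <file_to_upload> --name <blob_name> --metadata $metadata
echo "$metadata"
```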

Note

Azure Data Explorer won't delete the blobs after ingestion. Retain the blobs for three to five days, then use Azure Blob storage lifecycle management to handle blob deletion.

Review the data flow

Note

Azure Data Explorer has an aggregation (batching) policy for data ingestion designed to optimize the ingestion process. By default, the policy is configured to 5 minutes. You'll be able to alter the policy at a later time if needed. In this article you can expect a latency of a few minutes.
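If the default window is too long for your scenario, the batching policy can be changed with a management command. The following is a minimal sketch that emits such a command, assuming a 2-minute window, 500 items, and 1024 MB as example limits; run the emitted command in the query window:

```shell
#!/bin/sh
# Sketch: emit an ingestion batching policy command for TestTable.
# The 2-minute span, 500 items, and 1024 MB are example values.
policy='{"MaximumBatchingTimeSpan":"00:02:00","MaximumNumberOfItems":500,"MaximumRawDataSizeMB":1024}'
printf ".alter table TestTable policy ingestionbatching '%s'\n" "$policy"
```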

  1. In the Azure portal, under your event grid, you see the spike in activity while the app is running.

  2. To check how many messages have made it to the database so far, run the following query in your test database.

    TestTable
    | count
    
  3. To see the content of the messages, run the following query in your test database.

    TestTable
    


Clean up resources

If you don't plan to use your event grid again, clean up the Event Grid subscription, Event Hub namespace, and Event Hub that were auto-created for you, to avoid incurring costs.

  1. In the Azure portal, go to the left menu and select All resources.

  2. Search for your Event Hub namespace and select Delete to delete it.

  3. In the Delete resources form, confirm the deletion to delete the Event Hub namespace and Event Hub resources.

  4. Go to your storage account. In the left menu, select Events.

  5. Below the graph, select your Event Grid subscription and then select Delete to delete it.

  6. To delete your Event Grid data connection, go to your Azure Data Explorer cluster. On the left menu, select Databases.

  7. Select your database TestDatabase.

  8. On the left menu, select Data ingestion.

  9. Select your data connection test-grid-connection and then select Delete to delete it.

Next steps