在 Azure 数据资源管理器群集上配置流式引入

在需要实现引入和查询之间的低延迟时,流式引入对于加载数据非常有用。 对于以下情况,请考虑使用流式引入:

  • 要求延迟少于一秒钟。
  • 需要优化多个表的操作处理,其中进入每个表的数据流相对较小(每秒几条记录),但总体数据引入量较大(每秒成千上万条记录)。

如果引入每个表的数据流很大(每小时超过 4 GB),请考虑使用排队引入

若要详细了解各种引入方法,请参阅数据引入概述

有关基于以前的 SDK 版本的代码示例,请参阅存档的文章

选择适当的流式引入类型

支持两种流式引入类型:

引入类型 说明
数据连接 如果已在群集级别启用流式引入,事件中心、IoT 中心和事件网格数据连接便可以使用流式引入。 使用流式引入的决定是根据在目标表上配置的流式引入策略来完成的。
若要了解如何管理数据连接,请参阅事件中心IoT 中心事件网格
自定义引入 “自定义引入”需要编写使用某个 Azure 数据资源管理器客户端库之一的应用程序。
使用本主题中的信息配置自定义引入。 你可能还会发现 C# 流引入示例应用程序很有帮助。

使用下表帮助你选择适合你的环境的引入类型:

条件 数据连接 自定义引入
启动引入之后、数据可供查询之前的数据延迟 延迟更长 延迟更短
开发开销 快速轻松的设置,不产生开发开销 创建应用程序引入数据、处理错误和确保数据一致性的开发开销较高

注意

可以使用 Azure 门户或使用 C# 以编程方式管理在群集上启用禁用流式引入的过程。 如果你将 c # 用于 自定义应用程序,你可能会发现使用编程方法会更方便。

先决条件

性能和操作注意事项

可能会影响流式引入的主要因素包括:

  • VM 和群集大小:增大 VM 和群集大小可以提高流式引入的性能和容量。 并发引入请求数量限制为每个核心六个。 例如,对于 16 核 SKU(如 D14 和 L16),支持的最大负载为 96 个并发引入请求。 对于双核 SKU(例如 D11),支持的最大负载为 12 个并发引入请求。
  • 数据大小限制:流式引入请求的数据大小限制为 4 MB。 这包括引入期间为更新策略创建的任何数据。
  • 架构更新:对流式引入服务进行架构更新(例如创建和修改表与引入映射)最长可能需要花费 5 分钟时间。 有关详细信息,请参阅流式引入和架构更改
  • SSD 容量:即使数据不是流式引入的,在群集上启用流式引入也会占用群集计算机的一部分本地 SSD 磁盘用于存储流式引入数据,因此会减少热缓存的可用存储。

在群集上启用流式引入

必须先在群集上启用流式引入,并定义流式引入策略,然后才能使用流式引入。 可以在创建群集将流式引入添加到现有群集时启用流式引入。

警告

在启用流式引入前查看限制

创建新群集时启用流式引入

可以在使用 Azure 门户或使用 C# 以编程方式创建新群集时启用流式引入。

按照创建 Azure 数据资源管理器群集和数据库中的步骤创建群集时,请在“配置”选项卡中选择“流式引入”>“开” 。

Enable streaming ingestion while creating a cluster in Azure Data Explorer.

在现有群集上启用流式引入

如果已有群集,可以使用 Azure 门户或使用 C# 以编程方式启用流式引入。

  1. 在 Azure 门户中,转到 Azure 数据资源管理器群集。

  2. 在“设置”中选择“配置”。

  3. 在“配置”窗格中,选择“打开”以启用“流式引入”。

  4. 选择“保存”。

    Turn on streaming ingestion in Azure Data Explorer.

创建目标表并定义策略

创建一个表来接收流式引入数据,然后使用 Azure 门户或使用 C# 以编程方式定义其相关策略。

  1. 在 Azure 门户中导航到群集。

  2. 选择“查询”。

    Select query in the Azure Data Explorer portal to enable streaming ingestion.

  3. 将以下命令复制到“查询窗格”中,然后选择“运行”以创建将通过流式引入接收数据的表 。

    .create table TestTable (TimeStamp: datetime, Name: string, Metric: int, Source:string)
    

    Create a table for streaming ingestion into Azure Data Explorer.

  4. 将以下命令之一复制到“查询窗格”并选择“运行” 。 这会在已创建的表上或包含该表的数据库上定义流式引入策略

    提示

    在数据库级别定义的策略将对数据库中的所有现有表和未来表应用相同的设置。 在数据库级别启用策略时,无需按表启用该策略。

    • 若要在创建的表上定义策略,请使用:

      .alter table TestTable policy streamingingestion enable
      
    • 若要在包含所创建的表的数据库上定义策略,请使用:

      .alter database StreamingTestDb policy streamingingestion enable
      

    Define the streaming ingestion policy in Azure Data Explorer.

创建流式引入应用程序以将数据引入群集

创建应用程序,以使用首选语言将数据引入群集。

using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
    static async Task Main(string[] args)
    {
        var clusterPath = "https://<clusterName>.<region>.kusto.chinacloudapi.cn";
        var appId = "<appId>";
        var appKey = "<appKey>";
        var appTenant = "<appTenant>";
        // Create Kusto connection string with App Authentication
        var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
            .WithAadApplicationKeyAuthentication(
                applicationClientId: appId,
                applicationKey: appKey,
                authority: appTenant
            );
        // Create a disposable client that will execute the ingestion
        using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
        // Ingest from a compressed file
        var fileStream = File.Open("MyFile.gz", FileMode.Open);
        // Initialize client properties
        var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
        // Create source options
        var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
        // Ingest from stream
        await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
    }
}

在群集上禁用流式引入

警告

禁用流式引入可能需要几个小时。

在 Azure 数据资源管理器群集上禁用流式引入之前,请从所有相关的表和数据库中删除流式引入策略。 删除流式引入策略会触发 Azure 数据资源管理器群集内的数据重新排列。 流式引入数据将从初始存储移到列存储中的永久存储(盘区或分片)。 此过程可能需要几秒钟到几个小时,具体取决于初始存储中的数据量。

删除流式引入策略

可以使用 Azure 门户或使用 C# 以编程方式删除流式引入策略。

  1. 在 Azure 门户中,转到 Azure 数据资源管理器群集,然后选择“查询”。

  2. 要从表中删除流引入策略,请将以下命令复制到“查询窗格”并选择“运行” 。

    .delete table TestTable policy streamingingestion
    

    Delete streaming ingestion policy in Azure Data Explorer.

  3. 在“设置”中选择“配置”。

  4. 在“配置”窗格中,选择“关闭”以禁用“流式引入”。

  5. 选择“保存”。

    Turn off streaming ingestion in Azure Data Explorer.

限制

  • 数据映射必须已预先创建,以便在流式引入中使用。 单个流式处理引入请求不包含内联数据映射。
  • 盘区标记不能在流式引入数据上设置。
  • 更新策略。 更新策略只能引用源表中新引入的数据,而不能引用数据库中的任何其他数据或表。
  • 如果在用作后继数据库的先导数据库的群集上启用了流式引入,则必须在后续群集上启用流式引入,以遵循流式引入数据。