创建应用以使用托管流式引入客户端获取数据

适用于:✅Azure 数据资源管理器

流式引入允许将数据写入 Kusto,并具有近乎实时的延迟。 在向大量表写入少量数据时,它还非常有用,从而使批处理效率低下。

本文介绍如何使用托管流引入客户端将数据引入到 Kusto。 你将以文件或内存中流的形式引入数据流。

注释

流式引入是一种高速引入协议。 使用 托管流引入 引入客户端进行引入与 流源的引入不同。

客户端类型是指引入数据 的方式 - 使用 托管流 引入或 式引入客户端引入时,使用流引入协议将数据发送到 Kusto - 它使用流式 处理服务 来允许低延迟引入。

流源 引入是指数据的存储方式。 例如,在 C# 中,可以从对象创建MemoryStream。 这与从磁盘上的文件创建 的文件源 相反。

引入方法取决于使用的客户端:使用排队引入时,源中的数据首先上传到 Blob 存储,然后排队进行引入;通过流式引入,数据将直接发送到流式处理 HTTP 请求的正文中的 Kusto。

重要

引入 API 现在有两个版本:V1 和 V2。 V1 API 是原始 API,而 V2 API 是一个重新映像的版本,可在提供更多自定义的同时简化引入 API。

引入版本 2 为 预览 版,提供以下语言: C#

另请注意,查询 V2 API 与引入 V2 API 无关。

流式处理和托管流式处理

Kusto SDK 提供两种流式引入客户端:流式引入 客户端 和托管流式引入客户端,其中 托管流 式处理具有内置重试和故障转移逻辑

注释

本文介绍如何使用 托管流式引入。 如果要使用纯 流式引入 而不是 托管流式处理,只需将实例化客户端类型更改为 流式引入客户端

使用 托管流式引入 API 引入时,会自动处理失败和重试,如下所示:

  • 由于服务器端大小限制而失败的流式处理请求将移动到排队引入。
  • 估计大于流式处理限制的数据会自动发送到排队引入。
    • 流式处理限制的大小取决于数据的格式和压缩。
    • 可以通过在初始化中传递的托管流引入策略中设置大小因子来更改限制。
  • 暂时性故障(例如限制)重试三次,然后移动到排队引入。
  • 不会重试永久失败。

注释

如果流式引入失败,并将数据移动到排队引入,则由于数据被批处理并排队进行引入,数据需要更长的时间才能引入。 可以通过 批处理策略控制它。

局限性

与引入数据队列数据相比,数据流式处理存在一些限制。

  • 无法对数据设置标记。
  • 只能使用 ingestionMappingReference. 不支持内联映射。
  • 请求中发送的有效负载不能超过 10 MB,而不考虑格式或压缩。
  • 流式引入不支持此属性 ignoreFirstRecord ,因此引入的数据不得包含标题行。

有关详细信息,请参阅 流式处理限制

先决条件

在您开始之前

在创建应用之前,需要执行以下步骤。 以下各节详细介绍了每个步骤。

  1. 在 Azure 数据资源管理器群集上配置流式引入。
  2. 创建用于将数据引入到的 Kusto 表。
  3. 对表启用流式引入策略。
  4. 下载包含 1,000 条 storm 事件记录 的stormevent.csv 示例数据文件。

配置流式引入

若要配置流式引入,请参阅 在 Azure 数据资源管理器群集上配置流式引入。 配置可能需要几分钟才能生效。 如果使用 Fabric 或免费群集,则会自动启用流式引入。

创建 Kusto 表

通过 Kusto Explorer(桌面)或 Kusto Web Explorer 在数据库上运行以下命令。

  1. 创建名为 Storm 事件的表
.create table MyStormEvents (StartTime:datetime, EndTime:datetime, State:string, DamageProperty:int, DamageCrops:int, Source:string, StormSummary:dynamic)

启用流式引入策略

使用以下命令之一对表或整个数据库启用流式引入:

表级别:

.alter table <your table name> policy streamingingestion enable

数据库级别:


.alter database <databaseName> policy streamingingestion enable

策略生效可能需要长达两分钟的时间。

有关流式处理策略的详细信息,请参阅 流式引入策略

创建基本客户端应用程序

创建连接到 Kusto 帮助群集的基本客户端应用程序。 在相关变量中输入群集查询并引入 URI 和数据库名称。 应用使用两个客户端:一个用于查询,一个用于引入。 每个客户端都会打开浏览器窗口以对用户进行身份验证。

该代码示例包含用于打印查询结果的服务函数 PrintResultAsValueList()

使用以下命令添加 Kusto 库:

dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.ingest
using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Kusto.Ingest.Common;


using Azure.Identity;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
       var tokenCredential = new InteractiveBrowserCredential();
       var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
       var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
        var database = "<your database>";
        var table = "MyStormEvents";
        var query = table + " | count";
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}

流式传输用于引入的文件

使用该方法 IngestFromStorageAsync 引入 stormevents.csv 文件。

将stormevents.csv 文件复制到脚本所在的同一位置。 由于我们的输入是 CSV 文件,因此 Format = DataSourceFormat.csv 在引入属性中使用。

使用以下行添加到末尾 Main()的添加和引入部分。

using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
var ingestProperties = new KustoIngestionProperties(databaseName, tableName) 
    {
        Format = DataSourceFormat.csv
    };
//Ingestion section
Console.WriteLine("Ingesting data from a file");
await ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties);

此外,让我们查询引入后的新行数和最近一行。 在引入命令后面添加以下行:

Console.WriteLine("Number of rows in " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | count", new ClientRequestProperties());
PrintResultAsValueList(result);

Console.WriteLine("Example line from " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultAsValueList(result);

该代码示例包含用于打印查询结果的服务函数 PrintResultAsValueList()

使用以下命令添加 Kusto 库:

dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.ingest.V2
using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;
using Kusto.Ingest.Common;

using Azure.Identity;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
       var tokenCredential = new InteractiveBrowserCredential();
       var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
       var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
        var database = "<your database>";
        var table = "MyStormEvents";
        var query = table + " | count";
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}

流式传输用于引入的文件

使用该方法 IngestAsync 引入 stormevents.csv 文件。

将stormevents.csv 文件复制到脚本所在的同一位置。 由于我们的输入是 CSV 文件,因此请 DataSourceFormat.csv 将其用作格式 FileSource

使用以下行添加到末尾 Main()的添加和引入部分。

using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
var fileSource = new FileSource(.\\stormevents.csv, DataSourceFormat.csv);
await ingestClient.IngestAsync(fileSource, database, table);

此外,让我们查询引入后的新行数和最近一行。 在引入命令后面添加以下行:

Console.WriteLine("Number of rows in " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | count", new ClientRequestProperties());
PrintResultAsValueList(result);

Console.WriteLine("Example line from " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultAsValueList(result);

首次运行应用程序时,结果如下所示:

Number of rows in MyStormEvents
row 1 :
         Count - 0
Ingesting data from a file
New number of rows in MyStormEvents
row 1 :
         Count - 1000
Example line from MyStormEvents
row 1 :
         StartTime - 2007-12-31 11:15:00+00:00
         EndTime - 2007-12-31 13:21:00+00:00
         State - HAWAII
         DamageProperty - 0
         DamageCrops - 0
         Source - COOP Observer
         StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai.  Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}}

流式传输用于引入的内存中数据

若要从内存中引入数据,请创建包含要引入的数据的流。

若要从内存中引入流,请调用该方法 IngestFromStreamAsync()

将引入部分替换为以下代码:

    // Ingestion section
    Console.WriteLine("Ingesting data from memory");
    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
    using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
    using var stream = new MemoryStream(byteArray);

    var streamSourceOptions = new StreamSourceOptions
    {
        LeaveOpen = false
    };

    await ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions);
    // Ingestion section
    Console.WriteLine("Ingesting data from memory");
    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
    using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
    using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
    using var stream = new MemoryStream(byteArray);
    var streamSource = new StreamSource(.\\stormevents.csv, DataSourceCompressionType.None, DataSourceFormat.csv);
    await ingestClient.IngestAsync(streamSource, database, table);

结果如下所示:

Number of rows in MyStormEvents
row 1 :
	 Count - 1000

Ingesting data from memory

New number of rows in MyStormEvents
row 1 :
	 Count - 1001

Example line from MyStormEvents
row 1 :
	 StartTime - 2018-01-26 00:00:00+00:00
	 EndTime - 2018-01-27 14:00:00+00:00
	 State - MEXICO
	 DamageProperty - 0
	 DamageCrops - 0
	 Source - Unknown
	 StormSummary - {}

资源