从引入 V1 迁移到引入 V2

本指南提供从引入 V1 SDK 到 Ingest V2 SDK 的代码迁移步骤。 本指南逐步讲解关键差异和改进,并包括详细的代码示例和最佳做法建议。

创建引入客户端

引入 V1 - 工厂方法

在引入 V1 中,使用从连接字符串生成的静态工厂方法创建客户端。

示例(V1):

var kcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication();
var client = KustoIngestFactory.CreateQueuedIngestClient(kcsb);

引入 V2:生成器模式

在引入 V2 中,使用生成器类创建客户端。 此模式允许更灵活且可读的配置,支持方法链接,并更轻松地在未来添加新选项。

连接字符串不再用于创建客户端。 而是为群集和身份验证提供程序提供一个 Uri

身份验证提供程序可以是格式的任何 Azure.Identity.TokenCredential实现、Kusto 或 IKustoTokenCredentialsProvider委托函数 Func<string, Task<KustoTokenCredentials>>

示例 - 引入 V2:

using Azure.Identity;
using Kusto.Ingest.V2;

var auth = new InteractiveBrowserCredential();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    .Build();

其他 V2 客户端(类似于 V1):

  • StreamingIngestClientBuilder
  • ManagedStreamingIngestClientBuilder

注释

在 V1 和 V2 中,现在可以将群集 URL 传递到没有“引入”前缀的客户端。 构造函数会自动将 URL 转换为正确的格式。

托管流式引入

托管流引入客户端类似于 V1 中的客户端 -- 它尝试先流式传输数据,如果在几次重试后失败,它将回退到排队引入。

在 V2 中, ManagedStreamingPolicy 成为 IManagedStreamingPolicy 接口,该接口提供对托管流式处理进行精细控制的方法。

Sources

引入 V1:不同源的多个方法

V1 为每个源类型公开不同的方法,例如IngestFromStorageIngestFromStreamAsync,等等。提供格式和压缩ingestionProperties

示例(V1):

var ingestionProperties = new KustoIngestionProperties(database, table) { Format = DataSourceFormat.csv };
await client.IngestFromStorageAsync(filePath, ingestionProperties);
await client.IngestFromStreamAsync(stream, ingestionProperties);

引入 V2:统一源抽象

V2 引入了封装所有相关信息的源类,包括格式和压缩。 有一种方法可以引入单个源:该方法 IngestAsync 。 客户端和源类型确定引入行为。

源类型:

  • FileSource (本地文件)
  • StreamSource (.NET 流)
  • BlobSource (云存储)
  • DataReaderSource (.NET 数据读取器)

“database”和“table”属性现在是方法的参数 IngestAsync ,而不是引入属性的属性。 这意味着在大多数情况下,无需创建 IngestProperties

示例 - 引入 V2:

var source = new FileSource(filePath, DataSourceFormat.csv);
await client.IngestAsync(source, database, table);

上传程序

重要

  • V1 引入 SDK 几乎完全使用 Azure 存储和 Azure 队列运行,对 Kusto 的调用数量相当有限。

  • V2 引入 SDK 将 Azure 队列作替换为对 REST 调用 Kusto 的调用。 这意味着引入客户端对 Kusto 维护时段和请求速率更为敏感。 请注意,在满足引入需求的规模和数量的应用程序中包含重试和限制回退。

V1 有几个限制:

  • 它由 SDK 用户隐式完成,没有任何控制。
  • 它始终使用 Kusto 的内部存储
    • 它为用户隐藏了成本。
    • 用户无法监视或缩放。
    • 专用网络导致问题。
  • 它始终使用相同的策略上传数据。

在 V2 中,引入了上传程序,以更灵活地控制上传过程。

  • UserContainersUploader - 将数据上传到用户定义的 Azure Blob 存储容器列表。
  • ManagedUploader - 将数据上传到 Kusto 的内部存储,类似于 V1。

还可以通过实现 IUploader 接口来实现自己的上传程序。

注释

上传程序仅适用于排队引入、从 QueuedIngestClient 中执行或 ManagedStreamingIngestClient完成。

流式引入直接在 HTTP 请求中发送数据,因此它不使用上传程序。

默认情况下,客户端会创建一个 ManagedUploader 实例,但你可以使用生成器中的方法指定不同的上传程序 WithUploader

示例 - 引入 V2:

var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    .Build();

// Equivalent to:
var uploader = ManagedUploaderBuilder.Create()
    .WithAuthentication(auth)
    .Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithUploader(uploader)
    .Build();

// Or, with user provided upload containers:
var uploader = UserContainersUploaderBuilder.Create()
    .AddContainer("<address>")
    .AddContainer("<address2>", tokenCredential)
    .Build();

手动上传

可以使用上传程序将本地和数据读取器源 BlobSource转换为,然后通过 V2 客户端引入它们。

这可以更好地控制重试和失败行为,构建并行处理上传和引入的引入管道,并利用 V2 提供的多引入 API(请参阅以下高级主题)。

var source = new FileSource(filePath, DataSourceFormat.csv);
BlobSource blobSource = await uploader.UploadAsync(source);

甚至多个源:

BlobSource source1 = new FileSource(filePath1, DataSourceFormat.csv);
BlobSource source2 = new StreamSource(stream, DataSourceFormat.json);
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadAsync(new[] { source1, source2 });

摄入属性

如前所述,在 V2 中,这些属性不再是引入属性类的一部分:

  • database 现在是 table 方法的参数 IngestAsync
  • format 并且 compression 现在是源类的一部分。

新类命名 IngestProperties,它包含与引入过程相关的属性,一些示例:

  • EnableTracking - 是否启用引入跟踪。
  • MappingReference - 要用于引入的映射的名称。
  • SkipBatching - 是否跳过批处理并立即引入数据(相当于 FlushImmediately 在 V1 中)。 不建议用于大多数用例。

示例(V2):

var source = new FileSource(filePath, DataSourceFormat.csv);
var properties = new IngestProperties
{
    EnableTracking = true,
    MappingReference = "MyMapping"
};
await client.IngestAsync(source, database, table, properties);

状态跟踪

在 V1 中,状态跟踪是使用 ReportLevel 引入属性中的和 ReportMethod 属性完成的。

跟踪已在 V2 中重新实现,现在更简单。

排队引入

当数据排队进行引入(通过或QueuedIngestClientManagedStreamingIngestClient)时,引入是异步的,并且结果不是即时的。

该方法返回一个 IngestionOperation 对象:

var source = new FileSource(filePath, DataSourceFormat.csv);
var operation = await client.IngestAsync(source, database, table);
Assert.IsTrue(operation.IngestionMethod == IngestionMethod.Queued);

如果 IngestAsync 成功返回,则数据已排队等待引入,但稍后在引入管道中仍会失败。 如果需要跟踪引入的状态,可通过在以下项中EnableTracking设置为 IngestProperties true 启用跟踪:

var properties = new IngestProperties { EnableTracking = true };
var operation = await client.IngestAsync(source, database, table, properties);

然后, operation 成为引入作的句柄,你可以使用它来跟踪使用客户端的引入状态:

var summary = await client.GetOperationSummaryAsync(operation);
// `summary.Status` can be `Succeeded`, `Failed`, `InProgress`, or `Cancelled`.

可以再次查询状态,直到状态不再 InProgress

若要获取有关引入的更多详细信息,请使用 GetIngestionDetailsAsync 以下方法:

var details = await client.GetIngestionDetailsAsync(operation);
var blob = details.IngestResults.Single();
blob.Status // Succeeded, Failed, etc.
blob.Details // Additional details about the ingestion
blob.Exception // If the ingestion failed, this will contain the exception details

重要

  • 每次调用 GetOperationSummaryAsyncGetIngestionDetailsAsync 将向 Kusto 服务发出 HTTP 请求。
  • 调用过于频繁可能会导致限制或性能问题。
  • 请考虑在呼叫之间等待几秒钟,或使用退避策略。

流式引入

对于流式引入,引入的结果是即时的。 如果方法成功返回,则已成功引入数据。

不过,方法的接口相同,并GetOperationSummaryAsyncGetOperationDetailsAsync返回预期结果。

托管流式引入

托管流引入可以解析为排队或流式引入。 无论哪种方式,如果启用了跟踪,则可以使用相同的方法来跟踪引入作的状态。

序列化引入作

运行启用了跟踪的作后,你可能不希望立即跟踪它。

在 V2 中,可以使用和ToJsonString方法序列化和反序列化引入作FromJsonString

这样,就可以将作存储在数据库或文件中,然后检索作以继续监视引入状态。 需要使用与创建作的客户端的地址和类型匹配的客户端,并且已启用跟踪。

var serialized = operation.ToJsonString();
var deserializedOperation = IngestionOperation.FromJsonString(serialized, client);
var summary = await client.GetOperationSummaryAsync(deserializedOperation);

高级主题

多引入

QueuedIngestClient 在 V2 中 IMultiIngest 实现接口,该接口允许在单个调用中引入多个源。

每个调用可能的来源数有限。 可以通过方法int GetMaxSourcesPerMultiIngest()IMultiIngest获取限制。

目前,仅支持列表 BlobSource 。 在使用此方法之前,可能需要使用上传程序将本地文件或流转换为该上传 BlobSource 程序。

示例(V2):

var uploader = new ManagedUploaderBuilder()
    .WithAuthentication(auth)
    .Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithUploader(uploader)
    .WithAuthentication(auth)
    .Build();
var source1 = new FileSource(filePath1, DataSourceFormat.csv);
var source2 = new FileSource(filePath2, DataSourceFormat.csv);
var (successes, failures) = uploader.UploadManyAsync(new[] { source1, source2 });

foreach (var blob in failures)
{
    Console.WriteLine($"Failed to upload {blob.SourceId}: {blob.Exception?.Message}");
}

var operation = await client.IngestAsync(successes, database, table,
    new IngestProperties { EnableTracking = true });

跟踪作时,状态包含成功次数和失败次数:

var summary = await client.GetOperationSummaryAsync(operation);
Assert.IsTrue(summary.Successes > 0);
Assert.IsTrue(summary.Failures > 0);
summary.Status // Succeeded, Failed, Cancelled and can also be InProgress in multi-ingestion

你可以获取每个源的详细信息:

var details = await client.GetIngestionDetailsAsync(operation);
foreach (var blob in details.IngestResults)
{
    if (blob.Status == IngestStatus.Succeeded)
    {
        Console.WriteLine($"Blob {blob.BlobName} was ingested successfully.");
    }
    else if (blob.Status == IngestStatus.Failed)
    {
        Console.WriteLine($"Blob {blob.BlobName} failed to ingest: {blob.Exception?.Message}");
    }
}

DataReaderSource

在 V2 中,可用于 DataReaderSource 从 .NET IDataReader 实现引入数据。 与其他源不同,DataReaders 可以部分引入,这意味着它们可以引入某些数据或分批引入。

IngestAsync a DataReaderSource一起使用时,其内部 MaxBytesPerFragmentMaxRecordsPerFragment 属性用于确定要引入的数据量。

超出该范围的任何数据都将保留在读取器中以供下一次引入调用。 可以通过检查 HasDataRemaining 读取器的属性 DataReaderSource来了解读取器是否具有更多要引入的数据。

示例(V2):

var dataReader = GetMyDataReader();
var source = new DataReaderSource(dataReader, maxBytesPerFragment: 1024);
await client.IngestAsync(source, database, table); // Will ingest up to 1024 bytes of data
if (source.HasDataRemaining)
{
    // There is more data to ingest; you can call IngestAsync again
}

如果要引入多批数据读取器,可以使用上传程序:

var uploader = new ManagedUploaderBuilder()
    .WithAuthentication(auth)
    .Build();
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadManyAsync(
    dataReaderSource,
    maxFragmentsToCreate: 10, // defaults to the maximum number of blobs you can ingest in a single operation
    props);

// dataReaderSource.HasDataRemaining can still be true if `maxFragmentsToCreate` was reached before all data was ingested.
if (dataReaderSource.HasDataRemaining)
{
    Console.WriteLine("There is more data to ingest.");
}

await client.IngestAsync(successes, database, table, props);