本指南提供从引入 V1 SDK 到 Ingest V2 SDK 的代码迁移步骤。 本指南逐步讲解关键差异和改进,并包括详细的代码示例和最佳做法建议。
创建引入客户端
引入 V1 - 工厂方法
在引入 V1 中,使用从连接字符串生成的静态工厂方法创建客户端。
示例(V1):
var kcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication();
var client = KustoIngestFactory.CreateQueuedIngestClient(kcsb);
引入 V2:生成器模式
在引入 V2 中,使用生成器类创建客户端。 此模式允许更灵活且可读的配置,支持方法链接,并更轻松地在未来添加新选项。
连接字符串不再用于创建客户端。 而是为群集和身份验证提供程序提供一个 Uri 。
身份验证提供程序可以是格式的任何 Azure.Identity.TokenCredential实现、Kusto 或 IKustoTokenCredentialsProvider委托函数 Func<string, Task<KustoTokenCredentials>>。
示例 - 引入 V2:
using Azure.Identity;
using Kusto.Ingest.V2;
var auth = new InteractiveBrowserCredential();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
.WithAuthentication(auth)
.Build();
其他 V2 客户端(类似于 V1):
StreamingIngestClientBuilderManagedStreamingIngestClientBuilder
注释
在 V1 和 V2 中,现在可以将群集 URL 传递到没有“引入”前缀的客户端。 构造函数会自动将 URL 转换为正确的格式。
托管流式引入
托管流引入客户端类似于 V1 中的客户端 -- 它尝试先流式传输数据,如果在几次重试后失败,它将回退到排队引入。
在 V2 中, ManagedStreamingPolicy 成为 IManagedStreamingPolicy 接口,该接口提供对托管流式处理进行精细控制的方法。
Sources
引入 V1:不同源的多个方法
V1 为每个源类型公开不同的方法,例如IngestFromStorageIngestFromStreamAsync,等等。提供格式和压缩ingestionProperties。
示例(V1):
var ingestionProperties = new KustoIngestionProperties(database, table) { Format = DataSourceFormat.csv };
await client.IngestFromStorageAsync(filePath, ingestionProperties);
await client.IngestFromStreamAsync(stream, ingestionProperties);
引入 V2:统一源抽象
V2 引入了封装所有相关信息的源类,包括格式和压缩。 有一种方法可以引入单个源:该方法 IngestAsync 。 客户端和源类型确定引入行为。
源类型:
-
FileSource(本地文件) -
StreamSource(.NET 流) -
BlobSource(云存储) -
DataReaderSource(.NET 数据读取器)
“database”和“table”属性现在是方法的参数 IngestAsync ,而不是引入属性的属性。 这意味着在大多数情况下,无需创建 IngestProperties。
示例 - 引入 V2:
var source = new FileSource(filePath, DataSourceFormat.csv);
await client.IngestAsync(source, database, table);
上传程序
重要
V1 引入 SDK 几乎完全使用 Azure 存储和 Azure 队列运行,对 Kusto 的调用数量相当有限。
V2 引入 SDK 将 Azure 队列作替换为对 REST 调用 Kusto 的调用。 这意味着引入客户端对 Kusto 维护时段和请求速率更为敏感。 请注意,在满足引入需求的规模和数量的应用程序中包含重试和限制回退。
V1 有几个限制:
- 它由 SDK 用户隐式完成,没有任何控制。
- 它始终使用 Kusto 的内部存储
- 它为用户隐藏了成本。
- 用户无法监视或缩放。
- 专用网络导致问题。
- 它始终使用相同的策略上传数据。
在 V2 中,引入了上传程序,以更灵活地控制上传过程。
-
UserContainersUploader- 将数据上传到用户定义的 Azure Blob 存储容器列表。 -
ManagedUploader- 将数据上传到 Kusto 的内部存储,类似于 V1。
还可以通过实现 IUploader 接口来实现自己的上传程序。
注释
上传程序仅适用于排队引入、从 QueuedIngestClient 中执行或 ManagedStreamingIngestClient完成。
流式引入直接在 HTTP 请求中发送数据,因此它不使用上传程序。
默认情况下,客户端会创建一个 ManagedUploader 实例,但你可以使用生成器中的方法指定不同的上传程序 WithUploader 。
示例 - 引入 V2:
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
.WithAuthentication(auth)
.Build();
// Equivalent to:
var uploader = ManagedUploaderBuilder.Create()
.WithAuthentication(auth)
.Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
.WithUploader(uploader)
.Build();
// Or, with user provided upload containers:
var uploader = UserContainersUploaderBuilder.Create()
.AddContainer("<address>")
.AddContainer("<address2>", tokenCredential)
.Build();
手动上传
可以使用上传程序将本地和数据读取器源 BlobSource转换为,然后通过 V2 客户端引入它们。
这可以更好地控制重试和失败行为,构建并行处理上传和引入的引入管道,并利用 V2 提供的多引入 API(请参阅以下高级主题)。
var source = new FileSource(filePath, DataSourceFormat.csv);
BlobSource blobSource = await uploader.UploadAsync(source);
甚至多个源:
BlobSource source1 = new FileSource(filePath1, DataSourceFormat.csv);
BlobSource source2 = new StreamSource(stream, DataSourceFormat.json);
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadAsync(new[] { source1, source2 });
摄入属性
如前所述,在 V2 中,这些属性不再是引入属性类的一部分:
-
database现在是table方法的参数IngestAsync。 -
format并且compression现在是源类的一部分。
新类命名 IngestProperties,它包含与引入过程相关的属性,一些示例:
-
EnableTracking- 是否启用引入跟踪。 -
MappingReference- 要用于引入的映射的名称。 -
SkipBatching- 是否跳过批处理并立即引入数据(相当于FlushImmediately在 V1 中)。 不建议用于大多数用例。
示例(V2):
var source = new FileSource(filePath, DataSourceFormat.csv);
var properties = new IngestProperties
{
EnableTracking = true,
MappingReference = "MyMapping"
};
await client.IngestAsync(source, database, table, properties);
状态跟踪
在 V1 中,状态跟踪是使用 ReportLevel 引入属性中的和 ReportMethod 属性完成的。
跟踪已在 V2 中重新实现,现在更简单。
排队引入
当数据排队进行引入(通过或QueuedIngestClientManagedStreamingIngestClient)时,引入是异步的,并且结果不是即时的。
该方法返回一个 IngestionOperation 对象:
var source = new FileSource(filePath, DataSourceFormat.csv);
var operation = await client.IngestAsync(source, database, table);
Assert.IsTrue(operation.IngestionMethod == IngestionMethod.Queued);
如果 IngestAsync 成功返回,则数据已排队等待引入,但稍后在引入管道中仍会失败。
如果需要跟踪引入的状态,可通过在以下项中EnableTracking设置为 IngestProperties true 启用跟踪:
var properties = new IngestProperties { EnableTracking = true };
var operation = await client.IngestAsync(source, database, table, properties);
然后, operation 成为引入作的句柄,你可以使用它来跟踪使用客户端的引入状态:
var summary = await client.GetOperationSummaryAsync(operation);
// `summary.Status` can be `Succeeded`, `Failed`, `InProgress`, or `Cancelled`.
可以再次查询状态,直到状态不再 InProgress。
若要获取有关引入的更多详细信息,请使用 GetIngestionDetailsAsync 以下方法:
var details = await client.GetIngestionDetailsAsync(operation);
var blob = details.IngestResults.Single();
blob.Status // Succeeded, Failed, etc.
blob.Details // Additional details about the ingestion
blob.Exception // If the ingestion failed, this will contain the exception details
重要
- 每次调用
GetOperationSummaryAsync或GetIngestionDetailsAsync将向 Kusto 服务发出 HTTP 请求。 - 调用过于频繁可能会导致限制或性能问题。
- 请考虑在呼叫之间等待几秒钟,或使用退避策略。
流式引入
对于流式引入,引入的结果是即时的。 如果方法成功返回,则已成功引入数据。
不过,方法的接口相同,并GetOperationSummaryAsyncGetOperationDetailsAsync返回预期结果。
托管流式引入
托管流引入可以解析为排队或流式引入。 无论哪种方式,如果启用了跟踪,则可以使用相同的方法来跟踪引入作的状态。
序列化引入作
运行启用了跟踪的作后,你可能不希望立即跟踪它。
在 V2 中,可以使用和ToJsonString方法序列化和反序列化引入作FromJsonString。
这样,就可以将作存储在数据库或文件中,然后检索作以继续监视引入状态。 需要使用与创建作的客户端的地址和类型匹配的客户端,并且已启用跟踪。
var serialized = operation.ToJsonString();
var deserializedOperation = IngestionOperation.FromJsonString(serialized, client);
var summary = await client.GetOperationSummaryAsync(deserializedOperation);
高级主题
多引入
QueuedIngestClient 在 V2 中 IMultiIngest 实现接口,该接口允许在单个调用中引入多个源。
每个调用可能的来源数有限。 可以通过方法int GetMaxSourcesPerMultiIngest()IMultiIngest获取限制。
目前,仅支持列表 BlobSource 。 在使用此方法之前,可能需要使用上传程序将本地文件或流转换为该上传 BlobSource 程序。
示例(V2):
var uploader = new ManagedUploaderBuilder()
.WithAuthentication(auth)
.Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
.WithUploader(uploader)
.WithAuthentication(auth)
.Build();
var source1 = new FileSource(filePath1, DataSourceFormat.csv);
var source2 = new FileSource(filePath2, DataSourceFormat.csv);
var (successes, failures) = uploader.UploadManyAsync(new[] { source1, source2 });
foreach (var blob in failures)
{
Console.WriteLine($"Failed to upload {blob.SourceId}: {blob.Exception?.Message}");
}
var operation = await client.IngestAsync(successes, database, table,
new IngestProperties { EnableTracking = true });
跟踪作时,状态包含成功次数和失败次数:
var summary = await client.GetOperationSummaryAsync(operation);
Assert.IsTrue(summary.Successes > 0);
Assert.IsTrue(summary.Failures > 0);
summary.Status // Succeeded, Failed, Cancelled and can also be InProgress in multi-ingestion
你可以获取每个源的详细信息:
var details = await client.GetIngestionDetailsAsync(operation);
foreach (var blob in details.IngestResults)
{
if (blob.Status == IngestStatus.Succeeded)
{
Console.WriteLine($"Blob {blob.BlobName} was ingested successfully.");
}
else if (blob.Status == IngestStatus.Failed)
{
Console.WriteLine($"Blob {blob.BlobName} failed to ingest: {blob.Exception?.Message}");
}
}
DataReaderSource
在 V2 中,可用于 DataReaderSource 从 .NET IDataReader 实现引入数据。 与其他源不同,DataReaders 可以部分引入,这意味着它们可以引入某些数据或分批引入。
与 IngestAsync a DataReaderSource一起使用时,其内部 MaxBytesPerFragment 和 MaxRecordsPerFragment 属性用于确定要引入的数据量。
超出该范围的任何数据都将保留在读取器中以供下一次引入调用。 可以通过检查 HasDataRemaining 读取器的属性 DataReaderSource来了解读取器是否具有更多要引入的数据。
示例(V2):
var dataReader = GetMyDataReader();
var source = new DataReaderSource(dataReader, maxBytesPerFragment: 1024);
await client.IngestAsync(source, database, table); // Will ingest up to 1024 bytes of data
if (source.HasDataRemaining)
{
// There is more data to ingest; you can call IngestAsync again
}
如果要引入多批数据读取器,可以使用上传程序:
var uploader = new ManagedUploaderBuilder()
.WithAuthentication(auth)
.Build();
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadManyAsync(
dataReaderSource,
maxFragmentsToCreate: 10, // defaults to the maximum number of blobs you can ingest in a single operation
props);
// dataReaderSource.HasDataRemaining can still be true if `maxFragmentsToCreate` was reached before all data was ingested.
if (dataReaderSource.HasDataRemaining)
{
Console.WriteLine("There is more data to ingest.");
}
await client.IngestAsync(successes, database, table, props);