Migrating from Ingest V1 to Ingest V2

This guide provides code migration steps from the Ingest V1 SDK to the Ingest V2 SDK. It walks through the key differences and improvements, and includes detailed code examples and best-practice recommendations.

Creating the Ingest client

Ingest V1: Factory methods

In Ingest V1, clients are created using static factory methods that take a connection string.

Example (V1):

var kcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication();
var client = KustoIngestFactory.CreateQueuedIngestClient(kcsb);

Ingest V2: Builder pattern

In Ingest V2, clients are created using builder classes. This pattern allows for more flexible and readable configuration, supports method chaining, and makes it easier to add new options in the future.

Connection strings are no longer used to create clients. Instead, you provide a Uri for the cluster and an authentication provider.

An authentication provider can be any implementation of Azure.Identity.TokenCredential, Kusto's IKustoTokenCredentialsProvider, or a delegate function of the format Func<string, Task<KustoTokenCredentials>>.

Example (V2):

using Azure.Identity;
using Kusto.Ingest.V2;

var auth = new InteractiveBrowserCredential();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    .Build();
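
You can also pass the delegate form described above. A minimal sketch, assuming WithAuthentication has an overload accepting Func<string, Task<KustoTokenCredentials>> and that KustoTokenCredentials can be constructed from a token string (both are assumptions; check the overloads in your SDK version):

// Hypothetical token provider: receives the resource (cluster) URI and returns credentials.
// AcquireTokenForResourceAsync is a placeholder for your own token acquisition logic.
Func<string, Task<KustoTokenCredentials>> tokenProvider = async resourceUri =>
{
    string token = await AcquireTokenForResourceAsync(resourceUri);
    return new KustoTokenCredentials(token); // assumed constructor shape
};

var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(tokenProvider)
    .Build();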

Other V2 clients (similar to V1):

  • StreamingIngestClientBuilder
  • ManagedStreamingIngestClientBuilder

Note

In both V1 and V2, you can pass the cluster URL to a client without the "ingest-" prefix; the client converts the URL to the correct format automatically.

Managed Streaming ingestion

The Managed Streaming Ingest client in V2 is similar to the one in V1: it first tries to stream the data, and if streaming fails after a few retries, it falls back to queued ingestion.

In V2, ManagedStreamingPolicy becomes the IManagedStreamingPolicy interface, which provides methods for finer control of managed streaming.
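
Creating the client follows the same builder pattern as the other V2 clients. A minimal sketch (the builder name comes from the list above; the method for supplying a custom IManagedStreamingPolicy is an assumption, so check the builder's actual API):

var auth = new InteractiveBrowserCredential();
var client = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    // Hypothetical: supply a custom IManagedStreamingPolicy for finer control
    // over the stream-vs-queue decision; the exact builder method may differ.
    // .WithManagedStreamingPolicy(myPolicy)
    .Build();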

Sources

Ingest V1: Multiple methods for different sources

V1 exposes a different method for each source type, such as IngestFromStorageAsync and IngestFromStreamAsync. Format and compression are provided in the ingestion properties.

Example (V1):

var ingestionProperties = new KustoIngestionProperties(database, table) { Format = DataSourceFormat.csv };
await client.IngestFromStorageAsync(filePath, ingestionProperties);
await client.IngestFromStreamAsync(stream, ingestionProperties);

Ingest V2: Unified source abstractions

V2 introduces source classes that encapsulate all relevant information, including format and compression. There's one method to ingest a single source: the IngestAsync method. The client and source types determine the ingestion behavior.

Source types:

  • FileSource (local files)
  • StreamSource (.NET streams)
  • BlobSource (cloud storage)
  • DataReaderSource (.NET data readers)

The "database" and "table" properties are now parameters of the IngestAsync method, rather than properties of the ingestion properties. This means that for most cases, you don't need to create IngestProperties.

Example (V2):

var source = new FileSource(filePath, DataSourceFormat.csv);
await client.IngestAsync(source, database, table);
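
The other source types are constructed in a similar way. A hedged sketch, assuming constructor shapes analogous to FileSource (exact parameter names may differ):

// .NET stream containing JSON data
var streamSource = new StreamSource(stream, DataSourceFormat.json);

// Blob in cloud storage, referenced by its URI (assumed constructor shape)
var blobSource = new BlobSource(new Uri(blobUri), DataSourceFormat.csv);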

Uploaders

Important

  • The V1 Ingest SDK operated almost entirely against Azure Storage and Azure Queue, and made relatively few calls to Kusto.

  • The V2 Ingest SDK replaces the Azure Queue operations with REST calls to Kusto. This makes the ingest client more sensitive to Kusto maintenance windows and request rates. Be mindful of this, and incorporate retries and throttling backoffs in your applications to match your scale and ingestion volume.

The upload process in V1 had several limitations:

  • It happened implicitly, without any control by the SDK user.
  • It always used Kusto's internal storage:
    • It had hidden costs for the user.
    • It couldn't be monitored or scaled by the user.
    • It caused issues with private networks.
  • It always used the same strategy to upload the data.

In V2, uploaders are introduced to provide more flexibility and control over the upload process.

  • UserContainersUploader - Uploads data to a list of user-defined Azure Blob Storage containers.
  • ManagedUploader - Uploads data to Kusto's internal storage, similar to V1.

You can also implement your own uploader by implementing the IUploader interface.

Note

Uploaders are only relevant for queued ingestion, performed by QueuedIngestClient or ManagedStreamingIngestClient.

Streaming ingestion sends the data directly in the HTTP request, so it doesn't use uploaders.

By default, the clients create a ManagedUploader instance, but you can specify a different uploader using the WithUploader method in the builder.

Example (V2):

var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    .Build();

// Equivalent to:
var uploader = ManagedUploaderBuilder.Create()
    .WithAuthentication(auth)
    .Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithUploader(uploader)
    .Build();

// Or, with user provided upload containers:
var uploader = UserContainersUploaderBuilder.Create()
    .AddContainer("<address>")
    .AddContainer("<address2>", tokenCredential)
    .Build();

Manual uploads

You can use an uploader to convert local and data reader sources to BlobSource, and then ingest them with the V2 client.

This allows for greater control over retry and failure behavior, makes it possible to build ingestion pipelines where upload and ingestion are handled in parallel, and lets you leverage the multi-ingestion API provided by V2 (see the advanced topics below).

var source = new FileSource(filePath, DataSourceFormat.csv);
BlobSource blobSource = await uploader.UploadAsync(source);

Or even multiple sources:

var source1 = new FileSource(filePath1, DataSourceFormat.csv);
var source2 = new StreamSource(stream, DataSourceFormat.json);
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadManyAsync(new[] { source1, source2 });

Ingestion properties

As mentioned before, in V2, these properties are no longer part of the ingestion properties class:

  • database and table are now parameters of the IngestAsync method.
  • format and compression are now part of the source classes.

The new class is named IngestProperties, and it contains properties that are relevant to the ingestion process. Some examples:

  • EnableTracking - Whether to enable tracking for the ingestion.
  • MappingReference - The name of the mapping to use for the ingestion.
  • SkipBatching - Whether to skip batching and ingest the data immediately (equivalent to FlushImmediately in V1). Not recommended for most use cases.

Example (V2):

var source = new FileSource(filePath, DataSourceFormat.csv);
var properties = new IngestProperties
{
    EnableTracking = true,
    MappingReference = "MyMapping"
};
await client.IngestAsync(source, database, table, properties);

Status tracking

In V1, status tracking was done using the ReportLevel and ReportMethod properties in the ingestion properties.

Tracking was reimplemented in V2 and is now simpler.

Queued Ingestion

When data is queued for ingestion (via a QueuedIngestClient or ManagedStreamingIngestClient), the ingestion is asynchronous, and the result isn't immediate.

The method returns an IngestionOperation object:

var source = new FileSource(filePath, DataSourceFormat.csv);
var operation = await client.IngestAsync(source, database, table);
Assert.IsTrue(operation.IngestionMethod == IngestionMethod.Queued);

If IngestAsync returns successfully, the data was queued for ingestion, but it can still fail later in the ingestion pipeline. If you need to track the status of the ingestion, enable tracking by setting EnableTracking to true in the IngestProperties:

var properties = new IngestProperties { EnableTracking = true };
var operation = await client.IngestAsync(source, database, table, properties);

Then, operation becomes a handle to the ingestion operation, and you can use it to track the status of the ingestion using the client:

var summary = await client.GetOperationSummaryAsync(operation);
// `summary.Status` can be `Succeeded`, `Failed`, `InProgress`, or `Cancelled`.

You may query the status again until it's no longer InProgress.

To get more details about the ingestion, use the GetIngestionDetailsAsync method:

var details = await client.GetIngestionDetailsAsync(operation);
var blob = details.IngestResults.Single();
// blob.Status    - Succeeded, Failed, etc.
// blob.Details   - Additional details about the ingestion
// blob.Exception - If the ingestion failed, contains the exception details

Important

  • Each call to GetOperationSummaryAsync or GetIngestionDetailsAsync will make an HTTP request to the Kusto service.
  • Too frequent calls may lead to throttling or performance issues.
  • Consider waiting a few seconds between calls, or using a backoff strategy.
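
For example, a simple polling loop with a fixed delay between calls (a minimal sketch; the status enum name is an assumption based on the values listed above):

var summary = await client.GetOperationSummaryAsync(operation);
while (summary.Status == IngestionStatus.InProgress) // assumed enum name
{
    await Task.Delay(TimeSpan.FromSeconds(10)); // back off between polls to avoid throttling
    summary = await client.GetOperationSummaryAsync(operation);
}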

Streaming Ingestion

For streaming ingestion, the result of the ingestion is immediate. If the method returns successfully, the data was ingested successfully.

Still, the method interfaces are the same, and GetOperationSummaryAsync and GetIngestionDetailsAsync return the expected results.
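
A streaming client is built and used the same way as the queued client. A minimal sketch using the StreamingIngestClientBuilder listed earlier:

var client = StreamingIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithAuthentication(auth)
    .Build();

var source = new StreamSource(stream, DataSourceFormat.json);
// Returns only after the data has actually been ingested.
var operation = await client.IngestAsync(source, database, table);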

Managed Streaming Ingestion

Managed streaming ingestion can resolve to either queued or streaming ingestion. Either way, if tracking is enabled, you may use the same methods to track the status of the ingestion operation.

Serializing Ingestion Operations

After starting an ingestion operation with tracking enabled, you may not want to track it immediately.

In V2, you can serialize and deserialize ingestion operations using the ToJsonString and FromJsonString methods.

This allows you to store the operation in a database or file, and later retrieve it to continue monitoring the ingestion status. You need to use a client that matches the address and type of the client that created the operation, and has tracking enabled.

var serialized = operation.ToJsonString();
var deserializedOperation = IngestionOperation.FromJsonString(serialized, client);
var summary = await client.GetOperationSummaryAsync(deserializedOperation);

Advanced topics

Multi-Ingestion

QueuedIngestClient in V2 implements the IMultiIngest interface, which allows you to ingest multiple sources in a single call.

The number of sources allowed per call is limited. You can get the limit via the GetMaxSourcesPerMultiIngest() method on IMultiIngest.

Currently, only a list of BlobSource is supported. You might need to use an uploader to convert your local files or streams to BlobSource before using this method.
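
If you have more sources than the limit allows, you can split them into batches before calling IngestAsync. A minimal sketch (blobSources and properties are assumed to be defined; Chunk requires .NET 6 or later):

int limit = client.GetMaxSourcesPerMultiIngest();
foreach (var batch in blobSources.Chunk(limit)) // System.Linq batching
{
    await client.IngestAsync(batch, database, table, properties);
}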

Example (V2):

var uploader = ManagedUploaderBuilder.Create()
    .WithAuthentication(auth)
    .Build();
var client = QueuedIngestClientBuilder.Create(new Uri(clusterUrl))
    .WithUploader(uploader)
    .WithAuthentication(auth)
    .Build();
var source1 = new FileSource(filePath1, DataSourceFormat.csv);
var source2 = new FileSource(filePath2, DataSourceFormat.csv);
var (successes, failures) = await uploader.UploadManyAsync(new[] { source1, source2 });

foreach (var blob in failures)
{
    Console.WriteLine($"Failed to upload {blob.SourceId}: {blob.Exception?.Message}");
}

var operation = await client.IngestAsync(successes, database, table,
    new IngestProperties { EnableTracking = true });

When tracking the operation, the status contains the number of successes and failures:

var summary = await client.GetOperationSummaryAsync(operation);
Assert.IsTrue(summary.Successes > 0);
Assert.IsTrue(summary.Failures > 0);
// summary.Status can be Succeeded, Failed, or Cancelled, and can also be InProgress in multi-ingestion

And you can get the details of each source:

var details = await client.GetIngestionDetailsAsync(operation);
foreach (var blob in details.IngestResults)
{
    if (blob.Status == IngestStatus.Succeeded)
    {
        Console.WriteLine($"Blob {blob.BlobName} was ingested successfully.");
    }
    else if (blob.Status == IngestStatus.Failed)
    {
        Console.WriteLine($"Blob {blob.BlobName} failed to ingest: {blob.Exception?.Message}");
    }
}

DataReaderSource

In V2, you can use DataReaderSource to ingest data from a .NET IDataReader implementation. Unlike other sources, DataReaders can be partially ingested, meaning they can ingest some of the data or be ingested in batches.

When using IngestAsync with a DataReaderSource, its internal MaxBytesPerFragment and MaxRecordsPerFragment properties are used to determine how much data to ingest.

Any data beyond that remains in the reader for the next ingestion call. You can check whether the reader has more data to ingest via the HasDataRemaining property of the DataReaderSource.

Example (V2):

var dataReader = GetMyDataReader();
var source = new DataReaderSource(dataReader, maxBytesPerFragment: 1024);
await client.IngestAsync(source, database, table); // Will ingest up to 1024 bytes of data
if (source.HasDataRemaining)
{
    // There is more data to ingest; you can call IngestAsync again
}
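
To drain the reader completely, you can loop until HasDataRemaining is false. A minimal sketch based on the behavior described above:

var source = new DataReaderSource(dataReader, maxBytesPerFragment: 1024);
do
{
    // Each call ingests up to one fragment's worth of data.
    await client.IngestAsync(source, database, table);
} while (source.HasDataRemaining);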

If you want to ingest multiple batches of data readers, you can use an uploader:

var uploader = ManagedUploaderBuilder.Create()
    .WithAuthentication(auth)
    .Build();
// dataReaderSource is the DataReaderSource created earlier.
var props = new IngestProperties { EnableTracking = true };
(IEnumerable<BlobSource> successes, IEnumerable<IngestResult> failures) = await uploader.UploadManyAsync(
    dataReaderSource,
    maxFragmentsToCreate: 10, // defaults to the maximum number of blobs you can ingest in a single operation
    props);

// dataReaderSource.HasDataRemaining can still be true if `maxFragmentsToCreate` was reached before all data was ingested.
if (dataReaderSource.HasDataRemaining)
{
    Console.WriteLine("There is more data to ingest.");
}

await client.IngestAsync(successes, database, table, props);