Create an app to get data using the managed streaming ingestion client

Applies to: ✅ Azure Data Explorer

Streaming ingestion enables you to write data to Kusto with near-real-time latencies. It's also useful when you need to write small amounts of data to a large number of tables, making batching inefficient.

In this article, you learn how to ingest data to Kusto by using the managed streaming ingestion client. You ingest a data stream in the form of a file or in-memory stream.

Note

Streaming ingestion is a high-velocity ingestion protocol. Ingesting by using a Managed Streaming Ingestion or Streaming Ingestion client isn't the same as ingesting by using a Stream Source.

The type of client refers to the way data is ingested. When you ingest by using a Managed Streaming Ingestion or Streaming Ingestion client, you send data to Kusto by using the streaming ingestion protocol. The protocol uses a streaming service to provide low-latency ingestion.

Ingesting from a Stream Source refers to how the data is stored. For example, in C# you can create a Stream Source from a MemoryStream object. This approach is different from a File Source, which you create from a file on disk.

The ingestion method depends on the client you use: with queued ingestion, the process first uploads data from the source to blob storage, and then it queues the data for ingestion. By using streaming ingestion, the process sends data directly to Kusto in the body of a streaming HTTP request.

Important

The Ingest API now has two versions: V1 and V2. The V1 API is the original API. The V2 API is a reimagined version that simplifies the ingest API while offering more customization.

Ingest Version 2 is in preview and is available in the following languages: C#

Also, the Query V2 API isn't related to the Ingest V2 API.

Streaming and Managed Streaming

Kusto SDKs provide two flavors of streaming ingestion clients: a Streaming Ingestion Client and a Managed Streaming Ingestion Client. The Managed Streaming Ingestion Client includes built-in retry and failover logic.

Note

This article shows how to use Managed Streaming Ingestion. To use plain Streaming Ingestion instead of Managed Streaming, change the instantiated client type to Streaming Ingestion Client.

When you ingest data by using a Managed Streaming Ingestion API, it automatically handles failures and retries as follows:

  • It moves streaming requests that fail because of server-side size limitations to queued ingestion.
  • It automatically sends data that's estimated to be larger than the streaming limit to queued ingestion.
    • The size of the streaming limit depends on the format and compression of the data.
    • You can change the limit by setting the Size Factor in the Managed Streaming Ingest Policy, passed in initialization.
  • It retries transient failures, such as throttling, three times, and then moves the request to queued ingestion.
  • It doesn't retry permanent failures.
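
The fallback rules in the preceding list can be modeled in plain C#. The following is an illustrative sketch only, not the SDK's implementation: StreamingOutcome, ManagedStreamingSketch, the tryStream and queue delegates, and the 4,000,000-byte limit are hypothetical names and values, and the sketch assumes that "three times" means three streaming attempts in total.

```csharp
using System;
using System.Threading.Tasks;

// Usage: a streaming call that always fails transiently ends up on the queued path.
var attempts = 0;
var path = await ManagedStreamingSketch.IngestAsync(
    estimatedSize: 1_000,
    streamingLimit: 4_000_000, // hypothetical limit, for illustration only
    tryStream: () => { attempts++; return Task.FromResult(StreamingOutcome.TransientFailure); },
    queue: () => Task.CompletedTask);
Console.WriteLine($"{path} after {attempts} streaming attempts"); // queued after 3 streaming attempts

// Hypothetical result of one streaming attempt; not an SDK type.
public enum StreamingOutcome { Success, TransientFailure, PermanentFailure, TooLarge }

public static class ManagedStreamingSketch
{
    // Assumption: "three times" is read as three streaming attempts in total.
    public const int MaxStreamingAttempts = 3;

    // Returns "streaming" or "queued" to show which path handled the data.
    public static async Task<string> IngestAsync(
        long estimatedSize,
        long streamingLimit,
        Func<Task<StreamingOutcome>> tryStream,
        Func<Task> queue)
    {
        // Data estimated to be larger than the streaming limit goes straight to queued ingestion.
        if (estimatedSize > streamingLimit)
        {
            await queue();
            return "queued";
        }

        for (var attempt = 1; attempt <= MaxStreamingAttempts; attempt++)
        {
            switch (await tryStream())
            {
                case StreamingOutcome.Success:
                    return "streaming";
                case StreamingOutcome.TooLarge:
                    // Server-side size rejection: move the request to queued ingestion.
                    await queue();
                    return "queued";
                case StreamingOutcome.PermanentFailure:
                    // Permanent failures are surfaced to the caller without a retry.
                    throw new InvalidOperationException("Permanent ingestion failure.");
                case StreamingOutcome.TransientFailure:
                    break; // leaves the switch; the loop tries again
            }
        }

        // Transient failures used up every streaming attempt: fall back to queued ingestion.
        await queue();
        return "queued";
    }
}
```

The return value only makes the chosen path visible in this sketch; the real client reports results through its own types.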

Note

If streaming ingestion fails and the system moves the data to queued ingestion, the data takes longer to ingest because the process batches and queues the data for ingestion. You can control this delay by using the batching policy.

Limitations

Data streaming has some limitations compared to queuing data for ingestion.

  • You can't set tags on data.
  • You can only provide mapping by using ingestionMappingReference. Inline mapping isn't supported.
  • The payload sent in the request can't exceed 10 MB, regardless of format or compression.
  • The ignoreFirstRecord property isn't supported for streaming ingestion, so ingested data must not contain a header row.

For more information, see Streaming Limitations.
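
Two of these limitations can be checked on the client before you send data. The following is a minimal sketch under stated assumptions: StreamingPreflight and its members are hypothetical helpers, not SDK APIs; 10 MB is interpreted as 10 × 1024 × 1024 bytes; and the header row is assumed to end at the first line break (no quoted line breaks inside the header).

```csharp
using System;
using System.Text;

// Usage: remove the header row, then confirm the payload fits in one streaming request.
var csv = "StartTime,EndTime,State\n2018-01-26,2018-01-27,MEXICO\n";
var payload = StreamingPreflight.DropHeaderRow(csv);
Console.WriteLine(payload.Trim());
Console.WriteLine(StreamingPreflight.FitsStreamingRequest(Encoding.UTF8.GetByteCount(payload)));

public static class StreamingPreflight
{
    // Assumption: the 10 MB request limit is treated as 10 * 1024 * 1024 bytes.
    public const long MaxPayloadBytes = 10L * 1024 * 1024;

    // True when the payload doesn't exceed the request size limit.
    public static bool FitsStreamingRequest(long payloadBytes) => payloadBytes <= MaxPayloadBytes;

    // ignoreFirstRecord isn't supported for streaming, so drop the header before sending.
    // Naive: everything up to and including the first '\n' is treated as the header.
    public static string DropHeaderRow(string csv)
    {
        var newline = csv.IndexOf('\n');
        return newline < 0 ? string.Empty : csv[(newline + 1)..];
    }
}
```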

Prerequisites

Before you begin

Before creating the app, complete the following steps. Each step is detailed in the following sections.

  1. Configure streaming ingestion on your Azure Data Explorer cluster.
  2. Create a Kusto table to ingest the data into.
  3. Enable the streaming ingestion policy on the table.
  4. Download the stormevents.csv sample data file containing 1,000 storm event records.

Configure streaming ingestion

To configure streaming ingestion, see Configure streaming ingestion on your Azure Data Explorer cluster. It can take several minutes for the configuration to take effect. If you're using Fabric or a free cluster, streaming ingestion is automatically enabled.

Create a Kusto table

Run the following command on your database via Kusto Explorer (Desktop) or Kusto Web Explorer to create a table called MyStormEvents:

.create table MyStormEvents (StartTime:datetime, EndTime:datetime, State:string, DamageProperty:int, DamageCrops:int, Source:string, StormSummary:dynamic)

Enable the streaming ingestion policy

Enable streaming ingestion on the table or on the entire database by using one of the following commands:

Table level:

.alter table <your table name> policy streamingingestion enable

Database level:


.alter database <databaseName> policy streamingingestion enable

It can take up to two minutes for the policy to take effect.

For more information about streaming policy, see Streaming ingestion policy.

Create a basic client application

Create a basic client application that connects to the Kusto Help cluster. Enter the cluster query and ingest URI and database name in the relevant variables. The app uses two clients: one for querying and one for ingestion. Each client brings up a browser window to authenticate the user.

The code sample includes a service function PrintResultsAsValueList() for printing query results.

Add the Kusto libraries by using the following commands:

dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.Ingest

using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Kusto.Ingest.Common;


using Azure.Identity;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        var tokenCredential = new InteractiveBrowserCredential();
        var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
        var database = "<your database>";
        var table = "MyStormEvents";
        var query = table + " | count";
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}

Stream a file for ingestion

Use the IngestFromStorageAsync method to ingest the stormevents.csv file.

Copy the stormevents.csv file to the same location as your app. Because the input is a CSV file, set Format = DataSourceFormat.csv in the ingestion properties.

Add an ingestion section by appending the following lines to the end of Main().

using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
// Use the database and table variables declared earlier in Main()
var ingestProperties = new KustoIngestionProperties(database, table)
    {
        Format = DataSourceFormat.csv
    };
// Ingestion section
Console.WriteLine("Ingesting data from a file");
await ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties);

Let’s also query the new number of rows and the most recent row after the ingestion. Add the following lines after the ingestion command:

Console.WriteLine("Number of rows in " + table);
using var countResult = await kustoClient.ExecuteQueryAsync(database, table + " | count", new ClientRequestProperties());
PrintResultsAsValueList(countResult);

Console.WriteLine("Example line from " + table);
using var topResult = await kustoClient.ExecuteQueryAsync(database, table + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultsAsValueList(topResult);

The following version of the app uses the Ingest V2 (preview) package. The code sample includes a service function PrintResultsAsValueList() for printing query results.

Add the Kusto libraries by using the following commands:

dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.Ingest.V2

using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;
using Kusto.Ingest.Common;

using Azure.Identity;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        var tokenCredential = new InteractiveBrowserCredential();
        var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
        var database = "<your database>";
        var table = "MyStormEvents";
        var query = table + " | count";
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}

Stream a file for ingestion

Use the IngestAsync method to ingest the stormevents.csv file.

Copy the stormevents.csv file to the same location as your app. Because the input is a CSV file, use DataSourceFormat.csv as the format in the FileSource.

Add an ingestion section by appending the following lines to the end of Main().

using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
var fileSource = new FileSource(".\\stormevents.csv", DataSourceFormat.csv);
await ingestClient.IngestAsync(fileSource, database, table);

Let’s also query the new number of rows and the most recent row after the ingestion. Add the following lines after the ingestion command:

Console.WriteLine("Number of rows in " + table);
using var countResult = await kustoClient.ExecuteQueryAsync(database, table + " | count", new ClientRequestProperties());
PrintResultsAsValueList(countResult);

Console.WriteLine("Example line from " + table);
using var topResult = await kustoClient.ExecuteQueryAsync(database, table + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultsAsValueList(topResult);

The first time you run the application, you see the following results:

Number of rows in MyStormEvents
row 1 :
         Count - 0
Ingesting data from a file
New number of rows in MyStormEvents
row 1 :
         Count - 1000
Example line from MyStormEvents
row 1 :
         StartTime - 2007-12-31 11:15:00+00:00
         EndTime - 2007-12-31 13:21:00+00:00
         State - HAWAII
         DamageProperty - 0
         DamageCrops - 0
         Source - COOP Observer
         StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai.  Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}}

Stream in-memory data for ingestion

To ingest data from memory, create a stream that contains the data for ingestion.

To ingest the stream from memory, call the IngestFromStreamAsync() method.
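
When the data starts as objects rather than text, you can serialize it to CSV and wrap the bytes in a MemoryStream before handing the stream to the ingest client. The following is a sketch under stated assumptions: StormRow and CsvStreamBuilder are hypothetical names, the columns follow the MyStormEvents table created earlier, StormSummary is written as an empty object in the same form as this article's sample line, and field values are assumed to contain no commas, quotes, or line breaks (no CSV escaping is done).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Text;

// Usage: build a stream with two rows and print its contents.
var rows = new[]
{
    new StormRow(new DateTime(2018, 1, 26), new DateTime(2018, 1, 27, 14, 0, 0), "MEXICO", 0, 0, "Unknown"),
    new StormRow(new DateTime(2018, 2, 1), new DateTime(2018, 2, 1, 6, 30, 0), "TEXAS", 100, 0, "Public"),
};
using var stream = CsvStreamBuilder.ToCsvStream(rows);
using var reader = new StreamReader(stream);
Console.Write(reader.ReadToEnd());

// Hypothetical row type that mirrors the MyStormEvents columns (StormSummary omitted).
public record StormRow(DateTime StartTime, DateTime EndTime, string State, int DamageProperty, int DamageCrops, string Source);

public static class CsvStreamBuilder
{
    // Same timestamp layout as the sample line in this article.
    private const string TimeFormat = "yyyy-MM-dd HH:mm:ss.fffffff";

    // Serializes rows to headerless CSV and returns a stream positioned at the start.
    public static MemoryStream ToCsvStream(IEnumerable<StormRow> rows)
    {
        var sb = new StringBuilder();
        foreach (var r in rows)
        {
            sb.Append(r.StartTime.ToString(TimeFormat, CultureInfo.InvariantCulture)).Append(',')
              .Append(r.EndTime.ToString(TimeFormat, CultureInfo.InvariantCulture)).Append(',')
              .Append(r.State).Append(',')
              .Append(r.DamageProperty.ToString(CultureInfo.InvariantCulture)).Append(',')
              .Append(r.DamageCrops.ToString(CultureInfo.InvariantCulture)).Append(',')
              .Append(r.Source).Append(',')
              .Append("'{}'") // empty StormSummary, as in the article's sample line
              .Append('\n');
        }
        return new MemoryStream(Encoding.UTF8.GetBytes(sb.ToString()));
    }
}
```

No header row is written, because streaming ingestion doesn't support ignoreFirstRecord.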

Replace the ingestion section with the following code:

    // Ingestion section
    // Encoding requires "using System.Text;" at the top of the file.
    Console.WriteLine("Ingesting data from memory");
    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
    using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
    using var stream = new MemoryStream(byteArray);

    var streamSourceOptions = new StreamSourceOptions
    {
        LeaveOpen = false
    };

    await ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions);

With the Ingest V2 (preview) client, call IngestAsync() with a StreamSource instead:

    // Ingestion section
    // Encoding requires "using System.Text;" at the top of the file.
    Console.WriteLine("Ingesting data from memory");
    var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
    byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
    using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
    using var stream = new MemoryStream(byteArray);
    // Wrap the in-memory stream (not a file path) in the StreamSource
    var streamSource = new StreamSource(stream, DataSourceCompressionType.None, DataSourceFormat.csv);
    await ingestClient.IngestAsync(streamSource, database, table);

The results are as follows:

Number of rows in MyStormEvents
row 1 :
	 Count - 1000

Ingesting data from memory

New number of rows in MyStormEvents
row 1 :
	 Count - 1001

Example line from MyStormEvents
row 1 :
	 StartTime - 2018-01-26 00:00:00+00:00
	 EndTime - 2018-01-27 14:00:00+00:00
	 State - MEXICO
	 DamageProperty - 0
	 DamageCrops - 0
	 Source - Unknown
	 StormSummary - {}

Resources