Ingest data using the Kusto Java SDK

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. The Java client library can be used to ingest data, issue management commands, and query data in Azure Data Explorer clusters.

In this article, learn how to ingest data using the Azure Data Explorer Java library. First, you'll create a table and a data mapping in a test cluster. Then you'll queue an ingestion from blob storage to the cluster using the Java SDK and validate the results.

Prerequisites

Review the code

This section is optional. Review the following code snippets to learn how the code works. To skip this section, go to run the application.

Authentication

The program uses Microsoft Entra authentication credentials with ConnectionStringBuilder`.

  1. Create a com.microsoft.azure.kusto.data.Client for query and management.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Create and use a com.microsoft.azure.kusto.ingest.IngestClient to queue data ingestion into Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Management commands

Management commands, such as .drop and .create, are executed by calling execute on a com.microsoft.azure.kusto.data.Client object.

For example, the StormEvents table is created as follows:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Data ingestion

Queue ingestion by using a file from an existing Azure Blob Storage container.

  • Use BlobSourceInfo to specify the Blob Storage path.
  • Use IngestionProperties to define table, database, mapping name, and data type. In the following example, the data type is CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.chinacloudapi.cn/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

The ingestion process starts in a separate thread and the main thread waits for the ingestion thread to complete. This process uses CountdownLatch. The ingestion API (IngestClient#ingestFromBlob) isn't asynchronous. A while loop is used to poll the current status every 5 secs and waits for the ingestion status to change from Pending to a different status. The final status can be Succeeded, Failed, or PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

There are other methods to handle ingestion asynchronously for different applications. For example, you could use CompletableFuture to create a pipeline defining the action post-ingestion, such as query the table, or handle exceptions that were reported to the IngestionStatus.

Run the application

General

When you run the sample code, the following actions are performed:

  1. Drop table: StormEvents table is dropped (if it exists).
  2. Table creation: StormEvents table is created.
  3. Mapping creation: StormEvents_CSV_Mapping mapping is created.
  4. File ingestion: A CSV file (in Azure Blob Storage) is queued for ingestion.

The following sample code is from App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tip

To try different combinations of operations, uncomment/comment the respective methods in App.java.

Run the application

  1. Clone the sample code from GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Set the service principal information with the following information as environment variables used by the program:

    • Cluster endpoint
    • Database name
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.chinacloudapi.cn"
    export KUSTO_DB="name of the database"
    
  3. Build and run:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    The output will be similar to:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Wait a few minutes for the ingestion process to complete. After successful completion, you'll see the following log message: Ingestion completed successfully. You can exit the program at this point and move to the next step without impacting the ingestion process, which has already been queued.

Validate

Wait five to 10 minutes for the queued ingestion to schedule the ingestion process and load data into Azure Data Explorer.

  1. Sign in to https://dataexplorer.azure.cn and connect to your cluster.

  2. Run the following command to get the count of records in the StormEvents table:

    StormEvents | count
    

Troubleshoot

  1. To see ingestion failures in the last four hours, run the following command on your database:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. To view the status of all ingestion operations in the last four hours, run the following command:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Clean up resources

If you don't plan to use the resources you have created, run the following command in your database to drop the StormEvents table.

.drop table StormEvents