Ingest data using the Azure Data Explorer Go SDK

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. It provides a Go SDK client library for interacting with the Azure Data Explorer service. You can use the Go SDK to ingest, control, and query data in Azure Data Explorer clusters.

In this article, you first create a table and data mapping in a test cluster. You then queue an ingestion to the cluster using the Go SDK and validate the results.

Prerequisites

Install the Go SDK

The Azure Data Explorer Go SDK will be automatically installed when you run the [sample application that uses Go modules. If you installed the Go SDK for another application, create a Go module and fetch the Azure Data Explorer package (using go get), for example:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

The package dependency will be added to the go.mod file. Use it in your Go application.

Review the code

This Review the code section is optional. If you're interested to learn how the code works, you can review the following code snippets. Otherwise, you can skip ahead to Run the application.

Authenticate

The program needs to authenticate to Azure Data Explorer service before executing any operations.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

An instance of kusto.Authorization is created using the service principal credentials. It's then used to create a kusto.Client with the New function that also accepts the cluster endpoint.

Create table

The create table command is represented by a Kusto statement.The Mgmt function is used to execute management commands. It's used to execute the command to create a table.

func createTable(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
  if err != nil {
    log.Fatal("failed to create table", err)
  }
  log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tip

A Kusto statement is constant, by default, for better security. NewStmt accepts string constants. The UnsafeStmt API allows for use of non-constant statement segments, but isn't recommended.

The Kusto create table command is as follows:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Create mapping

Data mappings are used during ingestion to map incoming data to columns inside Azure Data Explorer tables. For more information, see data mapping. Mapping is created, in the same way as a table, using the Mgmt function with the database name and the appropriate command. The complete command is available in the GitHub repo for the sample.

func createMapping(kc *kusto.Client, kustoDB string) {
  _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
  if err != nil {
    log.Fatal("failed to create mapping - ", err)
  }
  log.Printf("Mapping %s created\n", kustoMappingRefName)
}

Ingest data

An ingestion is queued using a file from an existing Azure Blob Storage container.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
  kIngest, err := ingest.New(kc, kustoDB, kustoTable)
  if err != nil {
    log.Fatal("failed to create ingestion client", err)
  }
  blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
  err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

  if err != nil {
    log.Fatal("failed to ingest file", err)
  }
  log.Println("Ingested file from -", blobStorePath)
}

The Ingestion client is created using ingest.New. The FromFile function is used to refer to the Azure Blob Storage URI. The mapping reference name and the data type are passed in the form of FileOption.

Run the application

  1. Clone the sample code from GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Run the sample code as seen in this snippet from main.go:

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tip

    To try different combinations of operations, you can uncomment/comment the respective functions in main.go.

    When you run the sample code, the following actions are performed:

    1. Drop table: StormEvents table is dropped (if it exists).
    2. Table creation: StormEvents table is created.
    3. Mapping creation: StormEvents_CSV_Mapping mapping is created.
    4. File ingestion: A CSV file (in Azure Blob Storage) is queued for ingestion.
  3. To create a service principal for authentication, use Azure CLI with the az ad sp create-for-rbac command. Set the service principal information with the cluster endpoint and the database name in the form of environment variables that will be used by the program:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.chinacloudapi.cn"
    export KUSTO_DB="name of the database"
    
  4. Run the program:

    go run main.go
    

    You'll get a similar output:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.chinacloudapi.cn/samplefiles/StormEvents.csv
    

Validate and troubleshoot

Wait for 5 to 10 minutes for the queued ingestion to schedule the ingestion process and load the data into Azure Data Explorer.

  1. Sign in to https://dataexplorer.azure.cn and connect to your cluster. Then run the following command to get the count of records in the StormEvents table.

    StormEvents | count
    
  2. Run the following command in your database to see if there were any ingestion failures in the last four hours. Replace the database name before running.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to drop the StormEvents table.

.drop table StormEvents

Next step