Create an Event Grid data connection for Azure Data Explorer with SDKs

In this article, you learn how to ingest blobs from your storage account into Azure Data Explorer using an Event Grid data connection. You'll create an Event Grid data connection that sets an Azure Event Grid subscription. The Event Grid subscription routes events from your storage account to Azure Data Explorer via an Azure Event Hubs.

To learn how to create the connection in the Azure portal or with an ARM template, see Create an Event Grid data connection.

For general information about ingesting into Azure Data Explorer from Event Grid, see Connect to Event Grid.

Note

To achieve the best performance with the Event Grid connection, set the rawSizeBytes ingestion property via the blob metadata. For more information, see ingestion properties.

For code samples based on previous SDK versions, see the archived article.

Prerequisites

Create an Event Grid data connection

In this section, you'll establish a connection between Event Grid and your Azure Data Explorer table.

  1. Install the required libraries.

    pip install azure-common
    pip install azure-mgmt-kusto
    
  2. Create a Microsoft Entra application principal to use for authentication. You'll need the directory (tenant) ID, application ID, and client secret.

  3. Run the following code.

    from azure.mgmt.kusto import KustoManagementClient
    from azure.mgmt.kusto.models import EventGridDataConnection
    from azure.common.credentials import ServicePrincipalCredentials
    
    #Directory (tenant) ID
    tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    #Application ID
    client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    #Client Secret
    client_secret = "xxxxxxxxxxxxxx"
    subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    credentials = ServicePrincipalCredentials(
            client_id=client_id,
            secret=client_secret,
            tenant=tenant_id
        )
    kusto_management_client = KustoManagementClient(credentials, subscription_id)
    
    resource_group_name = "testrg"
    #The cluster and database that are created as part of the Prerequisites
    cluster_name = "mykustocluster"
    database_name = "mykustodatabase"
    data_connection_name = "myeventhubconnect"
    #The event hub and storage account that are created as part of the Prerequisites
    event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"
    storage_account_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx"
    consumer_group = "$Default"
    location = "China East 2"
    #The table and column mapping that are created as part of the Prerequisites
    table_name = "StormEvents"
    mapping_rule_name = "StormEvents_CSV_Mapping"
    data_format = "csv"
    database_routing = "Multi"
    blob_storage_event_type = "Microsoft.Storage.BlobCreated"
    
    #Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
    poller = kusto_management_client.data_connections.begin_create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                                parameters=EventGridDataConnection(storage_account_resource_id=storage_account_resource_id, event_hub_resource_id=event_hub_resource_id, 
                                                                                    consumer_group=consumer_group, table_name=table_name, location=location, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing,
                                                                                    blob_storage_event_type=blob_storage_event_type))
    # The creation of the connection is async. Validation errors are only visible if you wait for the results.
    poller.wait()
    print(poller.result())
    
    Setting Suggested value Field description
    tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Your tenant ID. Also known as directory ID.
    subscription_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx The subscription ID that you use for resource creation.
    client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx The client ID of the application that can access resources in your tenant.
    client_secret xxxxxxxxxxxxxx The client secret of the application that can access resources in your tenant.
    resource_group_name testrg The name of the resource group containing your cluster.
    cluster_name mykustocluster The name of your cluster.
    database_name mykustodatabase The name of the target database in your cluster.
    data_connection_name myeventhubconnect The desired name of your data connection.
    table_name StormEvents The name of the target table in the target database.
    mapping_rule_name StormEvents_CSV_Mapping The name of your column mapping related to the target table.
    database_routing Multi or Single The database routing for the connection. If you set the value to Single, the data connection will be routed to a single database in the cluster as specified in the databaseName setting. If you set the value to Multi, you can override the default target database using the Database ingestion property. For more information, see Events routing.
    data_format csv The data format of the message.
    event_hub_resource_id Resource ID The resource ID of your event hub where the Event Grid is configured to send events.
    storage_account_resource_id Resource ID The resource ID of your storage account that holds the data for ingestion.
    consumer_group $Default The consumer group of your event hub.
    location China East 2 The location of the data connection resource.
    blob_storage_event_type Microsoft.Storage.BlobCreated The type of event that triggers ingestion. Supported events are: Microsoft.Storage.BlobCreated or Microsoft.Storage.BlobRenamed. Blob renaming is supported only for ADLSv2 storage.

Use the Event Grid data connection

This section shows how to trigger ingestion from Azure Blob Storage or Azure Data Lake Gen 2 to your cluster following blob creation or blob renaming.

Select the relevant tab based on the type of storage SDK used to upload blobs.

The following code sample uses the Azure Blob Storage SDK to upload a file to Azure Blob Storage. The upload triggers the Event Grid data connection, which ingests the data into Azure Data Explorer.

var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();

Note

Azure Data Explorer won't delete the blobs post ingestion. Retain the blobs for three to five days by using Azure Blob storage lifecycle to manage blob deletion.

Note

Triggering ingestion following a CopyBlob operation is not supported for storage accounts that have the hierarchical namespace feature enabled on them.

Important

We highly discourage generating Storage Events from custom code and sending them to Event Hubs. If you choose to do so, make sure that the events produced strictly adhere to the appropriate Storage Events schema and JSON format specifications.

Remove an Event Grid data connection

To remove the Event Grid connection, run the following command:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);