In this article, you learn how to ingest blobs from your storage account into Azure Data Explorer using an Event Grid data connection. You'll create an Event Grid data connection that sets up an Azure Event Grid subscription. The Event Grid subscription routes events from your storage account to Azure Data Explorer via Azure Event Hubs.
To learn how to create the connection in the Azure portal or with an ARM template, see Create an Event Grid data connection.
For general information about ingesting into Azure Data Explorer from Event Grid, see Connect to Event Grid.
Note
To achieve the best performance with the Event Grid connection, set the rawSizeBytes ingestion property via the blob metadata. For more information, see ingestion properties.
For code samples based on previous SDK versions, see the archived article.
- An Azure subscription. Create an Azure account.
- An Azure Data Explorer cluster and database. Create a cluster and database.
- A destination table. Create a table or use an existing table.
- An ingestion mapping for the table.
- A storage account. An Event Grid notification subscription can be set on Azure Storage accounts for BlobStorage, StorageV2, or Data Lake Storage Gen2.
- Have the Event Grid resource provider registered.
In this section, you'll establish a connection between Event Grid and your Azure Data Explorer table.
Install the required libraries.
pip install azure-common
pip install azure-mgmt-kusto
Create a Microsoft Entra application principal to use for authentication. You'll need the directory (tenant) ID, application ID, and client secret.
Run the following code.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventGridDataConnection
from azure.common.credentials import ServicePrincipalCredentials

# Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
# Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
# Client secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
    client_id=client_id,
    secret=client_secret,
    tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
# The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
# The event hub and storage account that are created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"
storage_account_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx"
consumer_group = "$Default"
location = "China East 2"
# The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
database_routing = "Multi"
blob_storage_event_type = "Microsoft.Storage.BlobCreated"

# Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.begin_create_or_update(
    resource_group_name=resource_group_name,
    cluster_name=cluster_name,
    database_name=database_name,
    data_connection_name=data_connection_name,
    parameters=EventGridDataConnection(
        storage_account_resource_id=storage_account_resource_id,
        event_hub_resource_id=event_hub_resource_id,
        consumer_group=consumer_group,
        table_name=table_name,
        location=location,
        mapping_rule_name=mapping_rule_name,
        data_format=data_format,
        database_routing=database_routing,
        blob_storage_event_type=blob_storage_event_type
    )
)
# The creation of the connection is async. Validation errors are only visible if you wait for the results.
poller.wait()
print(poller.result())
| Setting | Suggested value | Field description |
|---|---|---|
| tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | Your tenant ID. Also known as directory ID. |
| subscription_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | The subscription ID that you use for resource creation. |
| client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | The client ID of the application that can access resources in your tenant. |
| client_secret | xxxxxxxxxxxxxx | The client secret of the application that can access resources in your tenant. |
| resource_group_name | testrg | The name of the resource group containing your cluster. |
| cluster_name | mykustocluster | The name of your cluster. |
| database_name | mykustodatabase | The name of the target database in your cluster. |
| data_connection_name | myeventhubconnect | The desired name of your data connection. |
| table_name | StormEvents | The name of the target table in the target database. |
| mapping_rule_name | StormEvents_CSV_Mapping | The name of your column mapping related to the target table. |
| database_routing | Multi or Single | The database routing for the connection. If you set the value to Single, the data connection is routed to a single database in the cluster, as specified in the database_name setting. If you set the value to Multi, you can override the default target database using the Database ingestion property. For more information, see Events routing. |
| data_format | csv | The data format of the message. |
| event_hub_resource_id | Resource ID | The resource ID of the event hub to which Event Grid is configured to send events. |
| storage_account_resource_id | Resource ID | The resource ID of your storage account that holds the data for ingestion. |
| consumer_group | $Default | The consumer group of your event hub. |
| location | China East 2 | The location of the data connection resource. |
| blob_storage_event_type | Microsoft.Storage.BlobCreated | The type of event that triggers ingestion. Supported events are Microsoft.Storage.BlobCreated and Microsoft.Storage.BlobRenamed. Blob renaming is supported only for ADLSv2 storage. |
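The two resource ID settings follow the path layout shown in the sample above. As a minimal sketch, the helpers below assemble those strings from their parts so that a typo in one segment is easier to spot. The function names and the segment check are illustrative assumptions and aren't part of the Azure SDK.

```python
def _check_segment(name, value):
    """Reject empty segments or segments that would break the path layout."""
    if not value or "/" in value:
        raise ValueError(f"{name} must be a non-empty string without '/'")
    return value


def build_event_hub_resource_id(subscription_id, resource_group, namespace, event_hub):
    """Hypothetical helper: build the value for event_hub_resource_id."""
    return (
        f"/subscriptions/{_check_segment('subscription_id', subscription_id)}"
        f"/resourceGroups/{_check_segment('resource_group', resource_group)}"
        f"/providers/Microsoft.EventHub"
        f"/namespaces/{_check_segment('namespace', namespace)}"
        f"/eventhubs/{_check_segment('event_hub', event_hub)}"
    )


def build_storage_account_resource_id(subscription_id, resource_group, account):
    """Hypothetical helper: build the value for storage_account_resource_id."""
    return (
        f"/subscriptions/{_check_segment('subscription_id', subscription_id)}"
        f"/resourceGroups/{_check_segment('resource_group', resource_group)}"
        f"/providers/Microsoft.Storage"
        f"/storageAccounts/{_check_segment('account', account)}"
    )
```

The returned strings can be assigned to event_hub_resource_id and storage_account_resource_id before the data connection is created.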
This section shows how to trigger ingestion from Azure Blob Storage or Azure Data Lake Storage Gen2 to your cluster following blob creation or blob renaming.
The following code sample uses the Azure Blob Storage SDK to upload a file to Azure Blob Storage. The upload triggers the Event Grid data connection, which ingests the data into Azure Data Explorer.
// Replace each placeholder with your own value. Blob metadata values are strings.
var azureStorageAccountConnectionString = "<storage_account_connection_string>";
var containerName = "<container_name>";
var blobName = "<blob_name>";
var localFileName = "<file_to_upload>";
var uncompressedSizeInBytes = "<uncompressed_size_in_bytes>";
var mapping = "<mappingReference>";
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();
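If you upload blobs from Python instead, the same two metadata keys can be prepared before the upload. The following is a minimal sketch that uses only the standard library. The helper names are hypothetical, and it assumes that a file ending in .gz is gzip-compressed and that any other file is uncompressed.

```python
import gzip
import os


def raw_size_bytes(path):
    """Return the uncompressed size of a local file in bytes.

    Assumption: files ending in .gz are gzip-compressed and are streamed
    to count their uncompressed bytes; other files are measured on disk.
    """
    if path.endswith(".gz"):
        total = 0
        with gzip.open(path, "rb") as handle:
            for chunk in iter(lambda: handle.read(1024 * 1024), b""):
                total += len(chunk)
        return total
    return os.path.getsize(path)


def build_ingestion_metadata(path, mapping_reference=None):
    """Build the blob metadata dictionary used by the sample above.

    Metadata values must be strings, so the size is converted with str().
    """
    metadata = {"rawSizeBytes": str(raw_size_bytes(path))}
    if mapping_reference:
        metadata["kustoIngestionMappingReference"] = mapping_reference
    return metadata
```

The resulting dictionary would then be passed as the blob's metadata in whichever upload call your storage SDK provides.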
Note
Azure Data Explorer won't delete the blobs post ingestion. Retain the blobs for three to five days by using Azure Blob storage lifecycle to manage blob deletion.
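As an illustration of the retention advice above, the sketch below builds a lifecycle management policy document that deletes block blobs a given number of days after their last modification. The rule name and the optional prefix filter are hypothetical placeholders. Treat the document as a starting point and review it against your storage account's lifecycle management settings before you apply it.

```python
import json


def delete_after_days_policy(days, rule_name="delete-ingested-blobs", prefix=None):
    """Build a lifecycle policy that deletes block blobs after `days` days.

    rule_name and prefix are illustrative placeholders, not required values.
    """
    if days < 1:
        raise ValueError("days must be at least 1")
    filters = {"blobTypes": ["blockBlob"]}
    if prefix:
        # Limit the rule to blobs whose path starts with this prefix.
        filters["prefixMatch"] = [prefix]
    return {
        "rules": [
            {
                "enabled": True,
                "name": rule_name,
                "type": "Lifecycle",
                "definition": {
                    "actions": {
                        "baseBlob": {
                            "delete": {"daysAfterModificationGreaterThan": days}
                        }
                    },
                    "filters": filters,
                },
            }
        ]
    }


if __name__ == "__main__":
    # Print a five-day retention policy as JSON.
    print(json.dumps(delete_after_days_policy(5), indent=2))
```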
Note
Triggering ingestion following a CopyBlob operation is not supported for storage accounts that have the hierarchical namespace feature enabled on them.
Important
We highly discourage generating Storage Events from custom code and sending them to Event Hubs. If you choose to do so, make sure that the events produced strictly adhere to the appropriate Storage Events schema and JSON format specifications.
To remove the Event Grid connection, run the following command:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
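The command above uses C# naming. If you created the connection with the Python management client, a roughly equivalent call could look like the following sketch. It assumes that `client` is a KustoManagementClient from an azure-mgmt-kusto version that exposes the `begin_` long-running operations, such as `begin_create_or_update`. The wrapper function itself is a hypothetical helper.

```python
def delete_data_connection(client, resource_group_name, cluster_name,
                           database_name, data_connection_name):
    """Delete a data connection and wait for the operation to finish.

    Assumption: `client` is a KustoManagementClient whose data_connections
    operations expose begin_delete, which returns a poller.
    """
    poller = client.data_connections.begin_delete(
        resource_group_name=resource_group_name,
        cluster_name=cluster_name,
        database_name=database_name,
        data_connection_name=data_connection_name,
    )
    # Deletion is asynchronous; wait so that any errors surface here.
    poller.wait()
    return poller.result()
```

For example, `delete_data_connection(kusto_management_client, "testrg", "mykustocluster", "mykustodatabase", "myeventhubconnect")` would remove the connection created earlier in this article.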