使用 SDK 为 Azure 数据资源管理器创建事件网格数据连接

本文介绍如何使用事件网格数据连接将 blob 从存储帐户引入到 Azure 数据资源管理器。 你将创建一个用于设置 Azure 事件网格订阅的事件网格数据连接。 事件网格订阅通过 Azure 事件中心将事件从存储帐户路由到 Azure 数据资源管理器。

若要了解如何在 Azure 门户中或使用 ARM 模板创建连接,请参阅创建事件网格数据连接

有关如何从事件网格引入 Azure 数据资源管理器的一般信息,请参阅连接到事件网格

备注

若要使用事件网格连接实现最佳性能,请通过 Blob 元数据设置 rawSizeBytes 引入属性。 有关详细信息,请参阅引入属性

有关基于以前的 SDK 版本的代码示例,请参阅存档的文章

先决条件

创建事件网格数据连接

在本部分,我们将在事件网格与 Azure 数据资源管理器表之间建立连接。

  1. 安装所需的库。

    pip install azure-common
    pip install azure-mgmt-kusto
    
  2. 创建用于身份验证的 Microsoft Entra 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端机密。

  3. 运行以下代码。

    from azure.mgmt.kusto import KustoManagementClient
    from azure.mgmt.kusto.models import EventGridDataConnection
    from azure.common.credentials import ServicePrincipalCredentials
    
    #Directory (tenant) ID
    tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    #Application ID
    client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    #Client Secret
    client_secret = "xxxxxxxxxxxxxx"
    subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
    credentials = ServicePrincipalCredentials(
            client_id=client_id,
            secret=client_secret,
            tenant=tenant_id
        )
    kusto_management_client = KustoManagementClient(credentials, subscription_id)
    
    resource_group_name = "testrg"
    #The cluster and database that are created as part of the Prerequisites
    cluster_name = "mykustocluster"
    database_name = "mykustodatabase"
    data_connection_name = "myeventhubconnect"
    #The event hub and storage account that are created as part of the Prerequisites
    event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx"
    storage_account_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.Storage/storageAccounts/xxxxxx"
    consumer_group = "$Default"
    location = "China East 2"
    #The table and column mapping that are created as part of the Prerequisites
    table_name = "StormEvents"
    mapping_rule_name = "StormEvents_CSV_Mapping"
    data_format = "csv"
    database_routing = "Multi"
    blob_storage_event_type = "Microsoft.Storage.BlobCreated"
    
    #Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
    poller = kusto_management_client.data_connections.begin_create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                                parameters=EventGridDataConnection(storage_account_resource_id=storage_account_resource_id, event_hub_resource_id=event_hub_resource_id, 
                                                                                    consumer_group=consumer_group, table_name=table_name, location=location, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing,
                                                                                    blob_storage_event_type=blob_storage_event_type))
    # The creation of the connection is async. Validation errors are only visible if you wait for the results.
    poller.wait()
    print(poller.result())
    
    设置 建议的值 字段说明
    tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。
    subscription_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。
    client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。
    client_secret xxxxxxxxxxxxxx 可以访问租户中资源的应用程序的客户端密码。
    resource_group_name testrg 包含群集的资源组的名称。
    cluster_name mykustocluster 群集的名称。
    database_name mykustodatabase 群集中目标数据库的名称。
    data_connection_name myeventhubconnect 所需的数据连接名称。
    table_name StormEvents 目标数据库中目标表的名称。
    mapping_rule_name StormEvents_CSV_Mapping 与目标表相关的列映射的名称。
    database_routing 多或单 连接的数据库路由。 如果将此值设置为“单”,数据连接将按“databaseName”设置中指定的那样路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由
    data_format csv 消息的数据格式。
    event_hub_resource_id 资源 ID 将事件网格配置为发送事件的事件中心的资源 ID。
    storage_account_resource_id 资源 ID 包含要引入的数据的存储帐户的资源 ID。
    consumer_group $Default 事件中心的使用者组。
    location 中国东部 2 数据连接资源的位置。
    blob_storage_event_type Microsoft.Storage.BlobCreated 触发引入的事件类型。 支持的事件为:Microsoft.Storage.BlobCreated 或 Microsoft.Storage.BlobRenamed。 仅 ADLSv2 存储支持 Blob 重命名。

使用事件网格数据连接

本部分介绍如何在创建 Blob 或重命名 Blob 后,触发从 Azure Blob 存储或 Azure Data Lake Gen 2 到群集的引入。

根据用于上传 Blob 的存储 SDK 类型选择相关的选项卡。

以下代码示例使用 Azure Blob 存储 SDK 将文件上传到 Azure Blob 存储。 上传会触发事件网格数据连接,该连接将数据引入到 Azure 数据资源管理器中。

var azureStorageAccountConnectionString=<storage_account_connection_string>;
var containerName = <container_name>;
var blobName = <blob_name>;
var localFileName = <file_to_upload>;
var uncompressedSizeInBytes = <uncompressed_size_in_bytes>;
var mapping = <mappingReference>;
// Create a new container in your storage account.
var azureStorageAccount = CloudStorageAccount.Parse(azureStorageAccountConnectionString);
var blobClient = azureStorageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
container.CreateIfNotExists();
// Set metadata and upload a file to the blob.
var blob = container.GetBlockBlobReference(blobName);
blob.Metadata.Add("rawSizeBytes", uncompressedSizeInBytes);
blob.Metadata.Add("kustoIngestionMappingReference", mapping);
blob.UploadFromFile(localFileName);
// Confirm success of the upload by listing the blobs in your container.
var blobs = container.ListBlobs();

备注

Azure 数据资源管理器在引入后不会删除 blob。 使用 Azure Blob 存储生命周期管理 blob 删除,将 blob 保留三到五天。

备注

启用了分层命名空间功能的存储帐户不支持在 CopyBlob 操作后触发引入。

重要

我们强烈建议不要从自定义代码生成存储事件并将其发送到事件中心。 如果选择这样做,请确保生成的事件严格遵循相应的存储事件架构和 JSON 格式规范。

删除事件网格数据连接

若要删除事件网格连接,请运行以下命令:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);