在 Azure 数据资源管理器中,可以从事件中心(一个大数据流式处理平台和事件引入服务)引入数据。 事件中心每秒可以准实时处理数百万个事件。
在本文中,你将连接到事件中心并将数据引入 Azure 数据资源管理器。 有关从事件中心引入数据的概述,请参阅 Azure 事件中心数据连接。
若要了解如何在 Azure 数据资源管理器 Web UI、Azure 门户中或使用 ARM 模板创建连接,请参阅创建事件中心数据连接。
有关基于以前的 SDK 版本的代码示例,请参阅存档的文章。
在本部分,你将在事件中心与 Azure 数据资源管理器表之间建立连接。 只要建立了此连接,数据就会从事件中心传输到目标表。 如果将事件中心移动到了其他资源或订阅,则需要更新或重新创建连接。
安装所需的库。
pip install azure-common pip install azure-mgmt-kusto
创建用于身份验证的 Microsoft Entra 应用程序主体。 需要目录(租户)ID、应用程序 ID 和客户端密码。
运行以下代码。
from azure.mgmt.kusto import KustoManagementClient from azure.mgmt.kusto.models import EventHubDataConnection from azure.identity import ClientSecretCredential #Directory (tenant) ID tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Application ID client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" #Client Secret client_secret = "xxxxxxxxxxxxxx" subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx" credentials = ServicePrincipalCredentials( client_id=client_id, secret=client_secret, tenant=tenant_id ) kusto_management_client = KustoManagementClient(credentials, subscription_id) resource_group_name = "myresourcegroup" #The cluster and database that are created as part of the Prerequisites cluster_name = "mycluster" database_name = "mydatabase" data_connection_name = "myeventhubconnect" #The event hub that is created as part of the Prerequisites event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/myresourcegroup/providers/Microsoft.EventHub/namespaces/myeventhubnamespace/eventhubs/myeventhub""; consumer_group = "$Default" location = "China East 2" #The table and column mapping that are created as part of the Prerequisites table_name = "mytable" mapping_rule_name = "mytablemappingrule" data_format = "csv" database_routing = "Multi" #Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python poller = kusto_management_client.data_connections.create_or_update( resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name, parameters=EventHubDataConnection( event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location, table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format, database_routing=database_routing ) ) poller.wait() print(poller.result())
设置 建议的值 字段说明 tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 租户 ID。 也称为目录 ID。 subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 用于创建资源的订阅 ID。 client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx 可以访问租户中资源的应用程序的客户端 ID。 client_secret xxxxxxxxxxxxxx 可以访问租户中资源的应用程序的客户端密码。 resource_group_name myresourcegroup 包含群集的资源组的名称。 cluster_name mycluster 群集的名称。 database_name mydatabase 群集中目标数据库的名称。 data_connection_name myeventhubconnect 所需的数据连接名称。 table_name mytable 目标数据库中目标表的名称。 mapping_rule_name mytablemappingrule 与目标表相关的列映射的名称。 data_format csv 消息的数据格式。 event_hub_resource_id 资源 ID 包含要引入的数据的事件中心的资源 ID。 consumer_group $Default 事件中心的使用者组。 location 中国东部 2 数据连接资源的位置。 databaseRouting 多或单 连接的数据库路由。 如果将该值设置为“单个”,则数据连接会按 databaseName 设置中指定的内容被路由到群集中的单个数据库。 如果将此值设置为“多”,可使用数据库引入属性重写默认目标数据库。 有关详细信息,请参阅事件路由。
若要删除事件中心连接,请运行以下命令:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);
- 检查与事件中心示例消息应用的连接
- 在 Web UI 中查询数据