在本文中,将使用 Azure 数据资源管理器 Python 库引入数据。 Azure 数据资源管理器是用于日志和遥测数据的快速、可缩放的数据浏览服务。 Azure 数据资源管理器为 Python 提供两个客户端库: 引入库 和 数据库。 通过这些库,可以将数据引入或加载到群集中,并从代码查询数据。
首先,在群集中创建表和数据映射。 然后,将队列导入到集群并验证结果。
- Microsoft 帐户或 Microsoft Entra 用户标识。 不需要 Azure 订阅。
- Azure 数据分析服务群集和数据库。 了解如何 创建群集和数据库。
- 安装 Python 3.4+。
安装 azure-kusto-data 和 azure-kusto-ingest 库。
pip install azure-kusto-data
pip install azure-kusto-ingest
从 azure-kusto-data 中导入类。
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
若要对应用程序进行身份验证,Azure 数据资源管理器使用 Microsoft Entra 租户 ID。 若要查找租户 ID,请使用以下 URL,将域替换为 YourDomain。
https://login.partner.microsoftonline.cn/<YourDomain>/.well-known/openid-configuration/
例如,如果域 contoso.com,则 URL 为: https://login.partner.microsoftonline.cn/contoso.com/.well-known/openid-configuration/。 单击此 URL 可查看结果;第一行如下所示。
"authorization_endpoint":"https://login.partner.microsoftonline.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
在本例中,租户 ID 为 aaaabbbb-0000-cccc-1111-dddd2222eeee. 在运行此代码之前,请设置AAD_TENANT_ID、KUSTO_URI、KUSTO_INGEST_URI和KUSTO_DATABASE的值。
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn/"
KUSTO_DATABASE = "<DatabaseName>"
构造连接字符串。 以下示例使用设备身份验证访问群集。 还可以使用 托管标识 身份验证、 Microsoft Entra 应用程序证书、 Microsoft Entra 应用程序密钥,或 Microsoft Entra 用户和密码。
在后面的步骤中创建目标表和映射。
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
导入其他类并为数据源文件设置常量。 此示例使用 Azure Blob 存储上托管的示例文件。 StormEvents 示例数据集具有来自国家环境信息中心与天气相关的数据。
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant, add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # In bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.chinacloudapi.cn/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN # Construct the full blob path
创建与 StormEvents.csv 文件中数据的架构匹配的表。 运行此代码时,它将返回如下所示的消息: 若要登录,请使用 Web 浏览器打开页面 https://microsoft.com/devicelogin 并输入代码F3W4VWZDM进行身份验证。 按照步骤登录,然后返回运行下一个代码块。 建立连接的后续代码块要求再次登录。
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
将传入 CSV 数据映射到创建表时使用的列名称和数据类型。 此过程将源数据字段映射到目标表列。
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1}, {"Name":"EpisodeId","datatype":"int","Ordinal":2}, {"Name":"EventId","datatype":"int","Ordinal":3}, {"Name":"State","datatype":"string","Ordinal":4}, {"Name":"EventType","datatype":"string","Ordinal":5}, {"Name":"InjuriesDirect","datatype":"int","Ordinal":6}, {"Name":"InjuriesIndirect","datatype":"int","Ordinal":7}, {"Name":"DeathsDirect","datatype":"int","Ordinal":8}, {"Name":"DeathsIndirect","datatype":"int","Ordinal":9}, {"Name":"DamageProperty","datatype":"int","Ordinal":10}, {"Name":"DamageCrops","datatype":"int","Ordinal":11}, {"Name":"Source","datatype":"string","Ordinal":12}, {"Name":"BeginLocation","datatype":"string","Ordinal":13}, {"Name":"EndLocation","datatype":"string","Ordinal":14}, {"Name":"BeginLat","datatype":"real","Ordinal":16}, {"Name":"BeginLon","datatype":"real","Ordinal":17}, {"Name":"EndLat","datatype":"real","Ordinal":18}, {"Name":"EndLon","datatype":"real","Ordinal":19}, {"Name":"EpisodeNarrative","datatype":"string","Ordinal":20}, {"Name":"EventNarrative","datatype":"string","Ordinal":21}, {"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
将消息排入队列,以便从 Blob 存储拉取数据,并将该数据引入 Azure 数据资源管理器。
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Ingestion queued successfully with Azure Data Explorer')
等待 5 到 10 分钟,等待排队引入计划并将数据加载到 Azure 数据资源管理器中。 运行以下代码来计算 StormEvents 表中的记录。
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
登录到 https://dataexplorer.azure.cn 并连接到您的群集。 在数据库中运行以下命令,检查过去四小时内的引入失败。 在运行之前替换数据库名称。
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
运行此命令以查看过去四小时内所有数据导入操作的状态。 在运行之前替换数据库名称。
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
如果打算关注其他文章,请保留所创建的资源。 否则,请在数据库中运行以下命令以清理 StormEvents 表。
.drop table StormEvents