Ingest data using the Azure Data Explorer Python library

In this article, you ingest data using the Azure Data Explorer Python library. Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer provides two client libraries for Python: an ingest library and a data library. These libraries enable you to ingest, or load, data into a cluster and query data from your code.

First, create a table and data mapping in a cluster. You then queue ingestion to the cluster and validate the results.

This article is also available as an Azure Notebook.

Prerequisites

Install the data and ingest libraries

Install azure-kusto-data and azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Add import statements and constants

Import classes from azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

To authenticate an application, Azure Data Explorer uses your Azure Active Directory tenant ID. To find your tenant ID, use the following URL, replacing YourDomain with your domain.

https://login.chinacloudapi.cn/<YourDomain>/.well-known/openid-configuration/

For example, if your domain is contoso.com, the URL is: https://login.chinacloudapi.cn/contoso.com/.well-known/openid-configuration/. Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.chinacloudapi.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Set the values for AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, and KUSTO_DATABASE before running this code.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/"
KUSTO_DATABASE = "<DatabaseName>"
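
As an optional check, the following sketch fetches the same openid-configuration document and prints the tenant ID so you can confirm the AAD_TENANT_ID value above. It uses only the Python standard library, and the DOMAIN value is a placeholder to replace with your own domain; this step is not part of the original walkthrough.

import json
import urllib.request

DOMAIN = "<YourDomain>"  # placeholder: replace with your domain, for example contoso.com
CONFIG_URL = "https://login.chinacloudapi.cn/" + DOMAIN + "/.well-known/openid-configuration/"

with urllib.request.urlopen(CONFIG_URL) as response:
    config = json.load(response)

# The tenant ID is the GUID segment of the authorization endpoint, for example
# https://login.chinacloudapi.cn/<tenant-id>/oauth2/authorize
print(config["authorization_endpoint"].split("/")[3])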

Now construct the connection string. This example uses device authentication to access the cluster. You can also use an Azure Active Directory application certificate, an Azure Active Directory application key, or an Azure Active Directory user and password (a sketch of the application-key variant follows the next code block).

You create the destination table and mapping in a later step.

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI, AAD_TENANT_ID)

KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI, AAD_TENANT_ID)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
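
The connection strings above use device authentication. As a sketch of one of the alternatives mentioned earlier, an Azure Active Directory application (service principal) ID and key can be used instead; the client ID and key below are placeholders for your own application, not values from this article.

# Sketch: application-key authentication instead of device authentication.
CLIENT_ID = "<ApplicationClientId>"      # placeholder
CLIENT_SECRET = "<ApplicationKey>"       # placeholder

KCSB_DATA_APP = KustoConnectionStringBuilder.with_aad_application_key_authentication(
    KUSTO_URI, CLIENT_ID, CLIENT_SECRET, AAD_TENANT_ID)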

Set source file information

Import additional classes and set constants for the data source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.chinacloudapi.cn/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN
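
FILE_SIZE is the raw size of the blob in bytes. If you prefer to confirm it rather than hard-code it, an optional sketch like the following sends an HTTP HEAD request to the blob URL using only the standard library; this check is not part of the original walkthrough.

import urllib.request

HEAD_REQUEST = urllib.request.Request(BLOB_PATH, method="HEAD")
with urllib.request.urlopen(HEAD_REQUEST) as response:
    print("Blob size in bytes:", response.headers["Content-Length"])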

Create a table on your cluster

Create a table that matches the schema of the data in the StormEvents.csv file. When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code F3W4VWZDM to authenticate. Follow the steps to sign in, then return to run the next code block. Subsequent code blocks that make a connection require you to sign in again.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])
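
The KustoServiceError class imported earlier is not used in the walkthrough itself, but a minimal sketch like the following shows how the same management call could be wrapped to surface failures cleanly.

try:
    RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
    print(dataframe_from_result_table(RESPONSE.primary_results[0]))
except KustoServiceError as error:
    print("Table creation failed:", error)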

Define ingestion mapping

Map incoming CSV data to the column names and data types used when creating the table. This maps source data fields to destination table columns.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])
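
As an optional verification step (not part of the original walkthrough), the following sketch lists the CSV ingestion mappings defined on the table so you can confirm the mapping was created.

SHOW_MAPPING_COMMAND = ".show table StormEvents ingestion csv mappings"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, SHOW_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])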

Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, dataFormat=DataFormat.CSV,
                                           mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')
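
The FileDescriptor class imported earlier supports queuing ingestion from a local file instead of a blob. The following is a sketch only; the path and size are placeholders for your own file, and the size should be the raw, uncompressed size in bytes.

# Sketch: queue ingestion from a local CSV file; path and size are placeholders.
FILE_DESCRIPTOR = FileDescriptor("<path-to-local-file>.csv", 3333)  # raw size in bytes
INGESTION_CLIENT.ingest_from_file(
    FILE_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)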

Query data that was ingested into the table

Wait five to ten minutes for the queued ingestion to schedule the ingestion and load the data into Azure Data Explorer. Then run the following code to get the count of records in the StormEvents table.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])
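
Once the count is non-zero, ordinary KQL queries run the same way. For example, the following illustrative query (not part of the original walkthrough) summarizes events by state.

SAMPLE_QUERY = "StormEvents | summarize EventCount = count() by State | top 5 by EventCount"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, SAMPLE_QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])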

Run troubleshooting queries

Sign in to https://dataexplorer.azure.cn and connect to your cluster. Run the following command in your database to see whether there were any ingestion failures in the last four hours. Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
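
You can also run these checks from the notebook through execute_mgmt instead of the web UI. The following is a sketch of the ingestion-failures command, reusing the client and database name defined earlier.

# Sketch: run the ingestion-failures check from Python instead of the web UI.
FAILURES_COMMAND = (
    ".show ingestion failures "
    "| where FailedOn > ago(4h) and Database == '" + KUSTO_DATABASE + "'"
)
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, FAILURES_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])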

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents
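
If you are working from the notebook, the same cleanup can be run through execute_mgmt; a minimal sketch follows.

# Sketch: drop the StormEvents table from Python instead of the web UI.
DROP_TABLE_COMMAND = ".drop table StormEvents"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, DROP_TABLE_COMMAND)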

Next steps