Ingest data using the Azure Data Explorer Node library

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. Azure Data Explorer provides two client libraries for Node: an ingest library and a data library. These libraries enable you to ingest (load) data into a cluster and query data from your code. In this article, you first create a table and data mapping in a test cluster. You then queue ingestion to the cluster and validate the results.

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

In addition to an Azure subscription, you need the following to complete this article:

- Node.js installed on your development machine
- A test cluster and database

Install the data and ingest libraries

Install azure-kusto-ingest and azure-kusto-data:

npm i azure-kusto-ingest azure-kusto-data

Add import statements and constants

Import classes from the libraries:


const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
const KustoClient = require("azure-kusto-data").Client;
const KustoIngestClient = require("azure-kusto-ingest").IngestClient;
const IngestionProperties = require("azure-kusto-ingest").IngestionProperties;
const { DataFormat } = require("azure-kusto-ingest").IngestionPropertiesEnums;
const { BlobDescriptor } = require("azure-kusto-ingest").IngestionDescriptors;

To authenticate an application, Azure Data Explorer uses your Azure Active Directory tenant ID. To find your tenant ID, follow the instructions in Find your Microsoft 365 tenant ID.

authorityIdkustoUrikustoIngestUrikustoDatabase 设置值,然后运行以下代码。Set the values for authorityId, kustoUri, kustoIngestUri and kustoDatabase before running this code.

const cluster = "MyCluster";
const region = "chinaeast2";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.chinacloudapi.cn:443`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.chinacloudapi.cn:443`;
const kustoDatabase  = "Weather";

Now construct the connection string. This example uses device authentication to access the cluster. You can also use an Azure Active Directory application certificate, an application key, or a user and password; a sketch of the application-key option appears after the code below.

You create the destination table and mapping in a later step.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";
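This example signs in interactively. If you prefer non-interactive authentication, the same connection strings can be built with an Azure Active Directory application ID and key instead. The sketch below is not part of the original walkthrough; appId and appKey are placeholders for your own application registration.

// Hypothetical alternative: authenticate with an AAD application ID and key instead of device login.
const appId = "<application-id>";   // placeholder: your AAD application (client) ID
const appKey = "<application-key>"; // placeholder: the application's secret
const kcsbIngestApp = KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(kustoIngestUri, appId, appKey, authorityId);
const kcsbDataApp = KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(kustoUri, appId, appKey, authorityId);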

Set source file information

Import additional classes and set constants for the data source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

const container = "samplefiles";
const account = "kustosamplefiles";
const sas = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.chinacloudapi.cn/${container}/${filePath}${sas}`;

Create a table on your test cluster

Create a table that matches the schema of the data in the StormEvents.csv file. When this code runs, it returns a message like the following: To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code XXXXXXXXX to authenticate. Follow the steps to sign in, then return to run the next code block. Subsequent code blocks that make a connection require you to sign in again.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

kustoClient.executeMgmt(kustoDatabase, createTableCommand, (err, results) => {
    if (err) throw new Error(err);
    console.log(results.primaryResults[0][0].toString());
});
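To confirm the table was created before defining the mapping, you can optionally list the tables in the database. This check is not part of the original walkthrough; it simply reuses the executeMgmt pattern shown above.

// Optional check: list the tables in the database to confirm StormEvents exists.
kustoClient.executeMgmt(kustoDatabase, ".show tables", (err, results) => {
    if (err) throw new Error(err);
    console.log(results.primaryResults[0][0].toString());
});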

Define ingestion mapping

Map the incoming CSV data to the column names and data types used when creating the table.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

kustoClient.executeMgmt(kustoDatabase, createMappingCommand, (err, results) => {
    if (err) throw new Error(err);
    console.log(results.primaryResults[0][0].toString());
});

Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer.

const defaultProps = new IngestionProperties(kustoDatabase, destTable, DataFormat.csv, null, destTableMapping, {'ignoreFirstRecord': 'true'});
const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://docs.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

// The second argument to BlobDescriptor is an estimate of the raw data size in bytes.
const blobDesc = new BlobDescriptor(blobPath, 10);
ingestClient.ingestFromBlob(blobDesc, null, (err) => {
    if (err) throw new Error(err);
});
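If your source data is in a local file rather than blob storage, the ingest client also exposes ingestFromFile. The following is a minimal sketch, assuming a local copy of StormEvents.csv and the same callback style as ingestFromBlob above; it is not part of the original walkthrough.

// Hypothetical alternative: queue a local CSV file for ingestion using the client's default properties.
const localFilePath = "./StormEvents.csv"; // assumed local copy of the sample file
ingestClient.ingestFromFile(localFilePath, null, (err) => {
    if (err) throw new Error(err);
    console.log("Queued local file for ingestion");
});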

Validate that the table contains data

Validate that the data was ingested into the table. Wait five to ten minutes for the queued ingestion to schedule the ingestion and load the data into Azure Data Explorer. Then run the following code to get the count of records in the StormEvents table.

const query = `${destTable} | count`;

kustoClient.execute(kustoDatabase, query, (err, results) => {
    if (err) throw new Error(err);  
    console.log(results.primaryResults[0][0].toString());
});
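Because queued ingestion is asynchronous, the fixed wait above is only a rough guideline. As an optional convenience (not part of the original walkthrough), you could re-run the same count query on a timer until rows appear:

// Optional polling loop: re-run the count query every 60 seconds, up to 10 times.
let attempts = 0;
const timer = setInterval(() => {
    kustoClient.execute(kustoDatabase, query, (err, results) => {
        if (err) throw new Error(err);
        console.log(results.primaryResults[0][0].toString());
    });
    if (++attempts >= 10) clearInterval(timer);
}, 60 * 1000);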

Run troubleshooting queries

Sign in to https://dataexplorer.azure.cn and connect to your cluster. Run the following command in your database to see whether there were any ingestion failures in the last four hours. Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Next steps