Ingest data using the Azure Data Explorer .NET Standard SDK (Preview)

Azure Data Explorer (ADX) is a fast and highly scalable data exploration service for log and telemetry data. ADX provides two client libraries for .NET Standard: an ingest library and a data library. These libraries let you ingest (load) data into a cluster and query data from your code. In this article, you first create a table and a data mapping in a test cluster. You then queue an ingestion to the cluster and validate the results.

Prerequisites

Install the ingest library

Install-Package Microsoft.Azure.Kusto.Ingest.NETStandard

Authentication

To authenticate an application, Azure Data Explorer uses your AAD tenant ID. To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.chinacloudapi.cn/<YourDomain>/.well-known/openid-configuration/

For example, if your domain is contoso.com, the URL is https://login.chinacloudapi.cn/contoso.com/.well-known/openid-configuration/. Open this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.chinacloudapi.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f.
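If you would rather read the tenant ID in code than by eye, the following is a minimal sketch. It assumes the authorization_endpoint value always has the shape shown above, with the tenant ID as the first path segment. The TenantIdHelper class and its method name are hypothetical and are not part of any SDK.

```csharp
using System;

public static class TenantIdHelper
{
    // Hypothetical helper: returns the first path segment of the
    // authorization endpoint, which holds the tenant ID in the
    // example response shown above.
    public static string ExtractTenantId(string authorizationEndpoint)
    {
        var uri = new Uri(authorizationEndpoint);

        // For ".../<tenant>/oauth2/authorize", Uri.Segments is
        // { "/", "<tenant>/", "oauth2/", "authorize" }.
        if (uri.Segments.Length < 2)
        {
            throw new ArgumentException(
                "No tenant segment found in the endpoint URL.",
                nameof(authorizationEndpoint));
        }

        return uri.Segments[1].TrimEnd('/');
    }
}
```

Calling TenantIdHelper.ExtractTenantId with the endpoint value from the example response yields the same tenant ID that appears in that URL.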

This example uses an AAD user and password for authentication to access the cluster. You can also use an AAD application certificate or an AAD application key. Set your values for tenantId, user, and password before running this code.

var tenantId = "<TenantId>";
var user = "<User>";
var password = "<Password>";

Construct the connection string

Now construct the connection string. You create the destination table and mapping in a later step.

var kustoUri = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/";
var database = "<DatabaseName>";

var kustoConnectionStringBuilder =
    new KustoConnectionStringBuilder(kustoUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };

Set source file information

Set the path for the source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
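The blob URL above carries a shared access signature, and its se query parameter holds the signature's expiry time (here 2020-09-01T22:02:00Z), after which the link stops working. The following is a minimal sketch for checking that expiry before queuing an ingestion. The SasTokenHelper class and its method names are hypothetical, and the sketch only reads the se parameter; it does not validate the signature itself.

```csharp
using System;
using System.Globalization;

public static class SasTokenHelper
{
    // Hypothetical helper: returns the value of the "se" (expiry) query
    // parameter as a UTC timestamp, or null if the URL has no such parameter.
    public static DateTimeOffset? GetSasExpiry(string blobUrl)
    {
        var query = new Uri(blobUrl).Query.TrimStart('?');

        foreach (var pair in query.Split(new[] { '&' }, StringSplitOptions.RemoveEmptyEntries))
        {
            var parts = pair.Split(new[] { '=' }, 2);
            if (parts.Length == 2 && parts[0] == "se")
            {
                // The value is percent-encoded, for example 22%3A02%3A00Z.
                var value = Uri.UnescapeDataString(parts[1]);
                return DateTimeOffset.Parse(
                    value,
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.AssumeUniversal);
            }
        }

        return null;
    }

    // True when an expiry is present and is not later than "now".
    public static bool IsExpired(string blobUrl, DateTimeOffset now)
    {
        var expiry = GetSasExpiry(blobUrl);
        return expiry.HasValue && expiry.Value <= now;
    }
}
```

For example, SasTokenHelper.IsExpired(blobPath, DateTimeOffset.UtcNow) could be checked before the ingestion step; if it returns true, the sample URL needs to be replaced with one that has a current signature.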

Create a table on your test cluster

Create a table named StormEvents that matches the schema of the data in the StormEvents.csv file.

var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(command);
}

Define ingestion mapping

Map the incoming CSV data to the column names used when creating the table. Provision a CSV column mapping object on that table.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(command);
}
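The mapping above assigns each column the zero-based position it occupies in the CSV file. As a minimal sketch of that numbering rule, the following helper derives the ordinal strings from an ordered list of column names. The CsvOrdinals class is hypothetical and deliberately uses only standard library types; it is an assumption that you would then copy each pair into a ColumnMapping as shown in the code above.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class CsvOrdinals
{
    // Hypothetical helper: pairs each column name with its zero-based
    // position in the CSV file, formatted as a string like the ordinals
    // in the mapping above.
    public static IReadOnlyList<KeyValuePair<string, string>> FromColumnOrder(
        IEnumerable<string> columnNames)
    {
        return columnNames
            .Select((name, index) => new KeyValuePair<string, string>(
                name,
                index.ToString(CultureInfo.InvariantCulture)))
            .ToList();
    }
}
```

Passing the column names in the same order as the table definition yields "StartTime" with ordinal "0", "EndTime" with ordinal "1", and so on, which keeps the mapping in step with the schema if columns are added later.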

Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into ADX.

var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn:443/";
var ingestConnectionStringBuilder =
    new KustoConnectionStringBuilder(ingestUri)
    {
        FederatedSecurity = true,
        InitialCatalog = database,
        UserID = user,
        Password = password,
        Authority = tenantId
    };

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder))
{
    var properties =
        new KustoQueuedIngestionProperties(database, table)
        {
            Format = DataSourceFormat.csv,
            IngestionMapping = new IngestionMapping()
            { 
                IngestionMappingReference = tableMapping
            },
            IgnoreFirstRecord = true
        };

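    // Block until the request has been queued so the client is not disposed mid-call.
    ingestClient.IngestFromStorageAsync(blobPath, ingestionProperties: properties).GetAwaiter().GetResult();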
}

Validate data was ingested into the table

Wait five to ten minutes for the queued ingestion to be scheduled and for the data to load into ADX. Then run the following code to get the count of records in the StormEvents table.

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | count";

    var results = cslQueryProvider.ExecuteQuery<long>(query);
    Console.WriteLine(results.Single());
}
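Instead of waiting a fixed time, the count query could be polled until rows appear. The following is a minimal sketch of such a loop. The IngestionPolling class is hypothetical, and it takes the count query as a delegate so that the sketch itself does not depend on any SDK type.

```csharp
using System;
using System.Threading;

public static class IngestionPolling
{
    // Hypothetical helper: calls getRowCount repeatedly until it returns a
    // value greater than zero or the timeout elapses. Returns the last
    // observed count, which is zero if no rows arrived in time.
    public static long WaitForRows(Func<long> getRowCount, TimeSpan interval, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;

        while (true)
        {
            var count = getRowCount();
            if (count > 0 || DateTime.UtcNow >= deadline)
            {
                return count;
            }

            Thread.Sleep(interval);
        }
    }
}
```

Inside the using block above, the delegate could wrap the existing query, for example () => cslQueryProvider.ExecuteQuery<long>(query).Single(), with an interval of 30 seconds and a timeout of ten minutes. A return value of zero suggests the ingestion has not completed and that the troubleshooting queries are worth running.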

Run troubleshooting queries

Sign in to https://dataexplorer.azure.cn and connect to your cluster. Run the following command in your database to see if there were any ingestion failures in the last four hours. Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents

Next steps