Ingest data using the Azure Data Explorer .NET SDK

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. It provides two client libraries for .NET: an ingest library and a data library. For more information on the .NET SDK, see About the .NET SDK. These libraries enable you to ingest (load) data into a cluster and query data from your code. In this article, you first create a table and data mapping in a test cluster. You then queue an ingestion to the cluster and validate the results.

Prerequisites

Install the ingest library

Install-Package Microsoft.Azure.Kusto.Ingest

Add authentication and construct the connection string

Authentication

To authenticate an application, the Azure Data Explorer SDK uses your AAD tenant ID. To find your tenant ID, use the following URL, substituting your domain for YourDomain.

https://login.chinacloudapi.cn/<YourDomain>/.well-known/openid-configuration/

For example, if your domain is contoso.com, the URL is: https://login.chinacloudapi.cn/contoso.com/.well-known/openid-configuration/. Click this URL to see the results; the first line is as follows.

"authorization_endpoint":"https://login.chinacloudapi.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

The tenant ID in this case is 6babcaad-604b-40ac-a9d7-9fd97c0b779f.
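If you prefer to look up the tenant ID from code rather than in a browser, a minimal sketch is shown below. It only fetches and prints the raw JSON document using `HttpClient` from the base class library; the domain name is a placeholder, and a full solution would parse the JSON to extract the GUID from the `authorization_endpoint` value instead of printing it.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

class TenantIdLookup
{
    static async Task Main()
    {
        // Placeholder domain; replace with your own.
        var domain = "contoso.com";
        using (var http = new HttpClient())
        {
            var json = await http.GetStringAsync(
                $"https://login.chinacloudapi.cn/{domain}/.well-known/openid-configuration/");

            // The tenant ID is the GUID segment of the authorization_endpoint value.
            Console.WriteLine(json);
        }
    }
}
```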

This example uses interactive AAD user authentication to access the cluster. You can also use AAD application authentication with a certificate or application secret. Make sure to set the correct values for tenantId and kustoUri before running this code.

The Azure Data Explorer SDK provides a convenient way to set up the authentication method as part of the connection string. For complete documentation on Azure Data Explorer connection strings, see Connection strings.

Note

The current version of the SDK doesn't support interactive user authentication on .NET Core. If required, use AAD username/password or application authentication instead.

Construct the connection string

Now you can construct the Azure Data Explorer connection string. You will create the destination table and mapping in a later step.

var tenantId = "<TenantId>";
var kustoUri = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/";

var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Set source file information

Set the path for the source file. This example uses a sample file hosted on Azure Blob Storage. The StormEvents sample data set contains weather-related data from the National Centers for Environmental Information.

var blobPath = "https://kustosamplefiles.blob.core.chinacloudapi.cn/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";

Create a table on your test cluster

Create a table named StormEvents that matches the schema of the data in the StormEvents.csv file.

Tip

The following code snippets create an instance of a client for almost every call. This is done to make each snippet individually runnable. In production, the client instances are reentrant and should be kept as long as needed. A single client instance per URI is sufficient, even when working with multiple databases (the database can be specified at the command level).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Define ingestion mapping

Map the incoming CSV data to the column names used when creating the table. Provision a CSV column mapping object on that table.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Define the batching policy for your table

Azure Data Explorer ingestion performs batching of the incoming data to optimize for data shard size. This process is controlled by the ingestion batching policy and can be modified with the ingestion batching policy control command. Use this policy to reduce the latency of slowly arriving data.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        table,
        new IngestionBatchingPolicy(maximumBatchingTimeSpan: TimeSpan.FromSeconds(10.0), maximumNumberOfItems: 100, maximumRawDataSizeMB: 1024));

    kustoClient.ExecuteControlCommand(command);
}

Queue a message for ingestion

Queue a message to pull data from blob storage and ingest that data into Azure Data Explorer. A connection is established to the data ingestion endpoint of the Azure Data Explorer cluster, and another client is created to work with that endpoint.

Tip

The following code snippets create an instance of a client for almost every call. This is done to make each snippet individually runnable. In production, the client instances are reentrant and should be kept as long as needed. A single client instance per URI is sufficient, even when working with multiple databases (the database can be specified at the command level).

var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.chinacloudapi.cn";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder))
{
    var properties =
        new KustoQueuedIngestionProperties(databaseName, table)
        {
            Format = DataSourceFormat.csv,
            IngestionMapping = new IngestionMapping()
            { 
                IngestionMappingReference = tableMapping,
                IngestionMappingKind = IngestionMappingKind.Csv
            },
            IgnoreFirstRecord = true
        };

    ingestClient.IngestFromStorageAsync(blobPath, ingestionProperties: properties).GetAwaiter().GetResult();
}

Validate that data was ingested into the table

Wait five to ten minutes for the queued ingestion to schedule the ingestion and load the data into Azure Data Explorer. Then run the following code to get the count of records in the StormEvents table.

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | count";

    var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
    Console.WriteLine(results.Single());
}
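As a sanity check beyond the record count, you can also read back a few sample rows through the same query provider. The sketch below reuses table, databaseName, and kustoConnectionStringBuilder from the earlier snippets, and the column names printed are from the table schema created above.

```csharp
using (var queryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | take 3";
    using (var reader = queryProvider.ExecuteQuery(databaseName, query, new ClientRequestProperties()))
    {
        while (reader.Read())
        {
            // Print a few columns from each sampled row.
            Console.WriteLine($"{reader["StartTime"]} {reader["State"]} {reader["EventType"]}");
        }
    }
}
```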

Run troubleshooting queries

Sign in to https://dataexplorer.azure.cn and connect to your cluster. Run the following command in your database to see whether there were any ingestion failures in the last four hours. Replace the database name before running.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Clean up resources

If you plan to follow our other articles, keep the resources you created. If not, run the following command in your database to clean up the StormEvents table.

.drop table StormEvents
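If you prefer to drop the table from code instead of the web UI, the same admin provider used earlier can execute the command. This is a sketch reusing databaseName and kustoConnectionStringBuilder from the earlier snippets.

```csharp
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    // Drops the StormEvents table and its data; this is irreversible.
    kustoClient.ExecuteControlCommand(databaseName, ".drop table StormEvents");
}
```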

Next steps