使用 Kusto .NET SDK 引入数据

.NET 有两个客户端库: 引入库数据库。 若要了解有关 .NET SDK 的详细信息,请参阅 有关 .NET SDK 的信息。 通过这些库,可以将数据引入群集并查询代码中的数据。 在本文中,您将在测试群集内创建一个表和数据映射,将数据摄取操作排队到群集,并验证数据摄取结果。

先决条件

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • 群集和数据库。 了解如何 创建群集和数据库

安装引入库

Install-Package Microsoft.Azure.Kusto.Ingest

添加身份验证和构造连接字符串

身份验证

SDK 使用 Microsoft Entra 租户 ID,以对应用程序进行身份验证。 使用以下 URL 查找租户 ID,将域替换为 YourDomain

https://login.partner.microsoftonline.cn/<YourDomain>/.well-known/openid-configuration/

例如,如果域名为 contoso.com,则该 URL 将是:https://login.partner.microsoftonline.cn/contoso.com/.well-known/openid-configuration/。 选择此 URL 以查看结果;第一行如下所示。

"authorization_endpoint":"https://login.partner.microsoftonline.cn/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

在这种情况下,租户 ID 为 aaaabbbb-0000-cccc-1111-dddd2222eeee

此示例使用交互式Microsoft Entra 用户身份验证来访问群集。 还可以使用通过证书或应用程序机密完成的 Microsoft Entra 应用程序身份验证。 在运行此代码之前,设置tenantIdclusterUri的正确值。

SDK 提供了一种简便方法,可将身份验证方法设置为连接字符串的一部分。 有关连接字符串的完整文档,请参阅连接字符串

注意

当前版本的 SDK 不支持 .NET Core 上的交互式用户身份验证。 如果需要,请改用 Microsoft Entra 用户名/密码或应用程序身份验证。

构造连接字符串

构造连接字符串。 在后面的步骤中创建目标表和映射。

var kustoUri = "https://<ClusterName>.<Region>.kusto.chinacloudapi.cn/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

设置源文件信息

设置源文件的路径。 此示例使用 Azure Blob 存储上托管的示例文件。 StormEvents 示例数据集包括来自国家环境信息中心的天气相关数据。

var blobPath = "https://kustosamples.blob.core.chinacloudapi.cn/samplefiles/StormEvents.csv";

在测试群集上创建表

创建与 StormEvents 文件中的数据架构匹配的名为 StormEvents.csv 的表。

提示

下面的代码段为几乎每个调用创建一个客户端实例。 此方法使每个代码片段单独可运行。 在生产环境中,客户端实例是可重入的,应该根据需要保留。 每个 URI 的单个客户端实例就足够了,即使使用多个数据库(可以在命令级别指定数据库)。

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

定义引入映射

将传入的 CSV 数据映射到用于创建表的列名上。 在该表上设置 CSV 列映射对象

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

为表定义批处理策略

批处理传入数据可优化数据分片大小。 引入批处理策略控制此批处理。 使用 引入批处理策略管理命令修改策略。 此策略可降低缓慢到达数据的延迟。

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Raw Data Size定义引入数据的值,并逐渐将大小减小到 250 MB,以检查性能是否有所改善。

使用 Flush Immediately 属性跳过批处理。 但是,不建议进行大规模引入,因为它可能会导致性能不佳。

列入一条引入消息

将消息排入队列,从 Blob 存储中拉取数据并进行处理。 建立与引入群集的连接,并创建另一个客户端来处理该终结点。

提示

下面的代码段为几乎每个调用创建一个客户端实例。 这样做是为了使每个片段可单独运行。 在生产环境中,客户端实例是可重入的,应根据需要保留。 即使使用多个数据库(可以在命令级别指定数据库),每个 URI 一个客户端实例也已足够。

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.chinacloudapi.cn"; // Replace <clusterName> and <region> with your values
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties); // Ensure blobPath points to the correct storage location

验证表中的数据导入

请等 5 到 10 分钟,让排队的引入过程完成调度并将数据加载到群集中。 运行以下代码以获取 StormEvents 表中的记录数。

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

运行故障排除查询

登录到 https://dataexplorer.azure.cn 并连接到群集。 在数据库中运行以下命令,检查过去四小时内的引入失败。 在运行之前替换数据库名称。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

运行以下命令,检查过去四小时内所有引入操作的状态。 在运行之前替换数据库名称。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

清理资源

如果打算关注其他文章,请保留所创建的资源。 否则,在数据库中运行以下命令以清除 StormEvents 表。

.drop table StormEvents