创建可使用排队引入功能获取数据的应用

适用于:✅Azure 数据资源管理器

Kusto 能够通过批处理管理器优化和批处理引入的数据,从而处理大量数据引入。 批处理管理器会在引入的数据到达其目标表之前聚合数据,从而实现更高效的处理和改进的性能。 批处理通常以 1 GB 的原始数据、1000 个单独文件或默认 5 分钟的处理时间为单位完成。 可以在数据库和表级别更新批处理策略,这通常可缩短批处理时间并减少延迟。 有关引入批处理的详细信息,请参阅 IngestionBatching 策略以编程方式更改表级别引入批处理策略

注意

批处理还会考虑各种因素,例如目标数据库和表、运行引入的用户以及与引入关联的各种属性(例如特殊标记)。

在本文中,学习如何:

重要

引入 API 现在有两个版本:V1 和 V2。 V1 API 是原始 API,而 V2 API 是一个重新映像的版本,可在提供更多自定义的同时简化引入 API。

引入版本 2 为 预览 版,提供以下语言: C#

先决条件

开始之前

  • 使用以下方法之一创建 MyStormEvents 表,并且由于仅引入少量数据,因此将其引入批处理策略超时设置为 10 秒:

    1. 通过在管理命令中运行第一个应用,在数据库中创建名为 MyStormEvents 的目标表。
    2. 通过在管理命令中运行第二个应用,将引入批处理策略超时设置为 10 秒。 在运行应用之前,请将超时值更改为 00:00:10
  • 下载 stormevent.csv 示例数据文件。 该文件包含 1000 条风暴事件记录。

    注意

    以下示例假定引入数据的列与目标表的架构之间存在简单匹配。

    如果引入的数据与表架构不匹配,则必须使用引入映射使数据的列与表架构保持一致。

将文件排队以供引入和查询结果

在首选 IDE 或文本编辑器中,使用适合首选语言的约定创建名为“基本引入”的项目或文件。 将 stormevent.csv 文件放置在与应用相同的位置。

注意

在以下示例中,会使用两个客户端,一个用于查询群集,另一个用于将数据引入群集。 对于客户端库支持的语言,由于两个客户端共享同一个用户提示验证器,因此会生成单个用户提示,而不是每个客户端一个用户提示。

添加以下代码:

  1. 创建连接到群集并打印 MyStormEvents 表中的行数的客户端应用。 你将使用此计数作为基线,以便与每个引入方法后的行数进行比较。 将 <your_cluster_uri><your_database> 占位符分别替换为群集 URI 和数据库名称。

    using System.Data;
    
    using Kusto.Data;
    using Kusto.Data.Common;
    using Kusto.Data.Net.Client;
    
    using Azure.Identity;
    
    namespace BatchIngest;
    
    class BatchIngest
    {
      static async Task Main()
      {
        var tokenCredential = new InteractiveBrowserCredential();
        var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
    
        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
    
        var database = "<your_database>";
        var table = "MyStormEvents";
    
        var query = table + " | count";
    
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }
      }
    
    static void PrintResultsAsValueList(IDataReader response)
    {
      while (response.Read())
      {
          for (var i = 0; i < response.FieldCount; i++)
          {
              object val = response.GetValue(i);
              string value = val.ToString() ?? "None";
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
          }
      }
      }
    }
    
  2. 在可能的情况下,使用与群集 URI 共享相同的身份验证凭据创建定义数据引入 URI 的连接字符串生成器对象。 将 <your_ingestion_uri> 占位符替换为数据引入 URI。

    using Kusto.Ingest; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
    using Kusto.Ingest.V2; // Add this import
    
    // No need to use a different connection string builder - the ingestion client can auto-correct to the ingestion URI
    
  3. 通过将 stormevent.csv 文件添加到批处理队列来引入该文件。

    你将使用以下对象和属性:

    • QueuedIngestClient 创建引入客户端。

    • IngestionProperties 设置引入属性。

    • DataFormat 将文件格式指定为 CSV。

    • ignore_first_record 如果指定 CSV 中的第一行和类似的文件类型是否被忽略,请使用以下逻辑:

      • True:忽略第一行。 使用此选项可从表格文本数据中删除标题行。
      • False:第一行作为常规行引入。

      注意

      引入支持的最大文件大小为 6 GB。 建议引入 100 MB 到 1 GB 的文件。

    using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    var ingestProps = new KustoIngestionProperties(database, table) {
      Format = DataSourceFormat.csv,
      AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
    };
    await ingestClient.IngestFromStorageAsync(filePath, ingestProps);
    

    你将使用以下对象和属性:

    • QueuedIngestClientBuilder 创建引入客户端。
    • IngestProperties 在大多数情况下是可选的,但此处用于设置 IgnoreFirstRecord
    • DataFormat 将文件格式指定为 DataSourceFormat.csv.
    • IgnoreFirstRecord 如果指定 CSV 中的第一行和类似的文件类型是否被忽略,请使用以下逻辑:
      • True:忽略第一行。 使用此选项可从表格文本数据中删除标题行。
      • False:第一行作为常规行引入。

    注意

    引入支持的最大文件大小为 6 GB。 建议引入 100 MB 到 1 GB 的文件。

    using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
    
    string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
    var fileSource = new FileSource(filePath, DataSourceFormat.csv);
    var props = new IngestProperties() { IgnoreFirstRecord = true };
    
    Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
    
    await ingestClient.IngestAsync(fileSource, database, table, props);
    
  4. 引入文件后查询表中的行数,并显示引入的最后一行。

    注意

    为了让引入完成,请等待 30 秒,再查询表。 对于 C#,请等待 60 秒,以便有时间以异步方式将文件添加到引入队列。

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    await Task.Delay(TimeSpan.FromSeconds(60));
    
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

完整代码应如下所示:

using System.Data;

using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
        var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadUserPromptAuthentication();

        using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

        var database = "<your database>";
        var table = "MyStormEvents";

        var query = table + " | count";

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
        {
            Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
            PrintResultsAsValueList(response);
        }

        using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(clusterKcsb);

        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
            Format = DataSourceFormat.csv,
            AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        await ingestClient.IngestFromStorageAsync(filePath, ingestProps);

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        await Task.Delay(TimeSpan.FromSeconds(60));

        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
            PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
            Console.WriteLine("\nLast ingested row:");
            PrintResultsAsValueList(response);
        }
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        while (response.Read())
        {
            for (var i = 0; i < response.FieldCount; i++)
            {
                object val = response.GetValue(i);
                string value = val.ToString() ?? "None";
                Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
            }
        }
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
   static async Task Main()
   {
       var tokenCredential = new InteractiveBrowserCredential();
       var clusterUri = "<your_cluster_uri>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
       var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);

       using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);

       var database = "<your_database>";
       var table = "MyStormEvents";

       var query = table + " | count";

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
       {
           Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
           PrintResultsAsValueList(response);
       }

       using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();

       string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

       var fileSource = new FileSource(filePath, DataSourceFormat.csv);
       var props = new IngestProperties() { IgnoreFirstRecord = true };

       Console.WriteLine("\nIngesting data from file: \n\t " + filePath);

       await ingestClient.IngestAsync(fileSource, database, table, props);

       Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
       await Task.Delay(TimeSpan.FromSeconds(60));

       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
           PrintResultsAsValueList(response);
       }

       query = table + " | top 1 by ingestion_time()";
       using (var response = await kustoClient.ExecuteQueryAsync(database, query, null)) {
           Console.WriteLine("\nLast ingested row:");
           PrintResultsAsValueList(response);
       }
   }

   static void PrintResultsAsValueList(IDataReader response)
   {
       while (response.Read())
       {
           for (var i = 0; i < response.FieldCount; i++)
           {
               object val = response.GetValue(i);
               string value = val.ToString() ?? "None";
               Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
           }
       }
   }
}

运行应用

在命令行界面中,使用以下命令运行应用:

# Change directory to the folder that contains the management commands project
dotnet run .

你应会看到如下所示的结果:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

将内存中数据排队以用于引入和查询结果

可以通过创建包含数据的流,然后排队进行引入,以从内存中引入数据。

例如,可以修改应用以替换 从文件引入 代码,如下所示:

  1. 将流描述符包添加到文件顶部的导入。

    不需要任何其他包。

  2. 添加包含要引入的数据的内存中字符串。

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. 将引入属性设置为不忽略第一条记录,因为内存中字符串没有标题行。

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
    // Remove the IngestionProperties object `props`
    
  4. 通过将内存中数据添加到批处理队列来引入数据。 如果可能,请提供原始数据的大小。

  5. _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
    
        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);
    
        await ingestClient.IngestAsync(streamSource, database, table);
    

更新代码的概述应如下所示:

using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        _= await ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length});
        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using System.Data;
using Azure.Identity;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
        var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
        ...

        var streamSource = new StreamSource(stringStream, DataSourceCompressionType.None, DataSourceFormat.csv);

        await ingestClient.IngestAsync(streamSource, database, table);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

运行应用时,应看到类似于以下所示的结果。 请注意,引入后,表中的行数增加了 1。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

将 blob 排队以用于引入和查询结果

可以从 Azure 存储 Blob、Azure Data Lake 文件和 Amazon S3 文件引入数据。

例如,可以通过将 从内存引入 代码替换为以下内容,修改应用:

  1. 首先将 stormevent.csv 文件上传到存储帐户,并生成具有读取权限的 URI,例如,使用 Azure Blob 的 SAS 令牌

  2. 将 Blob 描述符包添加到文件顶部的导入。

    不需要任何其他包。

  3. 使用 Blob URI 创建 Blob 描述符,设置引入属性,然后从 Blob 引入数据。 将 <your_blob_uri> 占位符替换为 blob URI。

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    
    var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
    
    await ingestClient.IngestAsync(blobSource, database, table);
    

更新代码的概述应如下所示:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        await ingestClient.IngestFromStorageAsync(blobUri, ingestProps);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest;

class BatchIngest
{
    static async Task Main()
    {
        string blobUri = "<your_blob_uri>";
        ...

        Console.WriteLine("\nIngesting data from memory:");
        var blobSource = new BlobSource("<your_blob_uri", DataSourceFormat.csv);
        var props = new IngestProperties() { IgnoreFirstRecord = true };

        await ingestClient.IngestAsync(blobSource, database, table, props);

        ...
    }

    static void PrintResultsAsValueList(IDataReader response)
    {
        ...
    }
}

运行应用时,应看到类似于以下所示的结果。 请注意,引入后,表中的行数增加了 1000。

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

下一步