使用 “版本 ”下拉列表切换服务。 了解有关导航的详细信息。
适用于:✅ Azure Data Explorer
流式引入使你能够以近乎实时的延迟将数据写入 Kusto。 在需要将少量数据写入大量表格、导致批处理效率低下的情况下,这也很有用。
本文介绍如何使用托管流式引入客户端将数据引入到 Kusto。 以文件流或内存流的形式导入数据流。
注释
流式引入是一种高速引入协议。 使用 托管流式引入 或 流 式引入客户端引入与使用 流源引入的方式不同。
客户端类型是指数据摄取的方式。 使用 托管流式引入 或 流式引入 客户端时,可以使用流式引入协议将数据发送到 Kusto。 它使用 流式处理服务 来允许低延迟引入。
从流源进行数据引入指的是数据如何存储。 例如,在 C# 中,可以从对象创建流源MemoryStream。 这种方法不同于从磁盘文件创建的文件源。
引入方法取决于使用的客户端:对于排队引入,该过程首先将数据从源上传到 Blob 存储,然后将数据排入队列进行引入。 通过使用流式引入,进程直接将数据在流式 HTTP 请求的正文中发送到 Kusto。
重要
引入 API 现在有两个版本:V1 和 V2。 V1 API 是原始 API。 V2 API 是重新设计的版本,它简化了数据引入API,同时提供了更多的自定义功能。
摄取版本 2 目前为 预览 版,支持以下语言:C#
此外,查询 V2 API 与引入 V2 API 无关。
流媒体和托管流媒体
Kusto SDK 提供两种流式引入客户端: 流式引入客户端 和 托管流引入客户端。 托管流引入客户端包括内置的重试和故障转移逻辑。
注释
本文介绍如何使用 托管流式引入。 若要使用纯 流式引入 而不是 托管流式处理,请将实例化客户端类型更改为 流式引入客户端。
使用 托管流式引入 API 引入数据时,它会自动处理失败和重试,如下所示:
- 它将由于服务器端大小限制而失败的流媒体请求移动到队列导入。
- 它会将估计超过流式处理限制的数据自动发送到排队引入。
- 流式处理限制的大小取决于数据的格式和压缩。
- 可以通过在托管流引入策略中设置大小因子(在初始化中传递)来更改限制。
- 它会对暂时性的故障进行重试,例如限流,总共重试三次,然后将请求移动到排队引入。
- 它不会重试永久性故障。
注释
如果流式引入失败,并且系统将数据移到排队引入,则引入数据所需的时间更长,因为进程会批处理数据,并将数据排队以供引入。 可以使用 批处理策略来控制此延迟。
局限性
与用于引入的队列数据相比,数据流式处理存在一些限制。
- 无法对数据设置标记。
- 只能通过
ingestionMappingReference提供映射。 不支持内联映射。 - 请求中发送的有效负载不能超过 10 MB,而不考虑格式或压缩。
- 流式引入不支持此属性
ignoreFirstRecord,因此引入的数据不得包含标题行。
有关详细信息,请参阅 流式处理限制。
先决条件
具有数据库用户或更高权限的Fabric或Azure Data Explorer集群。 在 https://dataexplorer.azure.cn/freecluster 预配免费群集。
设置开发环境以使用 Kusto 客户端库。
在您开始之前
在创建应用之前,请完成以下步骤。 以下各节详细介绍了每个步骤。
- 在Azure Data Explorer群集上配置流式引入。
- 创建一个 Kusto 表以引入数据。
- 在表上启用流式引入策略。
- 下载包含 1,000 条 storm 事件记录的 stormevent.csv 示例数据文件。
配置流式数据接收
若要配置流式引入,请参阅配置Azure Data Explorer群集上的流式引入。 配置可能需要几分钟才能生效。 如果使用 Fabric 或免费群集,则会自动启用流式引入。
创建 Kusto 表
通过 Kusto Explorer(桌面)或 Kusto Web Explorer 在数据库上运行以下命令。
- 创建名为 Storm Events 的表
.create table MyStormEvents (StartTime:datetime, EndTime:datetime, State:string, DamageProperty:int, DamageCrops:int, Source:string, StormSummary:dynamic)
启用数据流引入策略
使用以下命令之一对表或整个数据库启用流式引入:
表级别:
.alter table <your table name> policy streamingingestion enable
数据库级别:
.alter database <databaseName> policy streamingingestion enable
策略生效可能需要长达两分钟的时间。
有关流式处理策略的详细信息,请参阅 流式引入策略。
创建基本客户端应用程序
创建连接到 Kusto 帮助群集的基本客户端应用程序。 在相关变量中输入群集查询并引入 URI 和数据库名称。 应用使用两个客户端:一个用于查询,一个用于引入。 每个客户端都会打开浏览器窗口以对用户进行身份验证。
该代码示例包含用于打印查询结果的服务函数 PrintResultAsValueList() 。
使用以下命令添加 Kusto 库:
dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.ingest
using System.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest;
using Kusto.Ingest.Common;
using Azure.Identity;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
var tokenCredential = new InteractiveBrowserCredential();
var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
var database = "<your database>";
var table = "MyStormEvents";
var query = table + " | count";
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
{
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
static void PrintResultsAsValueList(IDataReader response)
{
while (response.Read())
{
for (var i = 0; i < response.FieldCount; i++)
{
object val = response.GetValue(i);
string value = val.ToString() ?? "None";
Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
}
}
}
}
流式传输用于导入的文件
使用该方法 IngestFromStorageAsync 引入 stormevents.csv 文件。
将stormevents.csv 文件复制到脚本所在的同一位置。 由于我们的输入是 CSV 文件,因此 Format = DataSourceFormat.csv 在引入属性中使用。
在Main()的末尾使用以下行添加添加和数据引入部分。
using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
var ingestProperties = new KustoIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv
};
//Ingestion section
Console.WriteLine("Ingesting data from a file");
await ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties);
此外,让我们查询导入后的新行数和最新的一行。 在引入命令后面添加以下行:
Console.WriteLine("Number of rows in " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | count", new ClientRequestProperties());
PrintResultAsValueList(result);
Console.WriteLine("Example line from " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultAsValueList(result);
该代码示例包含用于打印查询结果的服务函数 PrintResultAsValueList() 。
使用以下命令添加 Kusto 库:
dotnet add package Microsoft.Azure.Kusto.Data
dotnet add package Microsoft.Azure.Kusto.ingest.V2
using System.Data;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
using Kusto.Ingest.V2;
using Kusto.Ingest.Common;
using Azure.Identity;
namespace BatchIngest;
class BatchIngest
{
static async Task Main()
{
var tokenCredential = new InteractiveBrowserCredential();
var clusterUri = "<your cluster>"; // e.g., "https://<your_cluster_name>.<region>.kusto.chinacloudapi.cn"
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri).WithAadAzureTokenCredentialsAuthentication(tokenCredential);
var database = "<your database>";
var table = "MyStormEvents";
var query = table + " | count";
using var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb);
using (var response = await kustoClient.ExecuteQueryAsync(database, query, null))
{
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
static void PrintResultsAsValueList(IDataReader response)
{
while (response.Read())
{
for (var i = 0; i < response.FieldCount; i++)
{
object val = response.GetValue(i);
string value = val.ToString() ?? "None";
Console.WriteLine("\t{0} - {1}", response.GetName(i), value);
}
}
}
}
流式传输用于导入的文件
使用该方法 IngestAsync 引入 stormevents.csv 文件。
将stormevents.csv 文件复制到脚本所在的同一位置。 由于我们的输入是 CSV 文件,因此在 DataSourceFormat.csv 中使用 FileSource 作为格式。
在Main()的末尾使用以下行添加添加和数据引入部分。
using var ingestClient = ManagedStreamingIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
var fileSource = new FileSource(.\\stormevents.csv, DataSourceFormat.csv);
await ingestClient.IngestAsync(fileSource, database, table);
此外,让我们查询导入后的新行数和最新的一行。 在引入命令后面添加以下行:
Console.WriteLine("Number of rows in " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | count", new ClientRequestProperties());
PrintResultAsValueList(result);
Console.WriteLine("Example line from " + tableName);
result = await kustoClient.ExecuteQueryAsync(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties());
PrintResultAsValueList(result);
首次运行应用程序时,会看到以下结果:
Number of rows in MyStormEvents
row 1 :
Count - 0
Ingesting data from a file
New number of rows in MyStormEvents
row 1 :
Count - 1000
Example line from MyStormEvents
row 1 :
StartTime - 2007-12-31 11:15:00+00:00
EndTime - 2007-12-31 13:21:00+00:00
State - HAWAII
DamageProperty - 0
DamageCrops - 0
Source - COOP Observer
StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai. Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}}
流式传输用于引入的内存中数据
若要从内存中引入数据,请创建包含要引入的数据的流。
若要从内存中引入流,请调用该方法 IngestFromStreamAsync() 。
将引入部分替换为以下代码:
// Ingestion section
Console.WriteLine("Ingesting data from memory");
var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
using var stream = new MemoryStream(byteArray);
var streamSourceOptions = new StreamSourceOptions
{
LeaveOpen = false
};
await ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions);
// Ingestion section
Console.WriteLine("Ingesting data from memory");
var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'";
byte[] byteArray = Encoding.UTF8.GetBytes(singleLine);
using var ingestClient = QueuedIngestClientBuilder.Create(new Uri(clusterUri)).WithAuthentication(tokenCredential).Build();
using var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb);
using var stream = new MemoryStream(byteArray);
var streamSource = new StreamSource(.\\stormevents.csv, DataSourceCompressionType.None, DataSourceFormat.csv);
await ingestClient.IngestAsync(streamSource, database, table);
结果如下所示:
Number of rows in MyStormEvents
row 1 :
Count - 1000
Ingesting data from memory
New number of rows in MyStormEvents
row 1 :
Count - 1001
Example line from MyStormEvents
row 1 :
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}