本文内容
适用于:✅Azure 数据资源管理器
Kusto.Ingest 库是将数据引入数据库的首选库。 不过,你还可以在不依赖于 Kusto.Ingest 包的情况下实现几乎相同的功能。 本文介绍如何将 Queued Ingestion 用于生产级管道的数据库。
备注
下面的代码是用 C# 编写的,并使用 Azure 存储 SDK、Microsoft 身份验证库 (MSAL) 和 NewtonSoft.JSON 包来简化示例代码。 如果需要,可以将相应的代码替换为适当的 Azure 存储 REST API 调用、非 .NET MSAL 包和任何可用的 JSON 处理包。
本文讲述的是建议的引入模式。 对于 Kusto.Ingest 库,其相应的实体是 IKustoQueuedIngestClient 接口。 在此处,客户端代码通过将引入通知消息发布到 Azure 队列来与数据库交互。 对消息的引用是从 Kusto 数据管理(也称为引入)服务获取的。 与该服务的交互必须使用 Microsoft Entra ID 进行身份验证。
下面的代码展示了 Kusto 数据管理服务如何在不使用 Kusto.Ingest 库的情况下处理排队数据引入。 如果完整的 .NET 由于环境或其他限制而无法访问或不可用,则此示例可能会很有用。
此代码包括了创建 Azure 存储客户端并将数据上传到 blob 的步骤。 在示例代码之后更详细地介绍了每个步骤。
- 获取用于访问引入服务的身份验证令牌
- 查询引入服务以获取:
- 将数据上传到在第 (2) 步从 Kusto 获取的某个 blob 容器中的 blob
- 撰写一条引入消息,用于标识目标数据库和表并指向第 (3) 步中的 blob
- 将我们在 (4) 中撰写的引入消息发布到在 (2) 中获取的引入队列
- 检索服务在引入过程中发现的任何错误
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your ingestion service URI
var dmServiceBaseUri = @"{serviceURI}";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
在这里,我们使用 Microsoft 身份验证库 (MSAL) 来获取 Microsoft Entra 令牌以访问 Kusto 数据管理服务并请求其输入队列。 MSAL 在多个平台上可用。
// Authenticates the interactive user and retrieves Microsoft Entra Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Microsoft Entra ID:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.partner.microsoftonline.cn/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
手动构造针对数据管理服务的 HTTP POST 请求,用以请求返回引入资源。 这些资源包括 DM 服务正在侦听的队列,以及用于数据上传的 blob 容器。 数据管理服务将处理其包含的引入请求已到达这些队列之一的任何消息。
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = body.Length;
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
引入消息通过非直接通道(Azure 队列)转发到群集,因此无法进行带内授权验证以访问引入服务。 解决方案是向每个引入消息附加一个标识令牌。 此令牌将启用带内授权验证。 然后,引入服务就可以在收到引入消息时验证这个已签名的令牌。
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
此步骤介绍了如何将本地文件上传到 Azure Blob,以便交付该文件进行引入。 此代码使用了 Azure 存储 SDK。 如果无法使用依赖项,则可以通过 Azure Blob 服务 REST API 实现此目的。
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
blob.UploadAsync(BinaryData.FromStream(stream));
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
NewtonSoft.JSON 包将再次撰写有效的引入请求,该请求用于标识目标数据库和表并指向 blob。 该消息将发布到相关 Kusto 数据管理服务正在侦听的 Azure 队列。
下面是需要注意的一些要点。
- 此请求是引入消息的最小量。
备注
标识令牌是必需的,并且必须是 AdditionalProperties JSON 对象的一部分。
- 必要时,还必须提供 CsvMapping 或 JsonMapping 属性
- 有关详细信息,请参阅有关预先创建引入映射的文章。
- 引入消息内部结构部分解释了引入消息结构
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
最后,将构造的消息发布到先前获取的选定引入队列。
备注
默认情况下,低于 v12 的 .Net 存储客户端版本将消息编码为 base64。有关详细信息,请参阅存储文档。如果使用的是 v12 以上的 .Net 存储客户端版本,则必须正确编码消息内容。
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
在引入后,我们将检查数据管理向其中写入数据的相关队列是否返回了失败消息。 有关失败消息结构的详细信息,请参阅引入失败消息结构。
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
Kusto 数据管理服务预期从输入 Azure 队列中读取的消息是采用以下格式的 JSON 文档。
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.chinacloudapi.cn/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
属性 | 说明 |
---|---|
ID | 消息标识符 (GUID) |
BlobPath | Blob 的路径 (URI),包括授予读取/写入/删除该 Blob 的权限的 SAS 密钥。 引入服务必须拥有相关权限才能在完成数据引入操作后删除 Blob。 |
RawDataSize | 解压缩的数据的大小(字节)。 提供此值后,引入服务就有可能聚合多个 Blob,从而优化引入。 此属性是可选的,但如果未提供,则服务将访问 Blob 来检索数据大小。 |
DatabaseName | 目标数据库名称 |
TableName | 目标表名称 |
RetainBlobOnSuccess | 如果设置为 true ,则成功完成引入后,不会删除 blob。 默认为 false |
FlushImmediately | 如果设置为 true ,则会跳过任何聚合。 默认为 false |
ReportLevel | 成功/错误报告级别:0 - 失败、1 - 无、2 - 全部 |
ReportMethod | 报告机制:0 - 队列、1 - 表 |
AdditionalProperties | 其他属性,例如 format 、tags 和 creationTime 。 有关详细信息,请参阅数据引入属性。 |
数据管理预期从输入 Azure 队列中读取的消息需要是采用以下格式的 JSON 文档。
属性 | 说明 |
---|---|
OperationId | 可用于在服务端跟踪操作的操作标识符 (GUID) |
数据库 | 目标数据库名称 |
表 | 目标表名称 |
FailedOn | 失败时间戳 |
IngestionSourceId | 用于标识无法引入的数据区块的 GUID |
IngestionSourcePath | 无法引入的数据区块的路径 (URI) |
详细信息 | 失败消息 |
ErrorCode | 错误代码。 有关所有错误代码,请参阅引入错误代码。 |
FailureStatus | 指示失败是永久性的还是暂时性的 |
RootActivityId | 可用于在服务端跟踪操作的关联标识符 (GUID) |
OriginatesFromUpdatePolicy | 指示失败是否由错误的事务性更新策略导致 |
ShouldRetry | 指示如果按原样重试引入是否可以成功 |