如何使用 REST API 引入数据

Kusto.Ingest 库是用于将数据引入到群集的首选方式。 不过,你还可以在不依赖于 Kusto.Ingest 包的情况下实现几乎相同的功能。 本文介绍如何使用生产级管道的“排队引入”将数据引入到群集。

注意

下面的代码是用 C# 编写的,并使用 Azure 存储 SDK、Microsoft 身份验证库 (MSAL) 和 NewtonSoft.JSON 包来简化示例代码。 如果需要,可以将相应的代码替换为适当的 Azure 存储 REST API 调用、非 .NET MSAL 包和任何可用的 JSON 处理包。

本文讲述的是建议的引入模式。 对于 Kusto.Ingest 库,其相应的实体是 IKustoQueuedIngestClient 接口。 在此处,客户端代码通过将引入通知消息发布到 Azure 队列来与群集交互。 对消息的引用是从 Kusto 数据管理(也称为引入)服务获取的。 与该服务的交互必须使用 Microsoft Entra ID 进行身份验证。

下面的代码展示了 Kusto 数据管理服务如何在不使用 Kusto.Ingest 库的情况下处理排队数据引入。 如果完整的 .NET 由于环境或其他限制而无法访问或不可用,则此示例可能会很有用。

此代码包括了创建 Azure 存储客户端并将数据上传到 blob 的步骤。 在示例代码之后更详细地介绍了每个步骤。

  1. 获取用于访问引入服务的身份验证令牌
  2. 查询引入服务以获取:
  3. 将数据上传到在第 (2) 步从 Kusto 获取的某个 blob 容器中的 blob
  4. 撰写一条引入消息,用于标识目标数据库和表并指向第 (3) 步中的 blob
  5. 将我们在 (4) 中撰写的引入消息发布到在 (2) 中获取的引入队列
  6. 检索服务在引入过程中发现的任何错误
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.chinacloudapi.cn
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.chinacloudapi.cn";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

使用生产级管道的排队引入

从 Microsoft Entra ID 获取身份验证证据

在这里,我们使用 Microsoft 身份验证库 (MSAL) 来获取 Microsoft Entra 令牌以访问 Kusto 数据管理服务并请求其输入队列。 MSAL 在多个平台上可用。

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.partner.microsoftonline.cn/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

检索引入资源

手动构造针对数据管理服务的 HTTP POST 请求,用以请求返回引入资源。 这些资源包括 DM 服务正在侦听的队列,以及用于数据上传的 blob 容器。 数据管理服务将处理其包含的引入请求已到达这些队列之一的任何消息。

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

获取 Kusto 标识令牌

引入消息通过非直接通道(Azure 队列)转发到群集,因此无法进行带内授权验证以访问引入服务。 解决方案是向每个引入消息附加一个标识令牌。 此令牌将启用带内授权验证。 然后,引入服务就可以在收到引入消息时验证这个已签名的令牌。

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

将数据上传到 Azure Blob 容器

此步骤介绍了如何将本地文件上传到 Azure Blob,以便交付该文件进行引入。 此代码使用了 Azure 存储 SDK。 如果无法使用依赖项,则可以通过 Azure Blob 服务 REST API 实现此目的。

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

撰写引入消息

NewtonSoft.JSON 包将再次撰写有效的引入请求,该请求用于标识目标数据库和表并指向 blob。 该消息将发布到相关 Kusto 数据管理服务正在侦听的 Azure 队列。

下面是需要注意的一些要点。

  • 此请求是引入消息的最小量。

注意

标识令牌是必需的,并且必须是 AdditionalProperties JSON 对象的一部分。

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

将引入消息发布到引入队列

最后,将构造的消息发布到先前获取的选定引入队列。

注意

默认情况下,低于 v12 的 .Net 存储客户端版本将消息编码为 base64。有关详细信息,请参阅存储文档。如果使用的是 v12 以上的 .Net 存储客户端版本,则必须正确编码消息内容。

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

检查是否有来自 Azure 队列的错误消息

在引入后,我们将检查数据管理向其中写入数据的相关队列是否返回了失败消息。 有关失败消息结构的详细信息,请参阅引入失败消息结构

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

引入消息 - JSON 文档格式

引入消息内部结构

Kusto 数据管理服务预期从输入 Azure 队列中读取的消息是采用以下格式的 JSON 文档。

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.chinacloudapi.cn/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
属性 说明
ID 消息标识符 (GUID)
BlobPath Blob 的路径 (URI),包括授予读取/写入/删除该 Blob 的权限的 SAS 密钥。 引入服务必须拥有相关权限才能在完成数据引入操作后删除 Blob。
RawDataSize 解压缩的数据的大小(字节)。 提供此值后,引入服务就有可能聚合多个 Blob,从而优化引入。 此属性是可选的,但如果未提供,则服务将访问 Blob 来检索数据大小。
DatabaseName 目标数据库名称
TableName 目标表名称
RetainBlobOnSuccess 如果设置为 true,则成功完成引入后,不会删除 blob。 默认为 false
FlushImmediately 如果设置为 true,则会跳过任何聚合。 默认为 false
ReportLevel 成功/错误报告级别:0 - 失败、1 - 无、2 - 全部
ReportMethod 报告机制:0 - 队列、1 - 表
AdditionalProperties 其他属性,例如 formattagscreationTime。 有关详细信息,请参阅数据引入属性

引入失败消息结构

数据管理预期从输入 Azure 队列中读取的消息需要是采用以下格式的 JSON 文档。

属性 说明
OperationId 可用于在服务端跟踪操作的操作标识符 (GUID)
数据库 目标数据库名称
目标表名称
FailedOn 失败时间戳
IngestionSourceId 用于标识无法引入的数据区块的 GUID
IngestionSourcePath 无法引入的数据区块的路径 (URI)
详细信息 失败消息
ErrorCode 错误代码。 有关所有错误代码,请参阅引入错误代码
FailureStatus 指示失败是永久性的还是暂时性的
RootActivityId 可用于在服务端跟踪操作的关联标识符 (GUID)
OriginatesFromUpdatePolicy 指示失败是否由错误的事务性更新策略导致
ShouldRetry 指示如果按原样重试引入是否可以成功