在不使用 Kusto.Ingest 库的情况下进行引入

Kusto.Ingest 库是用于将数据引入到 Azure 数据资源管理器的首选方式。 不过,你还可以在不依赖于 Kusto.Ingest 包的情况下实现几乎相同的功能。 本文介绍如何使用目标为 Azure 数据资源管理器的针对生产级管道的排队引入来这样做。

注意

下面的代码是用 C# 编写的,并使用 Azure 存储 SDK、Microsoft 身份验证库 (MSAL) 和 NewtonSoft.JSON 包来简化示例代码。 如果需要,可以将相应的代码替换为适当的 Azure 存储 REST API 调用、非 .NET MSAL 包和任何可用的 JSON 处理包。

本文讲述的是建议的引入模式。 对于 Kusto.Ingest 库,其相应的实体是 IKustoQueuedIngestClient 接口。 这里,客户端代码通过将引入通知消息发布到 Azure 队列与 Azure 数据资源管理器服务交互。 对消息的引用是从 Kusto 数据管理(也称为引入)服务获取的。 与该服务的交互必须通过 Azure Active Directory (Azure AD) 进行身份验证。

下面的代码展示了 Kusto 数据管理服务如何在不使用 Kusto.Ingest 库的情况下处理排队数据引入。 如果完整的 .NET 由于环境或其他限制而无法访问或不可用,则此示例可能会很有用。

此代码包括了创建 Azure 存储客户端并将数据上传到 blob 的步骤。 在示例代码之后更详细地介绍了每个步骤。

  1. 获取用于访问 Azure 数据资源管理器引入服务的身份验证令牌
  2. 查询 Azure 数据资源管理器引入服务以获取:
  3. 将数据上传到在第 (2) 步从 Kusto 获取的某个 blob 容器中的 blob
  4. 撰写一条引入消息,用于标识目标数据库和表并指向第 (3) 步中的 blob
  5. 将在第 (4) 步撰写的引入消息发布到在第 (2) 步从 Azure 数据资源管理器获取的引入队列**
  6. 检索服务在引入过程中发现的任何错误
// A container class for ingestion resources we are going to obtain from Azure Data Explorer
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.chinacloudapi.cn
    string DmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.chinacloudapi.cn";

    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    string bearerToken = AuthenticateInteractiveUser(DmServiceBaseUri);

    // 2a. Retrieve ingestion resources
    IngestionResourcesSnapshot ingestionResources = RetrieveIngestionResources(DmServiceBaseUri, bearerToken);

    // 2b. Retrieve Kusto identity token
    string identityToken = RetrieveKustoIdentityToken(DmServiceBaseUri, bearerToken);

    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    long blobSizeBytes = 0;
    string blobName = $"TestData{DateTime.UtcNow.ToString("yyyy-MM-dd_HH-mm-ss.FFF")}";
    string blobUriWithSas = UploadFileToBlobContainer(file, ingestionResources.TempStorageContainers.First(),
                                                            "temp001", blobName, out blobSizeBytes);

    // 4. Compose ingestion command
    string ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);

    // 5. Post ingestion command to one of the ingestion queues we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

使用目标为 Azure 数据资源管理器的针对生产级管道的排队引入

从 Azure AD 获取身份验证证据

在这里,我们使用 Microsoft 身份验证库 (MSAL) 来获取 Azure AD 令牌以访问 Kusto 数据管理服务并请求其输入队列。 MSAL 在多个平台上可用。

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<your client app ID>")
                .WithAuthority("https://login.partner.microsoftonline.cn/{Azure AD Tenant ID or name}")
                .WithRedirectUri(@"<your client app redirect URI>")
                .Build();

    // Define scopes:
    string[] scopes = new string[] { $"{resource}/.default" };

    // Acquire user token for the interactive user for Azure Data Explorer:
    AuthenticationResult result = authClient.AcquireTokenInteractive(scopes).ExecuteAsync().Result;
    return result.AccessToken;
}

检索 Azure 数据资源管理器引入资源

手动构造针对数据管理服务的 HTTP POST 请求,用以请求返回引入资源。 这些资源包括 DM 服务正在侦听的队列,以及用于数据上传的 blob 容器。 数据管理服务将处理其包含的引入请求已到达这些队列之一的任何消息。

// Retrieve ingestion resources (queues and blob containers) with SAS from specified Azure Data Explorer Ingestion service using supplied Access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get ingestion resources\" }}";

    IngestionResourcesSnapshot ingestionResources = new IngestionResourcesSnapshot();

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        IEnumerable<JToken> tokens;

        // Input queues
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
        foreach (var token in tokens)
        {
            ingestionResources.IngestionQueues.Add((string) token[1]);
        }

        // Temp storage containers
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
        foreach (var token in tokens)
        {
            ingestionResources.TempStorageContainers.Add((string)token[1]);
        }

        // Failure notifications queue
        var singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.FailureNotificationsQueue = (string)singleToken;

        // Success notifications queue
        singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    }

    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    WebRequest request = WebRequest.Create(uriString);

    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");

    Stream bodyStream = request.GetRequestStream();
    using (StreamWriter sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }

    bodyStream.Close();
    return request.GetResponse();
}

获取 Kusto 标识令牌

引入消息通过非直接通道(Azure 队列)交付到 Azure 数据资源管理器,因此无法进行带内授权验证以访问 Azure 数据资源管理器引入服务。 解决方案是向每个引入消息附加一个标识令牌。 此令牌将启用带内授权验证。 然后,Azure 数据资源管理器服务就可以在收到引入消息时验证这个已签名的令牌。

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get kusto identity token\" }}";
    string jsonPath = "Tables[0].Rows[*].[0]";

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        JToken identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();

        return ((string)identityToken);
    }
}

将数据上传到 Azure Blob 容器

此步骤介绍了如何将本地文件上传到 Azure Blob,以便交付该文件进行引入。 此代码使用了 Azure 存储 SDK。 如果无法使用依赖项,则可以通过 Azure Blob 服务 REST API 实现此目的。

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string containerName, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    CloudBlobContainer blobContainer = new CloudBlobContainer(blobUri);
    CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(blobName);

    using (Stream stream = File.OpenRead(filePath))
    {
        blockBlob.UploadFromStream(stream);
        blobSize = blockBlob.Properties.Length;
    }

    return string.Format("{0}{1}", blockBlob.Uri.AbsoluteUri, blobUri.Query);
}

撰写 Azure 数据资源管理器引入消息

NewtonSoft.JSON 包将再次撰写有效的引入请求,该请求用于标识目标数据库和表并指向 blob。 该消息将发布到相关 Kusto 数据管理服务正在侦听的 Azure 队列。

下面是需要注意的一些要点。

  • 此请求是引入消息的最小量。

注意

标识令牌是必需的,并且必须是 AdditionalProperties JSON 对象的一部分。

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject();

    message.Add("Id", Guid.NewGuid().ToString());
    message.Add("BlobPath", dataUri);
    message.Add("RawDataSize", blobSizeBytes);
    message.Add("DatabaseName", db);
    message.Add("TableName", table);
    message.Add("RetainBlobOnSuccess", true);   // Do not delete the blob on success
    message.Add("FlushImmediately", true);      // Do not aggregate
    message.Add("ReportLevel", 2);              // Report failures and successes (might incur perf overhead)
    message.Add("ReportMethod", 0);             // Failures are reported to an Azure Queue

    message.Add("AdditionalProperties", new JObject(
                                            new JProperty("authorizationContext", identityToken),
                                            new JProperty("jsonMappingReference", mappingRef),
                                            // Data is in JSON format
                                            new JProperty("format", "json")));
    return message.ToString();
}

将 Azure 数据资源管理器引入消息发布到 Azure 数据资源管理器引入队列

最后,将你构造的消息发布到你从 Azure 数据资源管理器获取的选定引入队列。

注意

默认情况下,低于 v12 的 .Net 存储客户端版本将消息编码为 base64。有关详细信息,请参阅存储文档。如果使用的是 v12 以上的 .Net 存储客户端版本,则必须正确编码消息内容。

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    CloudQueueMessage queueMessage = new CloudQueueMessage(message);

    queue.AddMessage(queueMessage, null, null, null, null);
}

检查是否有来自 Azure 队列的错误消息

在引入后,我们将检查数据管理向其中写入数据的相关队列是否返回了失败消息。 有关失败消息结构的详细信息,请参阅引入失败消息结构

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    List<string> messages = Enumerable.Empty<string>().ToList();
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.GetMessages(count);
    foreach (var m in messagesFromQueue)
    {
        messages.Add(m.AsString);
        queue.DeleteMessage(m);
    }

    return messages;
}

引入消息 - JSON 文档格式

引入消息内部结构

Kusto 数据管理服务预期从输入 Azure 队列中读取的消息是采用以下格式的 JSON 文档。

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.chinacloudapi.cn/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
属性 说明
ID 消息标识符 (GUID)
BlobPath Blob 的路径 (URI),包括向 Azure 数据资源管理器授予读取/写入/删除它的权限的 SAS 密钥。 Azure 数据资源管理器必须具有相关权限才能在完成数据引入后删除 blob
RawDataSize 解压缩的数据的大小(字节)。 提供此值将使得 Azure 数据资源管理器能够通过聚合多个 blob(在可能情况下)来优化引入。 此属性是可选的,但如果未提供,则 Azure 数据资源管理器将访问 blob 来检索该大小
DatabaseName 目标数据库名称
TableName 目标表名称
RetainBlobOnSuccess 如果设置为 true,则成功完成引入后,不会删除 blob。 默认为 false
FlushImmediately 如果设置为 true,则会跳过任何聚合。 默认为 false
ReportLevel 成功/错误报告级别:0 - 失败、1 - 无、2 - 全部
ReportMethod 报告机制:0 - 队列、1 - 表
AdditionalProperties 其他属性,例如 formattagscreationTime。 有关详细信息,请参阅数据引入属性

引入失败消息结构

数据管理预期从输入 Azure 队列中读取的消息需要是采用以下格式的 JSON 文档。

属性 说明
OperationId 可用于在服务端跟踪操作的操作标识符 (GUID)
数据库 目标数据库名称
目标表名称
FailedOn 失败时间戳
IngestionSourceId GUID,用于标识 Azure 数据资源管理器未能引入的数据块
IngestionSourcePath Azure 数据资源管理器未能引入的数据块的路径 (URI)
详细信息 失败消息
ErrorCode Azure 数据资源管理器错误代码(请在此处查看所有错误代码)
FailureStatus 指示失败是永久性的还是暂时性的
RootActivityId 可用于在服务端跟踪操作的 Azure 数据资源管理器相关标识符 (GUID)
OriginatesFromUpdatePolicy 指示失败是否由错误的事务性更新策略导致
ShouldRetry 指示如果按原样重试引入是否可以成功