Ingestion without the Kusto.Ingest library

The Kusto.Ingest library is the preferred way to ingest data into Azure Data Explorer. However, you can achieve almost the same functionality without depending on the Kusto.Ingest package. This article shows how to do so by using Queued Ingestion to Azure Data Explorer for production-grade pipelines.

Note

The code below is written in C# and uses the Azure Storage SDK, the ADAL authentication library, and the Newtonsoft.Json package to simplify the sample. If needed, the corresponding code can be replaced with the appropriate Azure Storage REST API calls, a non-.NET ADAL package, and any available JSON handling package.

This article deals with the recommended mode of ingestion. In the Kusto.Ingest library, the corresponding entity is the IKustoQueuedIngestClient interface. Here, the client code interacts with the Azure Data Explorer service by posting ingestion notification messages to an Azure queue. References to the queues (and to the blob containers used for uploads) are obtained from the Kusto Data Management service, also known as the Ingestion service. Interaction with that service must be authenticated with Azure Active Directory (Azure AD).

The following code shows how to perform queued data ingestion through the Kusto Data Management service without using the Kusto.Ingest library. This example may be useful if full .NET is inaccessible or unavailable because of the environment or other restrictions.

The code includes the steps to create an Azure Storage client and upload the data to a blob. Each step is described in greater detail after the sample code.

  1. Obtain an authentication token for accessing the Azure Data Explorer ingestion service
  2. Query the Azure Data Explorer ingestion service to obtain:
       • Ingestion resources (queues and blob containers)
       • A Kusto identity token that will be added to every ingestion message
  3. Upload data to a blob in one of the blob containers obtained from Kusto in (2)
  4. Compose an ingestion message that identifies the target database and table and points to the blob from (3)
  5. Post the ingestion message composed in (4) to an ingestion queue obtained from Azure Data Explorer in (2)
  6. Retrieve any errors found by the service during ingestion
// A container class for ingestion resources we are going to obtain from Azure Data Explorer
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.chinacloudapi.cn
    string DmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.chinacloudapi.cn";

    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    string bearerToken = AuthenticateInteractiveUser(DmServiceBaseUri);

    // 2a. Retrieve ingestion resources
    IngestionResourcesSnapshot ingestionResources = RetrieveIngestionResources(DmServiceBaseUri, bearerToken);

    // 2b. Retrieve Kusto identity token
    string identityToken = RetrieveKustoIdentityToken(DmServiceBaseUri, bearerToken);

    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    long blobSizeBytes = 0;
    string blobName = $"TestData{DateTime.UtcNow.ToString("yyyy-MM-dd_HH-mm-ss.FFF")}";
    string blobUriWithSas = UploadFileToBlobContainer(file, ingestionResources.TempStorageContainers.First(),
                                                            "temp001", blobName, out blobSizeBytes);

    // 4. Compose ingestion command
    string ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);

    // 5. Post ingestion command to one of the ingestion queues we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

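    // Give the service time to process the message before reading notifications.
    // A fixed delay keeps the sample simple; a production pipeline would poll the queues instead.
    Thread.Sleep(20000);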

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}
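The comments in IngestSingleFile recommend round-robin selection of blob containers and ingestion queues when working with multiple blobs, to prevent throttling. The following is a minimal sketch of such a selector, assuming .NET 6 or later and written as top-level statements with local functions. CreateRoundRobinSelector is a hypothetical helper for illustration, not part of Kusto.Ingest or any Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: returns a function that cycles through the given resource URIs
// (ingestion queues or temp storage containers) in order, wrapping around at the end.
Func<string> CreateRoundRobinSelector(IReadOnlyList<string> resources)
{
    if (resources.Count == 0)
    {
        throw new ArgumentException("At least one resource URI is required.", nameof(resources));
    }

    int counter = -1;
    return () =>
    {
        // Interlocked keeps the counter consistent when several threads upload concurrently;
        // masking the sign bit keeps the index non-negative after int overflow.
        int index = Interlocked.Increment(ref counter) & int.MaxValue;
        return resources[index % resources.Count];
    };
}
```

In IngestSingleFile, you could create one selector for ingestionResources.TempStorageContainers and one for ingestionResources.IngestionQueues after retrieving the snapshot, and call them instead of .First().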

Using Queued Ingestion to Azure Data Explorer for production-grade pipelines

Obtain authentication evidence from Azure AD

Here we use ADAL to obtain an Azure AD token to access the Kusto Data Management service and ask for its input queues. ADAL is also available on non-Windows platforms if needed.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create Auth Context for MSFT Azure AD:
    AuthenticationContext authContext = new AuthenticationContext("https://login.partner.microsoftonline.cn/{Azure AD Tenant ID or name}");

    // Acquire user token for the interactive user for Azure Data Explorer:
    AuthenticationResult result =
        authContext.AcquireTokenAsync(resource, "<your client app ID>", new Uri(@"<your client app URI>"),
                                        new PlatformParameters(PromptBehavior.Auto), UserIdentifier.AnyUser, "prompt=select_account").Result;
    return result.AccessToken;
}
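If no ADAL package is available on your platform, a token can also be requested directly over HTTP. The sketch below swaps in a different flow than the interactive one above: the OAuth 2.0 client credentials grant against the Microsoft identity platform v2.0 token endpoint, which authenticates an application rather than a user. It assumes .NET 6 or later and an Azure AD application with a client secret that has been granted access to the cluster; the authority, tenant, client ID, and secret values are placeholders, and the helper names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

// Builds a client-credentials token request for the v2.0 endpoint:
// POST {authority}/{tenant}/oauth2/v2.0/token with a form-encoded body.
HttpRequestMessage BuildClientCredentialsRequest(string authority, string tenant, string clientId, string clientSecret, string resource)
{
    var form = new Dictionary<string, string>
    {
        ["grant_type"] = "client_credentials",
        ["client_id"] = clientId,
        ["client_secret"] = clientSecret,
        // "<resource>/.default" requests the permissions configured for the application
        ["scope"] = resource.TrimEnd('/') + "/.default",
    };

    return new HttpRequestMessage(HttpMethod.Post, $"{authority.TrimEnd('/')}/{tenant}/oauth2/v2.0/token")
    {
        Content = new FormUrlEncodedContent(form)
    };
}

// Extracts the access token from the token endpoint's JSON response.
string ParseAccessToken(string tokenResponseJson)
{
    using JsonDocument doc = JsonDocument.Parse(tokenResponseJson);
    return doc.RootElement.GetProperty("access_token").GetString()
           ?? throw new InvalidOperationException("access_token is null.");
}

// Sends the request and returns the bearer token to use with the ingestion service.
async Task<string> AcquireTokenAsync(HttpClient http, HttpRequestMessage request)
{
    using HttpResponseMessage response = await http.SendAsync(request);
    response.EnsureSuccessStatusCode();
    return ParseAccessToken(await response.Content.ReadAsStringAsync());
}
```

For this article's scenario, resource would be the ingestion service URI (DmServiceBaseUri) and authority would be https://login.partner.microsoftonline.cn.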

Retrieve Azure Data Explorer ingestion resources

Manually construct an HTTP POST request to the Data Management service, requesting the return of the ingestion resources. These resources include the queues that the DM service is listening on and the blob containers for data upload. The Data Management service processes any message containing an ingestion request that arrives on one of those queues.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified Azure Data Explorer Ingestion service using supplied Access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get ingestion resources\" }}";

    IngestionResourcesSnapshot ingestionResources = new IngestionResourcesSnapshot();

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        IEnumerable<JToken> tokens;

        // Input queues
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
        foreach (var token in tokens)
        {
            ingestionResources.IngestionQueues.Add((string) token[1]);
        }

        // Temp storage containers
        tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
        foreach (var token in tokens)
        {
            ingestionResources.TempStorageContainers.Add((string)token[1]);
        }

        // Failure notifications queue
        var singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.FailureNotificationsQueue = (string)singleToken;

        // Success notifications queue
        singleToken =
            responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
        ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    }

    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    WebRequest request = WebRequest.Create(uriString);

    // ContentLength must be the number of encoded bytes, not the number of characters
    byte[] bodyBytes = System.Text.Encoding.UTF8.GetBytes(body);

    request.Method = "POST";
    request.ContentType = "application/json; charset=utf-8";
    request.ContentLength = bodyBytes.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");

    // Disposing the request stream completes the request body
    using (Stream bodyStream = request.GetRequestStream())
    {
        bodyStream.Write(bodyBytes, 0, bodyBytes.Length);
    }

    return request.GetResponse();
}
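Where Newtonsoft.Json isn't available, the same response can be read with System.Text.Json, which ships with .NET Core 3.0 and later. The following is a minimal sketch, assuming the response shape that RetrieveIngestionResources relies on (Tables[0].Rows, where each row is a resource type name followed by a URI with SAS); ParseIngestionResources is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: groups the rows of a ".get ingestion resources" response by resource type.
// Each row is assumed to be [resourceTypeName, resourceUriWithSas].
Dictionary<string, List<string>> ParseIngestionResources(string responseJson)
{
    var resources = new Dictionary<string, List<string>>(StringComparer.Ordinal);

    using JsonDocument doc = JsonDocument.Parse(responseJson);
    JsonElement rows = doc.RootElement.GetProperty("Tables")[0].GetProperty("Rows");

    foreach (JsonElement row in rows.EnumerateArray())
    {
        string type = row[0].GetString() ?? string.Empty;
        string uri = row[1].GetString() ?? string.Empty;

        if (!resources.TryGetValue(type, out List<string>? list))
        {
            list = new List<string>();
            resources[type] = list;
        }

        list.Add(uri);
    }

    return resources;
}
```

With this shape, the ingestion queues are under "SecuredReadyForAggregationQueue", the upload containers under "TempStorage", and the notification queues under "FailedIngestionsQueue" and "SuccessfulIngestionsQueue", matching the names used in RetrieveIngestionResources.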

Obtain a Kusto identity token

Ingest messages are handed off to Azure Data Explorer through an indirect channel (an Azure queue), which makes it impossible to perform in-band authorization validation for access to the Azure Data Explorer ingestion service. The solution is to attach an identity token to every ingest message. This signed token enables in-band authorization validation: the Azure Data Explorer service validates it when it receives the ingestion message.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    string ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    string requestBody = $"{{ \"csl\": \".get kusto identity token\" }}";
    string jsonPath = "Tables[0].Rows[*].[0]";

    using (WebResponse response = SendPostRequest(ingestClusterUri, accessToken, requestBody))
    using (StreamReader sr = new StreamReader(response.GetResponseStream()))
    using (JsonTextReader jtr = new JsonTextReader(sr))
    {
        JObject responseJson = JObject.Load(jtr);
        JToken identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();

        return ((string)identityToken);
    }
}
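The same management call can be made with HttpClient and System.Text.Json instead of WebRequest and Newtonsoft.Json. The sketch below assumes .NET Core 3.0 or later and the same endpoint and response shape used by RetrieveKustoIdentityToken (POST to /v1/rest/mgmt, token in the first cell of the first table). Serializing the body with JsonSerializer, rather than string interpolation, escapes any quotes in the command text. The helper names are hypothetical.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Builds a POST to the v1 management endpoint carrying the given control command.
HttpRequestMessage BuildMgmtRequest(string ingestClusterBaseUri, string accessToken, string command)
{
    var request = new HttpRequestMessage(HttpMethod.Post, $"{ingestClusterBaseUri.TrimEnd('/')}/v1/rest/mgmt")
    {
        Content = new StringContent(JsonSerializer.Serialize(new { csl = command }), Encoding.UTF8, "application/json")
    };
    request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
    return request;
}

// Reads Tables[0].Rows[0][0], where ".get kusto identity token" returns the token.
string ReadFirstCell(string responseJson)
{
    using JsonDocument doc = JsonDocument.Parse(responseJson);
    return doc.RootElement.GetProperty("Tables")[0].GetProperty("Rows")[0][0].GetString()
           ?? throw new InvalidOperationException("Identity token cell is null.");
}

async Task<string> RetrieveKustoIdentityTokenAsync(HttpClient http, string ingestClusterBaseUri, string accessToken)
{
    using HttpRequestMessage request = BuildMgmtRequest(ingestClusterBaseUri, accessToken, ".get kusto identity token");
    using HttpResponseMessage response = await http.SendAsync(request);
    response.EnsureSuccessStatusCode();
    return ReadFirstCell(await response.Content.ReadAsStringAsync());
}
```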

Upload data to the Azure Blob container

This step uploads a local file to an Azure blob that will be handed off for ingestion. The code uses the Azure Storage SDK. If taking that dependency isn't possible, the same can be achieved with the Azure Blob service REST API.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string containerName, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    CloudBlobContainer blobContainer = new CloudBlobContainer(blobUri);
    CloudBlockBlob blockBlob = blobContainer.GetBlockBlobReference(blobName);

    using (Stream stream = File.OpenRead(filePath))
    {
        blockBlob.UploadFromStream(stream);
        blobSize = blockBlob.Properties.Length;
    }

    return string.Format("{0}{1}", blockBlob.Uri.AbsoluteUri, blobUri.Query);
}
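Here is a minimal sketch of the REST alternative, using the Blob service Put Blob operation (an HTTP PUT to the blob URI, authorized by the container's SAS, with the required x-ms-blob-type header). It assumes .NET Core 2.0 or later, a blob name that contains only URL-safe characters (as the timestamped names in this article do), and a file small enough for a single Put Blob call; larger files would need Put Block and Put Block List. The helper names are hypothetical.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

// Inserts the blob name into a container URI that carries a SAS query string:
// https://acct.blob.../container?sv=...  becomes  https://acct.blob.../container/blobName?sv=...
string BuildBlobUriWithSas(string containerUriWithSas, string blobName)
{
    int q = containerUriWithSas.IndexOf('?');
    string containerPath = q < 0 ? containerUriWithSas : containerUriWithSas.Substring(0, q);
    string sas = q < 0 ? string.Empty : containerUriWithSas.Substring(q);
    return $"{containerPath.TrimEnd('/')}/{blobName}{sas}";
}

// Creates a Put Blob request for a block blob.
HttpRequestMessage BuildPutBlobRequest(string blobUriWithSas, byte[] data)
{
    var request = new HttpRequestMessage(HttpMethod.Put, blobUriWithSas)
    {
        Content = new ByteArrayContent(data)
    };
    request.Headers.Add("x-ms-blob-type", "BlockBlob");
    return request;
}

// Uploads the file and returns the blob URI (with SAS) and the number of bytes uploaded.
// For compressed files, RawDataSize should be the uncompressed size instead.
async Task<(string BlobUri, long Size)> UploadFileAsync(HttpClient http, string filePath, string containerUriWithSas, string blobName)
{
    byte[] data = await File.ReadAllBytesAsync(filePath);
    string blobUri = BuildBlobUriWithSas(containerUriWithSas, blobName);

    using HttpRequestMessage request = BuildPutBlobRequest(blobUri, data);
    using HttpResponseMessage response = await http.SendAsync(request);
    response.EnsureSuccessStatusCode();

    return (blobUri, data.LongLength);
}
```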

Compose the Azure Data Explorer ingestion message

Here, the Newtonsoft.Json package is used again to compose a valid ingestion request that identifies the target database and table and points to the blob. The message will be posted to the Azure queue that the relevant Kusto Data Management service is listening on.

Here are some points to consider.

  • This request is the bare minimum for the ingestion message.

Note

The identity token is mandatory and must be part of the AdditionalProperties JSON object.

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject();

    message.Add("Id", Guid.NewGuid().ToString());
    message.Add("BlobPath", dataUri);
    message.Add("RawDataSize", blobSizeBytes);
    message.Add("DatabaseName", db);
    message.Add("TableName", table);
    message.Add("RetainBlobOnSuccess", true);   // Do not delete the blob on success
    message.Add("FlushImmediately", true);      // Do not aggregate
    message.Add("ReportLevel", 2);              // Report failures and successes (might incur perf overhead)
    message.Add("ReportMethod", 0);             // Reports are written to Azure queues

    message.Add("AdditionalProperties", new JObject(
                                            new JProperty("authorizationContext", identityToken),
                                            new JProperty("jsonMappingReference", mappingRef),
                                            // Data is in JSON format
                                            new JProperty("format", "json")));
    return message.ToString();
}
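If Newtonsoft.Json isn't available, the same message can be composed with System.Text.Json. This sketch assumes .NET 6 or later (for System.Text.Json.Nodes) and mirrors the property names and values of PrepareIngestionMessage above; PrepareIngestionMessageStj is a hypothetical name.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Nodes;

// Same message as PrepareIngestionMessage, built with System.Text.Json instead of Newtonsoft.Json.
string PrepareIngestionMessageStj(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JsonObject
    {
        ["Id"] = Guid.NewGuid().ToString(),
        ["BlobPath"] = dataUri,
        ["RawDataSize"] = blobSizeBytes,
        ["DatabaseName"] = db,
        ["TableName"] = table,
        ["RetainBlobOnSuccess"] = true,  // Do not delete the blob on success
        ["FlushImmediately"] = true,     // Do not aggregate
        ["ReportLevel"] = 2,             // Report failures and successes
        ["ReportMethod"] = 0,            // Report to Azure queues
        ["AdditionalProperties"] = new JsonObject
        {
            ["authorizationContext"] = identityToken,  // Mandatory identity token
            ["jsonMappingReference"] = mappingRef,
            ["format"] = "json"
        }
    };

    return message.ToJsonString();
}
```

The default encoder may escape characters such as & in the SAS URI as \u0026; the result is still valid JSON with the same values.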

Post the Azure Data Explorer ingestion message to the Azure Data Explorer ingestion queue

Finally, post the message you constructed to the selected ingestion queue that you obtained from Azure Data Explorer.

Note

.NET storage client versions earlier than v12 encode the message as base64 by default. For more information, see the Azure Storage documentation. If you use .NET storage client v12 or later, you must encode the message content properly yourself.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    CloudQueueMessage queueMessage = new CloudQueueMessage(message);

    queue.AddMessage(queueMessage, null, null, null, null);
}
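Without the storage SDK, the message can be posted with the Queue service Put Message REST operation: a POST to the queue's /messages path, authorized by the queue SAS, with an XML body containing a MessageText element. The sketch below base64-encodes the message to match the pre-v12 client default described in the note. It assumes .NET Core 2.0 or later; the helper names are hypothetical.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

// Builds the Put Message URI: <queue>/messages?<sas>
string BuildMessagesUri(string queueUriWithSas)
{
    int q = queueUriWithSas.IndexOf('?');
    string queuePath = q < 0 ? queueUriWithSas : queueUriWithSas.Substring(0, q);
    string sas = q < 0 ? string.Empty : queueUriWithSas.Substring(q);
    return $"{queuePath.TrimEnd('/')}/messages{sas}";
}

// Base64-encodes the message and wraps it in the XML envelope used by Put Message.
// Base64 output contains no XML-special characters, so no further escaping is needed.
string BuildPutMessageBody(string message)
{
    string base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(message));
    return $"<QueueMessage><MessageText>{base64}</MessageText></QueueMessage>";
}

async Task PostMessageToQueueAsync(HttpClient http, string queueUriWithSas, string message)
{
    using var content = new StringContent(BuildPutMessageBody(message), Encoding.UTF8, "application/xml");
    using HttpResponseMessage response = await http.PostAsync(BuildMessagesUri(queueUriWithSas), content);
    response.EnsureSuccessStatusCode();
}
```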

Check for error messages from the Azure queue

After ingestion, check for failure messages in the relevant queue that the Data Management service writes to. For more information on the failure message structure, see Ingestion failure message structure.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    List<string> messages = new List<string>();
    CloudQueue queue = new CloudQueue(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.GetMessages(count);
    foreach (var m in messagesFromQueue)
    {
        messages.Add(m.AsString);
        queue.DeleteMessage(m);
    }

    return messages;
}
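PopTopMessagesFromQueue reads messages and then deletes them. The REST equivalents are the Queue service Get Messages operation (GET on <queue>/messages, at most 32 messages per call; retrieved messages become invisible to other readers for a visibility timeout) and Delete Message (DELETE on <queue>/messages/<messageId> with the pop receipt from Get Messages). The sketch below builds those URIs and parses the Get Messages XML response. It assumes .NET Core 2.0 or later and, as the legacy client's default decoding in PopTopMessagesFromQueue implies, that the notification text is base64-encoded. The helper names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Linq;

// Splits "<queue>?<sas>" into the queue path and the SAS query (without the leading '?').
(string Path, string Sas) SplitSas(string uriWithSas)
{
    int q = uriWithSas.IndexOf('?');
    return q < 0 ? (uriWithSas, string.Empty) : (uriWithSas.Substring(0, q), uriWithSas.Substring(q + 1));
}

// GET <queue>/messages?<sas>&numofmessages=<count>
string BuildGetMessagesUri(string queueUriWithSas, int count)
{
    var (path, sas) = SplitSas(queueUriWithSas);
    string query = sas.Length == 0 ? $"numofmessages={count}" : $"{sas}&numofmessages={count}";
    return $"{path.TrimEnd('/')}/messages?{query}";
}

// DELETE <queue>/messages/<messageId>?<sas>&popreceipt=<escaped receipt>
string BuildDeleteMessageUri(string queueUriWithSas, string messageId, string popReceipt)
{
    var (path, sas) = SplitSas(queueUriWithSas);
    string receipt = "popreceipt=" + Uri.EscapeDataString(popReceipt);
    string query = sas.Length == 0 ? receipt : $"{sas}&{receipt}";
    return $"{path.TrimEnd('/')}/messages/{messageId}?{query}";
}

// Parses a Get Messages response body and base64-decodes each message text.
List<(string Id, string PopReceipt, string Text)> ParseQueueMessages(string xml)
{
    XElement root = XDocument.Parse(xml).Root ?? throw new FormatException("Empty Get Messages response.");
    return root.Elements("QueueMessage")
        .Select(m => (
            Id: (string)m.Element("MessageId"),
            PopReceipt: (string)m.Element("PopReceipt"),
            Text: Encoding.UTF8.GetString(Convert.FromBase64String((string)m.Element("MessageText")))))
        .ToList();
}
```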

Ingestion messages - JSON document formats

Ingestion message internal structure

The message that the Kusto Data Management service expects to read from the input Azure queue is a JSON document in the following format.

{
    "Id" : "<Id>",
    "BlobPath" : "https://<AccountName>.blob.core.chinacloudapi.cn/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : <RawDataSizeInBytes>,
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : <true|false>,
    "FlushImmediately": <true|false>,
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
| Property | Description |
|---|---|
| Id | Message identifier (GUID) |
| BlobPath | Path (URI) to the blob, including the SAS token granting Azure Data Explorer permissions to read, write, and delete it. The permissions are required so that Azure Data Explorer can delete the blob once it has finished ingesting the data |
| RawDataSize | Size of the uncompressed data in bytes. Providing this value enables Azure Data Explorer to optimize ingestion by potentially aggregating multiple blobs. This property is optional, but if it isn't given, Azure Data Explorer will access the blob just to retrieve the size |
| DatabaseName | Target database name |
| TableName | Target table name |
| RetainBlobOnSuccess | If set to true, the blob won't be deleted once ingestion completes successfully. Default is false |
| FlushImmediately | If set to true, any aggregation is skipped. Default is false |
| ReportLevel | Success/error reporting level: 0 - Failures, 1 - None, 2 - All |
| ReportMethod | Reporting mechanism: 0 - Queue, 1 - Table |
| AdditionalProperties | Additional properties such as format, tags, and creationTime. For more information, see data ingestion properties |

Ingestion failure message structure

The failure messages that the Data Management service writes to the failure notifications queue are JSON documents with the following properties.

| Property | Description |
|---|---|
| OperationId | Operation identifier (GUID) that can be used to track the operation on the service side |
| Database | Target database name |
| Table | Target table name |
| FailedOn | Failure timestamp |
| IngestionSourceId | GUID identifying the data chunk that Azure Data Explorer failed to ingest |
| IngestionSourcePath | Path (URI) to the data chunk that Azure Data Explorer failed to ingest |
| Details | Failure message |
| ErrorCode | Azure Data Explorer error code (see the list of ingestion error codes) |
| FailureStatus | Indicates whether the failure is permanent or transient |
| RootActivityId | Azure Data Explorer correlation identifier (GUID) that can be used to track the operation on the service side |
| OriginatesFromUpdatePolicy | Indicates whether the failure was caused by an erroneous transactional update policy |
| ShouldRetry | Indicates whether the ingestion could succeed if retried as is |
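A client typically decides whether to resubmit a failed blob based on ShouldRetry, and logs OperationId, ErrorCode, and Details. The following is a minimal sketch with System.Text.Json, assuming .NET Core 3.0 or later. Because the exact JSON type of ShouldRetry is not specified above, the sketch accepts either a JSON boolean or a "true"/"false" string, and treats a missing flag as "do not retry"; both choices are assumptions. The helper names are hypothetical.

```csharp
using System;
using System.Text.Json;

// Returns true if the failure notification says the ingestion could succeed when retried as is.
bool ShouldRetryIngestion(string failureMessageJson)
{
    using JsonDocument doc = JsonDocument.Parse(failureMessageJson);
    if (!doc.RootElement.TryGetProperty("ShouldRetry", out JsonElement value))
    {
        return false; // Missing flag: do not retry
    }

    return value.ValueKind switch
    {
        JsonValueKind.True => true,
        JsonValueKind.False => false,
        JsonValueKind.String => bool.TryParse(value.GetString(), out bool parsed) && parsed,
        _ => false
    };
}

// Extracts the fields most useful for logging a failure; missing or non-string values become "".
(string OperationId, string ErrorCode, string Details) DescribeFailure(string failureMessageJson)
{
    using JsonDocument doc = JsonDocument.Parse(failureMessageJson);
    JsonElement root = doc.RootElement;

    string Read(string name) =>
        root.TryGetProperty(name, out JsonElement v) && v.ValueKind == JsonValueKind.String
            ? v.GetString() ?? string.Empty
            : string.Empty;

    return (Read("OperationId"), Read("ErrorCode"), Read("Details"));
}
```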