Kusto.Ingest 引入状态报告Kusto.Ingest ingestion status reporting

本文介绍了如何使用 IKustoQueuedIngestClient 功能来跟踪引入请求的状态。This article explains how to use IKustoQueuedIngestClient features to track the status of an ingestion request.

说明类Description classes

这些说明类包含有关要引入的源数据的重要详细信息,应在引入操作中使用。These description classes contain important details about the source data to be ingested, and should be used in the ingestion operation.

  • SourceDescriptionSourceDescription
  • DataReaderDescriptionDataReaderDescription
  • StreamDescriptionStreamDescription
  • FileDescriptionFileDescription
  • BlobDescriptionBlobDescription

这些类全部派生自抽象类 SourceDescription,用于实例化每个数据源的唯一标识符。The classes are all derived from the abstract class SourceDescription, and they're used to instantiate a unique identifier for each data source. 然后,系统会使用每个标识符进行状态跟踪并将其显示在与相关操作相关的所有报告、跟踪和异常中。Each identifier will then be used for status tracking and will show up in all reports, traces, and exceptions related to the relevant operation.

Class SourceDescriptionClass SourceDescription

大型数据集将拆分为 1GB 的区块,每个部分将单独引入。Large datasets will be split into 1GB chunks, and each part will be ingested separately. 然后,系统会将同一 SourceId 应用于源自同一数据集的所有引入操作。The same SourceId will then apply to all ingest operations originated from the same dataset.

public abstract class SourceDescription
    public Guid? SourceId { get; set; }

Class DataReaderDescriptionClass DataReaderDescription

public class DataReaderDescription : SourceDescription
    public  IDataReader DataReader { get; set; }

Class StreamDescriptionClass StreamDescription

public class StreamDescription : SourceDescription
    public Stream Stream { get; set; }

Class FileDescriptionClass FileDescription

public class FileDescription : SourceDescription
    public string FilePath { get; set; }

Class BlobDescriptionClass BlobDescription

public class BlobDescription : SourceDescription
    public string BlobUri { get; set; }
    // Setting the Blob size here is important, as this saves the client the trouble of probing the blob for size
    public long? BlobSize { get; set; }

引入结果表示形式Ingestion result representation

Interface IKustoIngestionResultInterface IKustoIngestionResult

此接口捕获单个排队引入操作的结果,可通过 SourceId 来检索。This interface captures the result of a single queued ingestion operation and can be retrieved by SourceId.

public interface IKustoIngestionResult
    // Retrieves the detailed ingestion status of the ingestion source with the given sourceId.
    IngestionStatus GetIngestionStatusBySourceId(Guid sourceId);

    // Retrieves the detailed ingestion status of all data ingestion operations into Kusto associated with this IKustoIngestionResult instance.
    IEnumerable<IngestionStatus> GetIngestionStatusCollection();

Class IngestionStatusClass IngestionStatus

IngestionStatus 包含单个引入操作的完整状态。IngestionStatus contains a complete status of a single ingestion operation.

public class IngestionStatus
    // The ingestion status returns from the service. Status remains 'Pending' during the ingestion process and
    // is updated by the service once the ingestion completes. When <see cref="IngestionReportMethod"/> is set to 'Queue' the ingestion status
    // will always be 'Queued' and the caller needs to query the report queues for ingestion status, as configured. To query statuses that were
    // reported to queue, see: <see href="https://docs.azure.cn/kusto/api/netfx/kusto-ingest-client-status#ingestion-status-in-azure-queue"/>.
    // When <see cref="IngestionReportMethod"/> is set to 'Table', call <see cref="IKustoIngestionResult.GetIngestionStatusBySourceId"/> or
    // <see cref="IKustoIngestionResult.GetIngestionStatusCollection"/> to retrieve the most recent ingestion status.
    public Status Status { get; set; }
    // A unique identifier representing the ingested source. Can be supplied during the ingestion execution.
    public Guid IngestionSourceId { get; set; }
    // The URI of the blob, potentially including the secret needed to access the blob.
    // This can be a filesystem URI (on-premises deployments only), or an Azure Blob Storage URI (including a SAS key or a semicolon followed by the account key)
    public string IngestionSourcePath { get; set; }
    // The name of the database holding the target table.
    public string Database { get; set; }
    // The name of the target table into which the data will be ingested.
    public string Table { get; set; }
    // The last updated time of the ingestion status.
    public DateTime UpdatedOn { get; set; }
    // The ingestion's operation Id.
    public Guid OperationId { get; set; }
    // The ingestion operation activity Id.
    public Guid ActivityId { get; set; }
    // In case of a failure - indicates the failure's error code.
    public IngestionErrorCode ErrorCode { get; set; }
    // In case of a failure - indicates the failure's status.
    public FailureStatus FailureStatus { get; set; }
    // In case of a failure - indicates the failure's details.
    public string Details { get; set; }
    // In case of a failure - indicates whether or not the failures originate from an Update Policy.
    public bool OriginatesFromUpdatePolicy { get; set; }

状态枚举Status enumeration

ValueValue 含义Meaning 临时/永久Temporary/Permanent
挂起Pending 此值在引入过程中可能会发生变化,具体取决于引入操作的结果The value may change during the course of ingestion, based on the outcome of the ingestion operation 临时Temporary
已成功Succeeded 数据已成功引入The data has been successfully ingested 永久性Permanent
已失败Failed 引入失败Ingestion failed 永久性Permanent
已排队Queued 数据已排队等待引入The data has been queued for ingestion 永久性Permanent
已跳过Skipped 未提供任何数据,已跳过引入操作No data was supplied and the ingest operation was skipped 永久性Permanent
PartiallySucceededPartiallySucceeded 部分数据引入成功,部分数据引入失败Part of the data was successfully ingested, while some failed 永久性Permanent

跟踪引入状态 (KustoQueuedIngestClient)Tracking Ingestion Status (KustoQueuedIngestClient)

IKustoQueuedIngestClient 是一种“即发即弃”客户端。IKustoQueuedIngestClient is a 'fire-and-forget' client. 客户端上的引入操作将消息发布到 Azure 队列便结束了。The ingestion operation on the client side ends by posting a message to an Azure queue. 在发布后,客户端作业完成。After the posting, the client job is done. 为了方便客户端用户,KustoQueuedIngestClient 提供了一种用于跟踪各个引入状态的机制。For the client user's convenience, KustoQueuedIngestClient provides a mechanism for tracking the individual ingestion status. 此机制不适合在高吞吐量引入管道上大量使用。This mechanism isn't intended for mass usage on high-throughput ingestion pipelines. 此机制适用于在速率相对较低且跟踪要求严格的情况下进行精确引入。This mechanism is for precision ingestion when the rate is relatively low and the tracking requirements are strict.


应避免为大容量数据流的每个引入请求启用主动通知,因为这会给基础 xStore 资源带来极大的负载,因此可能会导致引入延迟增加,甚至可能会导致群集根本不响应。Turning on positive notifications for every ingestion request for large volume data streams should be avoided, since this places an extreme load on the underlying xStore resources, which might lead to increased ingestion latency and even complete cluster non-responsiveness.

以下属性(在 KustoQueuedIngestionProperties 上设置的)控制引入成功通知或失败通知的级别和传输。The following properties (set on KustoQueuedIngestionProperties) control the level and transport for ingestion success or failure notifications.

IngestionReportLevel 枚举IngestionReportLevel enumeration

public enum IngestionReportLevel
    FailuresOnly = 0,

IngestionReportMethod 枚举IngestionReportMethod enumeration

public enum IngestionReportMethod
    Queue = 0,

若要跟踪引入状态,请为执行引入操作时使用的 IKustoQueuedIngestClient 提供以下内容:To track the status of your ingestion, provide the following to the IKustoQueuedIngestClient that you do the ingest operation with:

  1. IngestionReportLevel 属性设置为所需的报告级别。Set IngestionReportLevelproperty to the required level of report. FailuresOnly(默认值)或 FailuresAndSuccessesEither FailuresOnly (which is the default value) or FailuresAndSuccesses. 如果设置为 None,则在引入结束时不会报告任何内容。When set to None, nothing will be reported at the end of the ingestion.
  2. 指定 IngestionReportMethod - QueueTablebothSpecify the IngestionReportMethod - Queue, Table, or both.

可在 Kusto.Ingest 示例页上找到用法示例。A usage example can be found on the Kusto.Ingest Examples page.

Azure 表中的引入状态Ingestion status in the Azure table

从每个引入操作返回的 IKustoIngestionResult 接口包含可用于查询引入状态的函数。The IKustoIngestionResult interface that is returned from each ingest operation contains functions that can be used to query the status of the ingestion. 请特别注意返回的 IngestionStatus 对象的 Status 属性:Pay special attention to the Status property of the returned IngestionStatus objects:

  • Pending 表示源已排队等待引入,但尚未更新。Pending indicates that the source has been queued for ingestion and is yet to be updated. 请再次使用该函数查询源的状态Use the function again to query the status of the source
  • Succeeded 表示已成功引入源Succeeded indicates that the source has been ingested successfully
  • Failed 表示未能引入源Failed indicates that the source failed to be ingested


如果得到 Queued 状态,则表示 IngestionReportMethod 保留了其默认值“Queue”。Getting a Queued status indicates that the IngestionReportMethod was left at its default value of 'Queue'. 这是永久性状态,重新调用 GetIngestionStatusBySourceIdGetIngestionStatusCollection 函数将始终产生同一状态:“Queued”。This is a permanent status and re-invoking the GetIngestionStatusBySourceId or GetIngestionStatusCollection functions, will always result in the same 'Queued' status. 若要检查 Azure 表中某个引入的状态,请在引入之前验证 KustoQueuedIngestionPropertiesIngestionReportMethod 属性是否已设置为 TableTo check the status of an ingestion in an Azure table, prior to ingesting, verify that the IngestionReportMethod property of the KustoQueuedIngestionProperties is set to Table. 如果还想将引入状态报告给队列,请将状态设置为 QueueAndTableIf you also want the ingestion status to be reported to a queue, set the status to QueueAndTable.

Azure 队列中的引入状态Ingestion status in Azure queue

IKustoIngestionResult 方法仅适用于检查 Azure 表中的状态。The IKustoIngestionResult methods are only relevant for checking a status in an Azure table. 若要查询报告给 Azure 队列的状态,请使用下面的 IKustoQueuedIngestClient 方法。To query statuses that were reported to an Azure queue, use the following IKustoQueuedIngestClient methods.

方法Method 目的Purpose
PeekTopIngestionFailuresPeekTopIngestionFailures 一种异步方法,该方法返回的信息涉及由于对所请求的消息存在限制而尚未丢弃的最早的引入失败Async method that returns information about the earliest ingestion failures that haven't already been discarded because of the limit for requested messages
GetAndDiscardTopIngestionFailuresGetAndDiscardTopIngestionFailures 一种异步方法,该方法返回并丢弃由于对所请求的消息存在限制而尚未丢弃的最早的引入失败Async method that returns and discards the earliest ingestion failures that haven't already been discarded because of the limit for requested messages
GetAndDiscardTopIngestionSuccessesGetAndDiscardTopIngestionSuccesses 一种异步方法,该方法返回并丢弃由于对所请求的消息存在限制而尚未丢弃的最早的引入成功。Async method that returns and discards the earliest ingestion successes that haven't already been discarded because of the limit for requested messages. 只有当 IngestionReportLevel 设置为 FailuresAndSuccesses 时,此方法才适用。This method is only relevant if the IngestionReportLevel is set to FailuresAndSuccesses

从 Azure 队列检索的引入失败Ingestion failures retrieved from the Azure queue

引入失败由 IngestionFailure 对象表示,该对象包含有关失败的有用信息。The ingestion failures are represented by the IngestionFailure object that contains useful information about the failure.

属性Property 含义Meaning
数据库和表Database & Table 所需的数据库名称和表名称The intended database and table names
IngestionSourcePathIngestionSourcePath 引入的 blob 的路径。The path of the ingested blob. 如果文件已引入,此项会包含原始文件名。Will contain the original file name if file is ingested. 如果 DataReader 已引入,则此项将是随机的Will be random if DataReader is ingested
FailureStatusFailureStatus Permanent(将不执行重试)、Transient(将执行重试)或 Exhausted(多次重试也都失败)Permanent (no retry will be executed), Transient (retry will be executed), or Exhausted (several retries also failed)
OperationId 和 RootActivityIdOperationId & RootActivityId 引入的操作 ID 和 RootActivity ID(用于进一步故障排除)Operation ID and RootActivity ID of the ingestion (useful for further troubleshooting)
FailedOnFailedOn 失败时间 (UTC)。UTC time of the failure. 将晚于调用引入方法的时间,因为在运行引入操作之前会聚合数据Will be greater than the time when the ingestion method was called, since the data is aggregated before running the ingestion
详细信息Details 有关失败的其他详细信息(如果有)Other details concerning the failure (if any exist)
ErrorCodeErrorCode IngestionErrorCode 枚举,表示引入错误代码(如果失败)IngestionErrorCode enumeration, represents the ingestion error code, if there was a failure