Change feed processor in Azure Cosmos DB

The change feed processor is part of the Azure Cosmos DB SDK V3. It simplifies the process of reading the change feed and effectively distributes the event processing across multiple consumers.

The main benefit of the change feed processor library is its fault-tolerant behavior, which assures "at-least-once" delivery of all the events in the change feed.

Components of the change feed processor

Implementing the change feed processor involves four main components:

  1. The monitored container: The monitored container has the data from which the change feed is generated. Any inserts and updates to the monitored container are reflected in the change feed of the container.

  2. The lease container: The lease container acts as state storage and coordinates processing of the change feed across multiple workers. It can be stored in the same account as the monitored container or in a separate account.

  3. The host: A host is an application instance that uses the change feed processor to listen for changes. Multiple instances with the same lease configuration can run in parallel, but each instance should have a different instance name.

  4. The delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.

To further understand how these four elements of the change feed processor work together, let's look at an example in the following diagram. The monitored container stores documents and uses 'City' as the partition key. The partition key values are distributed in ranges that contain items. There are two host instances, and the change feed processor assigns different ranges of partition key values to each instance to maximize compute distribution. Each range is read in parallel, and its progress is maintained separately from other ranges in the lease container.

Figure: Change feed processor example

Implementing the change feed processor

The point of entry is always the monitored container. From a Container instance, you call GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

The first parameter is a distinct name that describes the goal of this processor, and the second parameter is the delegate implementation that handles the changes.

An example of a delegate:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine("Started handling changes...");
    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Finally, you define a name for this processor instance with WithInstanceName, and the container that maintains the lease state with WithLeaseContainer.

Calling Build gives you the processor instance, which you can start by calling StartAsync.

Processing life cycle

The normal life cycle of a host instance is:

  1. Read the change feed.
  2. If there are no changes, sleep for a predefined amount of time (customizable with WithPollInterval in the Builder) and go to #1.
  3. If there are changes, send them to the delegate.
  4. When the delegate finishes processing the changes successfully, update the lease store with the latest processed point in time and go to #1.
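The four steps above can be modeled with an in-memory loop. This is a simplified sketch, not the SDK's implementation: the feed, the `checkpoint` variable standing in for the lease, and the batch size of two are all hypothetical, and where the real processor sleeps for the WithPollInterval duration, this sketch only counts the empty polls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory change feed for a single lease; Lsn orders the changes.
var feed = new List<(long Lsn, string Id)> { (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e") };
long checkpoint = 0;                 // last processed point, standing in for the lease
var processed = new List<string>();  // items the delegate has handled
int emptyPolls = 0;                  // times step 2 (sleep) would have run
const int maxItemsPerBatch = 2;      // arbitrary batch size for the sketch

for (int iteration = 0; iteration < 5; iteration++)
{
    // Step 1: read the change feed starting after the checkpoint.
    var batch = feed.Where(c => c.Lsn > checkpoint).Take(maxItemsPerBatch).ToList();
    if (batch.Count == 0)
    {
        // Step 2: nothing new; the real processor waits for the poll interval here.
        emptyPolls++;
        continue;
    }

    // Step 3: hand the batch to the delegate (here it just records the ids).
    processed.AddRange(batch.Select(c => c.Id));

    // Step 4: the delegate succeeded, so persist the latest processed point.
    checkpoint = batch[batch.Count - 1].Lsn;
}

Console.WriteLine($"processed={string.Join(",", processed)} checkpoint={checkpoint} emptyPolls={emptyPolls}");
```

Running it prints `processed=a,b,c,d,e checkpoint=5 emptyPolls=2`: three polls drain the feed, and the remaining two find nothing new.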

Error handling

The change feed processor is resilient to user code errors. That means that if your delegate implementation has an unhandled exception (step #4), the thread processing that particular batch of changes is stopped, and a new thread is created. The new thread checks the latest point in time that the lease store has recorded for that range of partition key values and restarts from there, effectively sending the same batch of changes to the delegate again. This behavior continues until your delegate processes the changes correctly, and it is the reason the change feed processor has an "at least once" guarantee: if the delegate code throws an exception, the processor retries that batch.
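The retry behavior can be sketched the same way. In this hypothetical model, the delegate throws once while processing item `c`. Because the checkpoint only advances after a whole batch succeeds, the next attempt re-reads from the stored checkpoint and delivers the same batch again. The real processor does this by starting a new thread for the lease; the `while` loop here only approximates that.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory change feed for a single lease.
var feed = new List<(long Lsn, string Id)> { (1, "a"), (2, "b"), (3, "c"), (4, "d") };
long checkpoint = 0;
var deliveries = new List<string>();  // every item handed to the delegate, retries included
int failuresLeft = 1;                 // hypothetical: the delegate fails once when it sees "c"

while (checkpoint < feed[feed.Count - 1].Lsn)
{
    // Each attempt reads from the last checkpoint stored in the lease.
    var batch = feed.Where(c => c.Lsn > checkpoint).Take(2).ToList();
    try
    {
        foreach (var change in batch)
        {
            deliveries.Add(change.Id);
            if (change.Id == "c" && failuresLeft-- > 0)
                throw new InvalidOperationException("Unhandled exception in user code");
        }

        // Only reached when the whole batch succeeds.
        checkpoint = batch[batch.Count - 1].Lsn;
    }
    catch (InvalidOperationException)
    {
        // The checkpoint did not move, so the next attempt re-reads and re-delivers the same batch.
    }
}

Console.WriteLine(string.Join(",", deliveries));  // "c" appears twice: at-least-once delivery
```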

To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to a dead-letter queue. This design ensures that you can keep track of unprocessed changes while still being able to continue processing future changes. The dead-letter queue might simply be another Azure Cosmos container. The exact data store does not matter, only that the unprocessed changes are persisted.
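A minimal sketch of the dead-letter pattern, assuming a delegate with the same parameters as HandleChangesAsync above but over simple `(Id, Payload)` tuples instead of `ToDoItem`. The in-memory `deadLetters` list stands in for the dead-letter store (in practice, for example, another container), and `ProcessItemAsync` is a hypothetical piece of business logic that rejects non-numeric payloads.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var deadLetters = new List<(string Id, string Error)>();  // stands in for a dead-letter container
var succeeded = new List<string>();

// Same parameters as the HandleChangesAsync delegate, but over (Id, Payload) tuples.
async Task HandleChangesAsync(IReadOnlyCollection<(string Id, string Payload)> changes, CancellationToken cancellationToken)
{
    foreach (var item in changes)
    {
        try
        {
            await ProcessItemAsync(item.Payload);
            succeeded.Add(item.Id);
        }
        catch (FormatException ex)
        {
            // Persist the failing document instead of rethrowing, so the batch completes
            // and the processor can checkpoint and move on to future changes.
            deadLetters.Add((item.Id, ex.Message));
        }
    }
}

// Hypothetical business logic: throws FormatException for payloads that are not integers.
static Task ProcessItemAsync(string payload)
{
    _ = int.Parse(payload);
    return Task.CompletedTask;
}

await HandleChangesAsync(new[] { ("1", "10"), ("2", "oops"), ("3", "30") }, CancellationToken.None);
Console.WriteLine($"ok={succeeded.Count}, deadLettered={deadLetters.Count}");
```

Catching per item rather than around the whole batch keeps one bad document from sending the other documents in the batch back through the retry path.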

In addition, you can use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed. Beyond detecting whether the change feed processor is "stuck" continuously retrying the same batch of changes, you can also tell whether it is lagging behind because of limited resources such as CPU, memory, and network bandwidth.
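The two symptoms can be told apart with a simplified lag calculation. This is not the SDK's estimator implementation. It assumes hypothetical per-lease snapshots that hold the latest change position and the checkpoint, treats pending work as their difference, and uses one possible heuristic to flag a lease whose checkpoint does not move while new changes keep arriving.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of lag estimation: pending work per lease is the distance between
// the latest change in its range and the lease's last checkpoint.
static long EstimatePending(IEnumerable<(long LatestLsn, long CheckpointLsn)> snapshot) =>
    snapshot.Sum(l => Math.Max(0L, l.LatestLsn - l.CheckpointLsn));

// Two hypothetical snapshots of the same two leases, taken one estimation period apart.
var before = new[] { (LatestLsn: 500L, CheckpointLsn: 480L), (LatestLsn: 300L, CheckpointLsn: 300L) };
var after = new[] { (LatestLsn: 650L, CheckpointLsn: 480L), (LatestLsn: 320L, CheckpointLsn: 320L) };

long pendingBefore = EstimatePending(before);
long pendingAfter = EstimatePending(after);

// Heuristic: a checkpoint that never moves while new changes arrive suggests a batch stuck in retries;
// growing lag while checkpoints still advance suggests the instances need more resources.
bool firstLeaseStuck = before[0].CheckpointLsn == after[0].CheckpointLsn
                       && after[0].LatestLsn > before[0].LatestLsn;

Console.WriteLine($"pending: {pendingBefore} -> {pendingAfter}, first lease stuck: {firstLeaseStuck}");
```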

Deployment unit

A single change feed processor deployment unit consists of one or more instances with the same processorName and lease container configuration. You can have many deployment units, each with a different business flow for the changes and each consisting of one or more instances.

For example, you might have one deployment unit that triggers an external API any time there is a change in your container. Another deployment unit might move data, in real time, each time there is a change. When a change happens in your monitored container, all your deployment units are notified.
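Because each deployment unit keeps its own lease state, the units consume the same changes independently. The sketch below models this with a hypothetical lease store keyed by processorName. The unit names `notifyExternalApi` and `moveData` are illustrative, and real leases are stored per range in the lease container rather than as a single number per unit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var feed = new List<(long Lsn, string Id)> { (1, "a"), (2, "b"), (3, "c") };

// Hypothetical lease store keyed by processorName: each deployment unit keeps its own checkpoint.
var leaseStore = new Dictionary<string, long> { ["notifyExternalApi"] = 0, ["moveData"] = 0 };
var received = leaseStore.Keys.ToDictionary(name => name, _ => new List<string>());

// One poll of one deployment unit: read after its own checkpoint, deliver, then checkpoint.
void Poll(string processorName, int maxItems)
{
    var batch = feed.Where(c => c.Lsn > leaseStore[processorName]).Take(maxItems).ToList();
    if (batch.Count == 0) return;
    received[processorName].AddRange(batch.Select(c => c.Id));
    leaseStore[processorName] = batch[batch.Count - 1].Lsn;
}

// The units read at different paces, but neither consumes changes on behalf of the other.
for (int i = 0; i < 3; i++)
{
    Poll("notifyExternalApi", maxItems: 3);
    Poll("moveData", maxItems: 1);
}

foreach (var (name, ids) in received)
    Console.WriteLine($"{name}: {string.Join(",", ids)}");
```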

Dynamic scaling

As mentioned before, within a deployment unit you can have one or more instances. To take advantage of the compute distribution within the deployment unit, the only key requirements are:

  1. All instances should have the same lease container configuration.
  2. All instances should have the same processorName.
  3. Each instance needs to have a different instance name (WithInstanceName).

If these three conditions apply, the change feed processor uses an equal distribution algorithm to distribute all the leases in the lease container across all running instances of that deployment unit and parallelize compute. One lease can only be owned by one instance at a given time, so the maximum number of instances that can do work equals the number of leases.

The number of instances can grow and shrink, and the change feed processor dynamically adjusts the load by redistributing the leases accordingly.
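The distribution rule can be illustrated with a simplified round-robin assignment. This is not the SDK's balancing code, which reaches an even spread as instances acquire and release leases over time. The sketch only shows the invariants described above: every lease has exactly one owner, owners differ by at most one lease, and instances beyond the number of leases own nothing. The lease and host names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model of equal lease distribution (not the SDK's internal algorithm):
// each lease gets exactly one owner, assigned round-robin across the running instances.
static Dictionary<string, List<string>> Distribute(IReadOnlyList<string> leaseTokens, IReadOnlyList<string> instanceNames)
{
    var owners = instanceNames.ToDictionary(name => name, _ => new List<string>());
    for (int l = 0; l < leaseTokens.Count; l++)
        owners[instanceNames[l % instanceNames.Count]].Add(leaseTokens[l]);
    return owners;
}

var leases = new[] { "range-0", "range-1", "range-2", "range-3" };

// Scaling out from two hosts to five: recompute ownership for the new set of instances.
var twoHosts = Distribute(leases, new[] { "host-1", "host-2" });
var fiveHosts = Distribute(leases, new[] { "host-1", "host-2", "host-3", "host-4", "host-5" });

foreach (var (instance, owned) in fiveHosts)
    Console.WriteLine($"{instance}: {(owned.Count == 0 ? "idle" : string.Join(", ", owned))}");
```

With four leases, two hosts own two leases each, while with five hosts `host-5` is reported as idle, which is the "maximum number of instances equals the number of leases" rule in action.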

Moreover, the change feed processor can dynamically adjust to changes in container scale caused by throughput or storage increases. When your container grows, the change feed processor transparently handles these scenarios by dynamically increasing the leases and distributing the new leases among existing instances.
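Conceptually, growth is handled by replacing a lease whose range was split with child leases that resume from the parent's checkpoint, so no change is skipped. The range labels and numeric checkpoints below are illustrative simplifications of what the lease container actually stores, and `SplitLease` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model: each lease covers a range of the container (labels are illustrative)
// and stores the checkpoint from which that range resumes.
var leases = new List<(string Range, long Checkpoint)> { ("00-7F", 120), ("80-FF", 95) };

// Replace the parent lease with two child leases that resume from the parent's checkpoint.
static List<(string Range, long Checkpoint)> SplitLease(
    List<(string Range, long Checkpoint)> current, string parentRange, string leftRange, string rightRange)
{
    var parent = current.Single(lease => lease.Range == parentRange);
    var next = current.Where(lease => lease.Range != parentRange).ToList();
    next.Add((leftRange, parent.Checkpoint));
    next.Add((rightRange, parent.Checkpoint));
    return next;
}

leases = SplitLease(leases, "00-7F", "00-3F", "40-7F");

// Three leases now exist, so up to three instances can share the work.
foreach (var (range, checkpoint) in leases)
    Console.WriteLine($"{range} resumes after {checkpoint}");
```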

Change feed and provisioned throughput

You are charged for the RUs consumed, since data movement in and out of Azure Cosmos containers always consumes RUs. You are also charged for the RUs consumed by the lease container.

Additional resources

Next steps

You can now proceed to learn more about the change feed processor in the following articles: