Azure Cosmos DB 更改源处理器

适用于: SQL API

更改源处理器是 Azure Cosmos DB SDK V3 的一部分。 它简化了读取更改源的过程,可有效地在多个使用者之间分布事件处理。

更改源处理器库的主要优点是其容错行为,可保证更改源中的所有事件交付“至少一次”。

更改源处理器的组件

实现更改源处理器需要四个主要组件:

  1. 监视的容器: 监视的容器是用于生成更改源的数据。 对受监视的容器的任何插入和更新都会反映在容器的更改源中。

  2. 租约容器: 租用容器充当状态存储并协调处理跨多个辅助角色的更改源。 租用容器可以与受监视的容器存储在同一帐户中,也可以存储在单独的帐户中。

  3. 计算实例:计算实例承载更改源处理器以便侦听更改。 根据平台不同,它可以由 VM、kubernetes Pod、Azure 应用服务实例、实际的物理计算机来表示。 在本文中,它具有被引用为实例名称的唯一标识符。

  4. 委托:委托是用于定义开发人员要对更改源处理器读取的每一批更改执行何种操作的代码。

若要进一步了解更改源处理器的四个元素是如何协同工作的,请看下图中的一个示例。 受监视的容器会存储文档,并将“City”用作分区键。 我们发现分区键值分布在包含项的范围内(每个范围表示一个物理分区)。 有两个计算实例,并且更改源处理器向每个实例分配不同的范围,以最大程度地提高计算分布率,每个实例都有独一无二且各不相同的名称。 每个范围都是并行读取的,其进程的维护通过租用文档独立于租用容器中的其他范围。 租用的组合表示更改源处理器的当前状态。

Change feed processor example

实现更改源处理器

入口点始终是被监视的容器,来自你调用 GetChangeFeedProcessorBuilderContainer 实例:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

其中,第一个参数是描述此处理器的目标的唯一名称,第二个参数是要处理更改的委托实现。

委托示例如下:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

此后,你使用 WithInstanceName 来定义计算实例名称或唯一标识符,这在各个你要部署的计算实例中应该是独一无二并且各不相同的,最后它通过 WithLeaseContainer 成为用于维护租用状态的容器。

调用 Build 可让你获得可通过调用 StartAsync 启动的处理器实例。

处理生命周期

主机实例的正常生命周期为:

  1. 读取更改源。
  2. 如果没有发生更改,请在一段预定义的时间内保持睡眠状态(可在 Builder 中使用 WithPollInterval 进行自定义),然后转到 #1。
  3. 如果发生了更改,请将其发送给委托。
  4. 委托成功地处理更改后,以最新的处理时间点更新租用存储,然后转到 #1。

错误处理。

更改源处理器可在发生用户代码错误后复原。 这意味着,如果委托实现具有未经处理的异常(步骤 #4),则将停止处理特定更改批次的线程,并将创建一个新线程。 新线程将检查租赁存储在该分区键值范围内的最新时间点,并从该时间点重启,从而有效地向委托发送同一批更改。 此行为将一直持续到委托正确处理完更改为止,这也是更改源处理器能够提供“至少一次”保证的原因。

注意

只有一种情况不会重试一批更改。 如果第一次执行委托时发生故障,则租用存储没有以前保存的状态可用于重试。 在这种情况下,重试将使用初始启动配置,该配置可能包含也可能不包含最后一批次。

若要防止更改源处理器不断地重试同一批更改,应在委托代码中添加逻辑,以便在出现异常时将文档写入死信队列。 此设计可确保你可以跟踪未处理的更改,同时仍然能够继续处理将来的更改。 死信队列可能是另一个 Cosmos 容器。 确切的数据存储并不重要,只是未处理的更改会被保留。

此外,还可使用更改源估算器在更改源处理器实例读取更改源时监视其进度,或使用生命周期通知来检测潜在故障。

生命周期通知

利用更改源处理器,可以在其生命周期内连接相关事件,你可选择接受一个事件或所有事件的通知。 建议至少注册错误通知:

  • WithLeaseAcquireNotification 注册处理程序,以便在当前主机获得租约开始处理它时收到通知。
  • WithLeaseReleaseNotification 注册处理程序,以便在当前主机释放租约并停止处理它时收到通知。
  • WithErrorNotification 注册处理程序,以便当前主机在处理过程中遇到异常时收到通知,从而能够区分异常是源于用户委托(未经处理的异常)还是处理器在试图访问被监视的容器时遇到的错误(例如,网络问题)。
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

部署单元

单个更改源处理器部署单元包含一个或多个具有相同 processorName 和租用容器配置但实例名称各不相同的计算实例。 可以有多个部署单元,其中每个单元可以具有不同的更改业务流,且每个部署单元可由一个或多个实例组成。

例如,你可能有一个部署单元,只要容器发生更改,该部署单元就会触发外部 API。 另一个部署单元可能会在每次发生更改时实时移动数据。 当被监视的容器中发生更改时,所有部署单元都会收到通知。

动态缩放

如前所述,在某个部署单元中,可以有一个或多个计算实例。 若要充分利用部署单元内的计算分布,只需满足以下关键要求:

  1. 所有实例应具有相同的租用容器配置。
  2. 所有实例都应具有相同的 processorName
  3. 每个实例都需要具有不同的实例名称 (WithInstanceName)。

如果符合这三个条件,更改源处理器将使用均等分布算法,将租用容器中的所有租约分布到所有正在运行的实例,并将计算并行化。 在给定的时间,一个租约只能归一个实例所有,因此,最大实例数等于租约数。

实例数可以增加和减少,更改源处理器会通过相应地重新分配来动态调整负载。

而且,由于吞吐量或存储增加,更改源处理器还可以动态调整到容器规模。 当容器增多时,更改源处理器会以透明方式应对这种情况,方法是动态增加租约并将新的租约分布到现有实例。

更改源和预配吞吐量

针对受监视容器的更改源读取操作将消耗请求单位。 请确保受监视的容器未遇到限制,否则会在处理器上接收更改源事件时遇到延迟。

租用容器上的操作(更新和维护状态)将使用请求单位。 使用相同租用容器的实例数越多,潜在的请求单位消耗量就越高。 请确保租用容器未遇到限制,否则在接收处理器上接收更改源事件时会遇到延迟。在某些情况下,如果限制较高,处理器可能会完全停止处理。

开始时间

默认情况下,首次启动更改源处理器时,它将初始化租用容器,并开始其处理生命周期。 不会检测到首次初始化更改源处理器之前在受监视容器中发生的任何更改。

从以前的某个日期和时间读取

DateTime 的实例传递给 WithStartTime 生成器扩展,可将更改源处理器初始化为从特定的日期和时间开始读取更改:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

更改源处理器将根据该特定日期和时间初始化,并开始读取此日期和时间之后发生的更改。

注意

多区域写入帐户不支持在特定日期和时间启动更改源处理器。

从头开始读取

在其他方案(例如数据迁移,或分析容器的整个历史记录)中,需要从该容器的生存期开始时间读取更改源。 为此,可以在生成器扩展中使用 WithStartTime,但需要传递 DateTime.MinValue.ToUniversalTime(),以便生成最小 DateTime 值的 UTC 表示形式,如下所示:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

更改源处理器将会初始化,并从容器生存期的开始时间读取更改。

注意

这些自定义选项仅用于设置更改源处理器的起始时间点。 首次初始化租约容器后,更改这些选项不起作用。

共享租用容器

可在多个部署单元之间共享租用容器,每个部署单元将侦听不同的受监视容器或具有不同的 processorName。 使用此配置,每个部署单元都将在租用容器上保持独立状态。 查看租用容器上的请求单位消耗,确保预配的吞吐量足以满足所有部署单元的需求。

托管更改源处理器的位置

更改源处理器可以托管在任何支持长时间运行的进程或任务的平台中:

  • 连续运行的 Azure WebJob。

  • Azure 虚拟机中的进程。

  • Azure Kubernetes 服务中的后台作业。

  • ASP.NET 托管服务

虽然更改源处理器可以在生存期较短的环境中运行,但由于租用容器会对状态进行维护,这些环境的启动周期会导致接收通知的延迟增加(因为每次启动环境时存在启动处理器的开销)。

其他资源

后续步骤

现在,可以通过以下文章继续详细了解更改源处理器: