Change feed pull model in Azure Cosmos DB

Applies to: SQL API

With the change feed pull model, you can consume the Azure Cosmos DB change feed at your own pace. As with the change feed processor, you can use the change feed pull model to parallelize the processing of changes across multiple change feed consumers.

Note

The change feed pull model is currently in preview, and only in the Azure Cosmos DB .NET SDK. The preview is not yet available for other SDKs.

Comparing with the change feed processor

Many scenarios can process the change feed using either the change feed processor or the pull model. The pull model's continuation tokens and the change feed processor's lease container both act as "bookmarks" for the last processed item (or batch of items) in the change feed.

However, you can't convert continuation tokens to a lease container (or vice versa).

Note

In most cases when you need to read from the change feed, the simplest option is to use the change feed processor.

You should consider using the pull model in these scenarios:

  • Read changes from a particular partition key
  • Control the pace at which your client receives changes for processing
  • Perform a one-time read of the existing data in the change feed (for example, to do a data migration)

Here are some key differences between the change feed processor and the pull model:

Feature | Change feed processor | Pull model
Keeping track of the current point in processing the change feed | Lease (stored in an Azure Cosmos DB container) | Continuation token (stored in memory or manually persisted)
Ability to replay past changes | Yes, with push model | Yes, with pull model
Polling for future changes | Automatically checks for changes based on the user-specified WithPollInterval | Manual
Behavior when there are no new changes | Automatically waits for WithPollInterval and rechecks | Must catch the exception and manually recheck
Process changes from an entire container | Yes, automatically parallelized across multiple threads/machines consuming from the same container | Yes, manually parallelized using FeedRanges
Process changes from just a single partition key | Not supported | Yes
Support level | Generally available | Preview

Note

Unlike when reading with the change feed processor, you must explicitly handle the case where there are no new changes.

Consuming an entire container's changes

You can create a FeedIterator to process the change feed using the pull model. When you initially create a FeedIterator, you must specify a ChangeFeedStartFrom value, which consists of both the starting position for reading changes and the desired FeedRange. The FeedRange is a range of partition key values and specifies which items that particular FeedIterator will read from the change feed.

You can optionally specify ChangeFeedRequestOptions to set a PageSizeHint, which is the maximum number of items that will be returned in a single page.

The FeedIterator comes in two flavors. In addition to the examples below, which return entity objects, you can also obtain the response with Stream support. Streams let you read data without first deserializing it, which saves client resources.

Here's an example of obtaining a FeedIterator that returns entity objects, in this case User objects:

FeedIterator<User> iteratorWithPOCOs = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning());

Here's an example of obtaining a FeedIterator that returns a Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning());
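With the stream flavor, each call to ReadNextAsync returns a ResponseMessage whose Content property holds the raw response body as a Stream, and your code decides how to parse it. The sketch below shows one way to extract only the item ids from such a body with System.Text.Json, without binding each item to a User object. It assumes the body is a JSON object containing a "Documents" array of items (the envelope the Cosmos DB REST API uses for feed responses); treat that payload shape, and the helper name ChangeFeedStreamParser, as assumptions to verify against your SDK version.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public static class ChangeFeedStreamParser
{
    // Hypothetical helper: returns the ids of the changed items in one raw change feed page.
    // Assumes the page is a JSON object with a "Documents" array (an assumption, not an SDK contract).
    public static async Task<List<string>> ReadIdsAsync(Stream content)
    {
        using JsonDocument page = await JsonDocument.ParseAsync(content);
        var ids = new List<string>();

        if (page.RootElement.TryGetProperty("Documents", out JsonElement documents))
        {
            foreach (JsonElement item in documents.EnumerateArray())
            {
                // Only the "id" property is read; the item is never materialized as a User.
                if (item.TryGetProperty("id", out JsonElement id))
                {
                    ids.Add(id.GetString());
                }
            }
        }

        return ids;
    }
}
```

Inside a reading loop, you would pass response.Content from `await iteratorWithStreams.ReadNextAsync()` to ReadIdsAsync and dispose the ResponseMessage when you are done with it.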

If you don't supply a FeedRange to a FeedIterator, you can process an entire container's change feed at your own pace. Here's an example that starts reading all changes from the current time:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now());

while (iteratorForTheEntireContainer.HasMoreResults)
{
    try {
        FeedResponse<User> users = await iteratorForTheEntireContainer.ReadNextAsync();

        foreach (User user in users)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
    catch {
        // No new changes yet: wait 5 seconds without blocking a thread, then check again.
        Console.WriteLine("No new changes");
        await Task.Delay(5000);
    }
}

Because the change feed is effectively an infinite list of items encompassing all future writes and updates, HasMoreResults is always true. When you try to read the change feed and no new changes are available, you'll receive an exception. In the example above, the exception is handled by waiting 5 seconds before checking for changes again.
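Because HasMoreResults never becomes false, the catch-and-wait loop needs some other way to stop. The sketch below factors the pattern into a reusable helper, ChangeFeedPoller, which is hypothetical and not part of the SDK. It takes a delegate that reads one page, treats an exception as "no new changes" (mirroring the examples in this article), waits with Task.Delay so no thread is blocked, and exits when a CancellationToken is signaled. Catching every exception is kept only for parity with those examples; real code would likely narrow the catch so genuine failures are not mistaken for an empty feed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ChangeFeedPoller
{
    // Hypothetical helper: reads pages until cancellation is requested.
    // readPage stands in for a call such as iterator.ReadNextAsync().
    public static async Task RunAsync<T>(
        Func<Task<IEnumerable<T>>> readPage,
        Action<T> handleChange,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            IEnumerable<T> page;
            try
            {
                page = await readPage();
            }
            catch (Exception)
            {
                // Treated as "no new changes": wait, then poll again.
                try
                {
                    await Task.Delay(pollInterval, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    return; // Cancelled while waiting: stop quietly.
                }
                continue;
            }

            foreach (T change in page)
            {
                handleChange(change);
            }
        }
    }
}
```

With the SDK, readPage could be `async () => await iterator.ReadNextAsync()`, because FeedResponse<User> enumerates as User items, and handleChange could print user.id as in the example above.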

Consuming a partition key's changes

In some cases, you may only want to process a specific partition key's changes. You can obtain a FeedIterator for a specific partition key and process the changes the same way you would for an entire container.

FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue"))));

while (iteratorForThePartitionKey.HasMoreResults)
{
    try {
        FeedResponse<User> users = await iteratorForThePartitionKey.ReadNextAsync();

        foreach (User user in users)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
    catch {
        // No new changes for this partition key yet: wait 5 seconds, then check again.
        Console.WriteLine("No new changes");
        await Task.Delay(5000);
    }
}

Using FeedRange for parallelization

In the change feed processor, work is automatically spread across multiple consumers. In the change feed pull model, you can use the FeedRange to parallelize the processing of the change feed. A FeedRange represents a range of partition key values.

Here's an example showing how to obtain a list of ranges for your container:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

When you obtain a list of FeedRanges for your container, you'll get one FeedRange per physical partition.

Using a FeedRange, you can then create a FeedIterator to parallelize the processing of the change feed across multiple machines or threads. Unlike the previous examples, which showed how to obtain a single FeedIterator for the entire container or for one partition key, you can use FeedRanges to obtain multiple FeedIterators that process the change feed in parallel.

To use FeedRanges, you need an orchestrator process that obtains the FeedRanges and distributes them to those machines. This distribution could be done by:

  • Using FeedRange.ToJsonString and distributing the resulting string value. The consumers can turn this value back into a FeedRange with FeedRange.FromJsonString.
  • Passing the FeedRange object reference, if the distribution is in-process.
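An orchestrator that takes the first approach might serialize each range with `range.ToJsonString()` and then split the resulting strings among the available machines. The sketch below covers only that splitting step, using a simple round-robin policy; the helper name RangeAssigner and the round-robin choice are illustrative assumptions, not something the SDK provides. Each machine would then call FeedRange.FromJsonString on every string it receives and create one FeedIterator per range.

```csharp
using System;
using System.Collections.Generic;

public static class RangeAssigner
{
    // Hypothetical helper: distributes serialized FeedRanges (the output of
    // FeedRange.ToJsonString) across machineCount consumers, round-robin.
    public static List<List<string>> Assign(IReadOnlyList<string> serializedRanges, int machineCount)
    {
        if (machineCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(machineCount));
        }

        var assignments = new List<List<string>>();
        for (int m = 0; m < machineCount; m++)
        {
            assignments.Add(new List<string>());
        }

        // Range i goes to machine i % machineCount, so each range is assigned to exactly one machine.
        for (int i = 0; i < serializedRanges.Count; i++)
        {
            assignments[i % machineCount].Add(serializedRanges[i]);
        }

        return assignments;
    }
}
```

Round-robin keeps the sketch simple; because there is one FeedRange per physical partition, any policy works as long as no range is left unassigned and no range is handed to two consumers that would then process the same changes twice.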

Here's a sample that shows how to read from the beginning of the container's change feed using two hypothetical separate machines reading in parallel:

Machine 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]));
while (iteratorA.HasMoreResults)
{
    try {
        FeedResponse<User> users = await iteratorA.ReadNextAsync();

        foreach (User user in users)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
    catch {
        // No new changes in ranges[0] yet: wait 5 seconds, then check again.
        Console.WriteLine("No new changes");
        await Task.Delay(5000);
    }
}

Machine 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]));
while (iteratorB.HasMoreResults)
{
    try {
        // Read from iteratorB so this machine processes only ranges[1].
        FeedResponse<User> users = await iteratorB.ReadNextAsync();

        foreach (User user in users)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
    catch {
        // No new changes in ranges[1] yet: wait 5 seconds, then check again.
        Console.WriteLine("No new changes");
        await Task.Delay(5000);
    }
}
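The Machine 1 and Machine 2 loops differ only in the range they read. When the consumers are threads in a single process rather than separate machines, the same idea can be expressed as one concurrent task per range. The sketch below is a hypothetical helper, ParallelRangeRunner, that starts one task per range and waits for all of them. It is generic over TRange only so that it compiles without the SDK; with the SDK, TRange would be FeedRange, and the worker delegate would create a FeedIterator with ChangeFeedStartFrom.Beginning(range) and run a read loop like the ones above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelRangeRunner
{
    // Hypothetical helper: runs one worker per range concurrently and completes when all workers finish.
    // In real code TRange would be FeedRange and each worker would own exactly one FeedIterator.
    public static Task RunAsync<TRange>(
        IReadOnlyList<TRange> ranges,
        Func<TRange, CancellationToken, Task> worker,
        CancellationToken cancellationToken)
    {
        // Task.Run keeps a worker that blocks before its first await from delaying the start of the others.
        IEnumerable<Task> tasks = ranges.Select(range => Task.Run(() => worker(range, cancellationToken), cancellationToken));
        return Task.WhenAll(tasks);
    }
}
```

Since the change feed loops never end on their own, the cancellationToken is what would eventually let Task.WhenAll complete in a long-running consumer.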

Saving continuation tokens

You can save the position of your FeedIterator by creating a continuation token. A continuation token is a string value that keeps track of your FeedIterator's last processed changes, which allows the FeedIterator to resume from that point later. The following code reads through the change feed since container creation. Once no more changes are available, it keeps the last continuation token so that change feed consumption can be resumed later.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning());

string continuation = null;

while (iterator.HasMoreResults)
{
    try {
        FeedResponse<User> users = await iterator.ReadNextAsync();
        continuation = users.ContinuationToken;

        foreach (User user in users)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
    catch {
        // No more changes: stop reading and keep the last continuation token.
        Console.WriteLine("No new changes");
        break;
    }
}

// Some time later
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation));

As long as the Azure Cosmos container still exists, a FeedIterator's continuation token never expires.
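Because a continuation token is just a string that does not expire, persisting it can be as simple as writing it to durable storage after each processed page and reading it back on startup. The sketch below uses a local file purely for illustration; the class name FileContinuationStore and the file-based approach are assumptions, and a production consumer would more likely keep the token in a database or blob alongside its other state. On restart, a non-null token from Load would be passed to ChangeFeedStartFrom.ContinuationToken, while null would mean starting from ChangeFeedStartFrom.Beginning() or ChangeFeedStartFrom.Now().

```csharp
using System.IO;

public sealed class FileContinuationStore
{
    private readonly string path;

    // Hypothetical store: keeps a single continuation token in the file at 'path'.
    public FileContinuationStore(string path) => this.path = path;

    // Overwrites the stored token; call this after a page has been fully processed.
    public void Save(string continuationToken)
    {
        // Write to a temporary file first, then move it into place, so a crash
        // during the write affects only the temporary file, not the saved token.
        string tempPath = path + ".tmp";
        File.WriteAllText(tempPath, continuationToken);
        File.Move(tempPath, path, overwrite: true);
    }

    // Returns null when no token has been saved yet.
    public string Load() => File.Exists(path) ? File.ReadAllText(path) : null;
}
```

Saving only after a page's items have been handled means a restart may reprocess at most the last page, rather than skipping changes that were read but not yet processed.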

Next steps