Change feed pull model in Azure Cosmos DB

With the change feed pull model, you can consume the Azure Cosmos DB change feed at your own pace. As you can already do with the change feed processor, you can use the change feed pull model to parallelize the processing of changes across multiple change feed consumers.

Note

The change feed pull model is currently in preview, and only in the Azure Cosmos DB .NET SDK. The preview is not yet available for other SDKs.

Consuming an entire container's changes

You can create a FeedIterator to process the change feed using the pull model. When you first create a FeedIterator, you can specify an optional StartTime within the ChangeFeedRequestOptions. If you leave it unspecified, the StartTime is the current time.

The FeedIterator comes in two flavors: one that returns entity objects, as in the examples below, and one that returns the response as a Stream. Streams let you read data without first deserializing it, which saves client resources.

Here's an example of obtaining a FeedIterator that returns entity objects, in this case User objects:

FeedIterator<User> iteratorWithPOCOS = container.GetChangeFeedIterator<User>();
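The examples in this article use a `User` type but never define it. The following is a minimal sketch of what it could look like. It assumes items carry the required `id` property, and it adds a hypothetical `name` field for illustration. The property is named in lowercase so that it matches the item's `id` JSON property without any serializer attributes.

```csharp
// Hypothetical POCO assumed by the change feed examples in this article.
// Azure Cosmos DB items must carry an "id" property; naming the C# property
// "id" (lowercase) keeps it aligned with the JSON property name.
public class User
{
    public string id { get; set; }

    // Hypothetical extra field, for illustration only.
    public string name { get; set; }
}
```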

Here's an example of obtaining a FeedIterator that returns a Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator();
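The following is a sketch of consuming the stream flavor. Each page is read as raw JSON instead of being deserialized into `User` objects. It assumes `container` is an existing `Microsoft.Azure.Cosmos.Container`. The class and method names are hypothetical, and the code has not been verified against a specific preview build of the SDK.

```csharp
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class StreamChangeFeedExample
{
    // Drain the change feed as raw JSON pages without deserializing them.
    public static async Task ReadRawChangesAsync(Container container)
    {
        FeedIterator iterator = container.GetChangeFeedStreamIterator();

        while (iterator.HasMoreResults)
        {
            // Each page arrives as a ResponseMessage whose Content is a Stream.
            using (ResponseMessage response = await iterator.ReadNextAsync())
            {
                if (response.StatusCode == HttpStatusCode.NotModified)
                {
                    // No new changes at the moment; stop instead of spinning.
                    break;
                }

                if (!response.IsSuccessStatusCode)
                {
                    // Stream APIs report failures through the status code
                    // rather than by throwing.
                    Console.WriteLine($"Request failed: {response.StatusCode}");
                    break;
                }

                using (StreamReader reader = new StreamReader(response.Content))
                {
                    string json = await reader.ReadToEndAsync();
                    Console.WriteLine(json);
                }
            }
        }
    }
}
```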

Using a FeedIterator, you can process an entire container's change feed at your own pace. Here's an example:

// No StartTime is given, so reading starts from the current time.
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>();

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> users = await iteratorForTheEntireContainer.ReadNextAsync();

    foreach (User user in users)
    {
        Console.WriteLine($"Detected change for user with id {user.id}");
    }
}

Consuming a partition key's changes

In some cases, you may only want to process the changes for a specific partition key. You can obtain a FeedIterator for a specific partition key and process the changes the same way you would for an entire container.

// Only changes to items with this partition key value are returned.
FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator<User>(new PartitionKey("myPartitionKeyValueToRead"));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> users = await iteratorForThePartitionKey.ReadNextAsync();

    foreach (User user in users)
    {
        Console.WriteLine($"Detected change for user with id {user.id}");
    }
}

Using FeedRange for parallelization

In the change feed processor, work is automatically spread across multiple consumers. In the change feed pull model, you can use FeedRange to parallelize the processing of the change feed. A FeedRange represents a range of partition key values.

Here's an example showing how to obtain a list of ranges for your container:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

When you obtain the list of FeedRanges for your container, you get one FeedRange per physical partition.
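You can inspect what `GetFeedRangesAsync` returns with a short sketch like the following. It assumes `container` is an existing `Container`, and the class and method names are hypothetical. The count reflects the container's physical partitions at the moment of the call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class FeedRangeInspection
{
    // Print how many FeedRanges the container exposes and their serialized form.
    public static async Task<int> PrintFeedRangesAsync(Container container)
    {
        IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

        // One entry per physical partition at the time of this call.
        Console.WriteLine($"Container has {ranges.Count} feed range(s)");

        foreach (FeedRange range in ranges)
        {
            // ToJsonString produces a portable string form of the range.
            Console.WriteLine(range.ToJsonString());
        }

        return ranges.Count;
    }
}
```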

Using a FeedRange, you can then create a FeedIterator to parallelize the processing of the change feed across multiple machines or threads. The previous example obtained a single FeedIterator for the entire container. By contrast, you can use FeedRanges to obtain multiple FeedIterators that process the change feed in parallel.

If you want to use FeedRanges, you need an orchestrator process that obtains the FeedRanges and distributes them to the consuming machines. You can distribute them in either of these ways:

  • Call FeedRange.ToJsonString and distribute the resulting string value. The consumers can pass this value to FeedRange.FromJsonString.
  • If the distribution is in-process, pass the FeedRange object reference.
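The following sketch shows the string-based option. The orchestrator serializes every FeedRange, and a worker rebuilds one FeedRange and opens an iterator on it. How the strings travel between machines (a queue, a config file, and so on) is out of scope and not shown. The class and method names are hypothetical. The worker is generic over the item type so that the sketch does not depend on the `User` class, and it starts from the beginning, as in the machine examples in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class FeedRangeDistribution
{
    // Orchestrator side: turn each FeedRange into a string that can be
    // handed to another machine.
    public static async Task<List<string>> SerializeRangesAsync(Container container)
    {
        IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
        return ranges.Select(range => range.ToJsonString()).ToList();
    }

    // Worker side: rebuild the FeedRange from the received string and read
    // only that slice of the change feed, starting from the beginning.
    public static FeedIterator<T> CreateWorkerIterator<T>(Container container, string serializedRange)
    {
        FeedRange range = FeedRange.FromJsonString(serializedRange);
        return container.GetChangeFeedIterator<T>(
            range,
            new ChangeFeedRequestOptions { StartTime = DateTime.MinValue });
    }
}
```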

Here's a sample that shows how to read from the beginning of the container's change feed using two hypothetical separate machines that read in parallel:

Machine 1:

// Machine 1 reads the first FeedRange from the beginning of the change feed.
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});

while (iteratorA.HasMoreResults)
{
    FeedResponse<User> users = await iteratorA.ReadNextAsync();

    foreach (User user in users)
    {
        Console.WriteLine($"Detected change for user with id {user.id}");
    }
}

Machine 2:

// Machine 2 reads the second FeedRange from the beginning of the change feed.
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ranges[1], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});

while (iteratorB.HasMoreResults)
{
    FeedResponse<User> users = await iteratorB.ReadNextAsync();

    foreach (User user in users)
    {
        Console.WriteLine($"Detected change for user with id {user.id}");
    }
}
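Machines are not the only unit of parallelism. The following is a sketch of the "threads" case, where a single process starts one task per FeedRange and waits for all of them. It assumes `container` is an existing `Container`. The class and method names are hypothetical. Each task creates its own FeedIterator, so no iterator is shared between tasks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class InProcessParallelReader
{
    // Start one reader task per FeedRange and wait until all of them finish.
    public static async Task ReadAllRangesInParallelAsync<T>(Container container)
    {
        IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

        IEnumerable<Task> readers = ranges.Select(range => ReadRangeAsync<T>(container, range));
        await Task.WhenAll(readers);
    }

    // Read a single FeedRange from the beginning of the change feed.
    private static async Task ReadRangeAsync<T>(Container container, FeedRange range)
    {
        FeedIterator<T> iterator = container.GetChangeFeedIterator<T>(
            range,
            new ChangeFeedRequestOptions { StartTime = DateTime.MinValue });

        while (iterator.HasMoreResults)
        {
            FeedResponse<T> page = await iterator.ReadNextAsync();

            foreach (T item in page)
            {
                Console.WriteLine($"Detected change: {item}");
            }
        }
    }
}
```

The trade-off is that scaling is limited to one process. The FeedRange-per-machine approach above spreads the same work across hosts.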

Saving continuation tokens

You can save the position of your FeedIterator by creating a continuation token. A continuation token is a string value that keeps track of your FeedIterator's last processed changes, which allows the FeedIterator to resume from that point later. The following code reads through the change feed since container creation. Once no more changes are available, it keeps the most recent continuation token so that change feed consumption can be resumed later.

// Read this FeedRange from the beginning of the change feed.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ranges[0], new ChangeFeedRequestOptions{StartTime = DateTime.MinValue});

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> users = await iterator.ReadNextAsync();

    // Remember the position after this page.
    continuation = users.ContinuationToken;

    foreach (User user in users)
    {
        Console.WriteLine($"Detected change for user with id {user.id}");
    }
}

// Some time later, resume from the saved position
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(continuation);

As long as the Azure Cosmos container still exists, a FeedIterator's continuation token never expires.
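The example above keeps the token in a variable, so it is lost when the process exits. The following is a minimal sketch of persisting it instead. It assumes `checkpointPath` is a hypothetical local file. A durable shared store would be needed if another machine has to pick up the work. The class and method names are hypothetical, and the code uses the same continuation-token overload of `GetChangeFeedIterator` as the example above.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class CheckpointedReader
{
    // Read one FeedRange, saving the continuation token after each page so a
    // restarted process resumes where it stopped.
    public static async Task ReadWithCheckpointAsync<T>(Container container, FeedRange range, string checkpointPath)
    {
        FeedIterator<T> iterator;

        if (File.Exists(checkpointPath))
        {
            // Resume from the last saved position.
            string saved = await File.ReadAllTextAsync(checkpointPath);
            iterator = container.GetChangeFeedIterator<T>(saved);
        }
        else
        {
            // First run: start from the beginning of this range.
            iterator = container.GetChangeFeedIterator<T>(
                range,
                new ChangeFeedRequestOptions { StartTime = DateTime.MinValue });
        }

        while (iterator.HasMoreResults)
        {
            FeedResponse<T> page = await iterator.ReadNextAsync();

            foreach (T item in page)
            {
                Console.WriteLine($"Detected change: {item}");
            }

            // Save only after the page is processed, so a crash replays the
            // page instead of skipping it.
            if (page.ContinuationToken != null)
            {
                await File.WriteAllTextAsync(checkpointPath, page.ContinuationToken);
            }
        }
    }
}
```

Writing the checkpoint after processing gives at-least-once handling, so the per-item processing should tolerate seeing the same change twice.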

Comparing with change feed processor

Many scenarios can process the change feed using either the change feed processor or the pull model. The pull model's continuation tokens and the change feed processor's lease container both act as "bookmarks" for the last processed item (or batch of items) in the change feed. However, you can't convert continuation tokens to a lease container, or vice versa.

You should consider using the pull model in these scenarios:

  • Reading changes from a particular partition key
  • Controlling the pace at which your client receives changes for processing
  • Doing a one-time read of the existing data in the change feed (for example, to do a data migration)

Here are some key differences between the change feed processor and the pull model:

| | Change feed processor | Pull model |
| --- | --- | --- |
| Keeping track of current point in processing change feed | Lease (stored in an Azure Cosmos DB container) | Continuation token (stored in memory or manually persisted) |
| Ability to replay past changes | Yes, with push model | Yes, with pull model |
| Polling for future changes | Automatically checks for changes based on user-specified WithPollInterval | Manual |
| Process changes from entire container | Yes, and automatically parallelized across multiple threads/machines consuming from the same container | Yes, and manually parallelized using FeedRange |
| Process changes from just a single partition key | Not supported | Yes |
| Support level | Generally available | Preview |

Next steps