Change feed pull model in Azure Cosmos DB
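APPLIES TO: NoSQL
You can use the change feed pull model to consume the Azure Cosmos DB change feed at your own pace. As with the change feed processor, you can use the pull model to parallelize the processing of changes across multiple change feed consumers.
Compare to the change feed processor
In many scenarios, you can process the change feed with either the change feed processor or the change feed pull model. The pull model's continuation tokens and the change feed processor's lease container both act as bookmarks for the last processed item (or batch of items) in the change feed.
However, you can't convert a continuation token to a lease, or a lease to a continuation token.
Note
In most cases, when you need to read from the change feed, the simplest option is the change feed processor.
Consider using the pull model in these scenarios:
- To read changes for a specific partition key.
- To control the pace at which your client receives changes for processing.
- To perform a one-time read of the existing data in the change feed (for example, for a data migration).
Here are some key differences between the change feed processor and the change feed pull model: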
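Feature | Change feed processor | Change feed pull model |
---|---|---|
Keeping track of the current point in processing the change feed | Lease (stored in an Azure Cosmos DB container) | Continuation token (stored in memory or persisted manually) |
Ability to replay past changes | Yes (with the push model) | Yes (with the pull model) |
Polling for future changes | Automatically checks for changes based on the user-specified WithPollInterval value | Manual |
Behavior when there are no new changes | Automatically waits for the WithPollInterval value and then rechecks | You must check the status and recheck manually |
Process changes from an entire container | Yes, automatically parallelized across multiple threads and machines that consume from the same container | Yes, manually parallelized by using FeedRange |
Process changes from only a single partition key | Not supported | Yes |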
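Note
Unlike reading with the change feed processor, when you use the pull model you must explicitly handle the case where there are no new changes.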
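Work with the pull model
To process the change feed by using the pull model, create an instance of FeedIterator. When you first create FeedIterator, you must specify a ChangeFeedStartFrom value, which combines the starting position for reading changes with the FeedRange value you want to use. A FeedRange is a range of partition key values; it determines which items that particular FeedIterator can read from the change feed. You must also specify a ChangeFeedMode value for the mode in which you want to process changes: latest version, or all versions and deletes. Use ChangeFeedMode.LatestVersion or ChangeFeedMode.AllVersionsAndDeletes to indicate which mode to read the change feed in. When you use all versions and deletes mode, the change feed must start either from Now() or from a specific continuation token.
You can optionally specify ChangeFeedRequestOptions to set PageSizeHint. When set, this property sets the maximum number of items received per page. If operations in the monitored collection are performed through stored procedures, transaction scope is preserved when items are read from the change feed. As a result, the number of items received might be higher than the specified value, because items changed by the same transaction are returned as part of one atomic batch.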
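The interaction between a page size hint and transaction scope can be pictured with a small in-memory model. The following is only a sketch in plain Java with no SDK calls: the class name, the method name, and the rule "close the page before a transaction that would not fit" are assumptions made for illustration, not the service's documented paging algorithm. It shows one way a page can legitimately exceed the hint when a single transaction is larger than the hint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a page size hint that never splits a transaction. */
class PageSizeHintSketch {

    /**
     * @param transactionIds one entry per change, in feed order; consecutive
     *                       equal ids belong to the same transaction
     * @param pageSizeHint   preferred maximum number of items per page (must be positive)
     * @return the number of items placed on each page
     */
    static List<Integer> pageSizes(int[] transactionIds, int pageSizeHint) {
        if (pageSizeHint <= 0) {
            throw new IllegalArgumentException("pageSizeHint must be positive");
        }
        List<Integer> pages = new ArrayList<>();
        int currentPage = 0; // items already placed on the page being built
        int i = 0;
        while (i < transactionIds.length) {
            // Measure the whole transaction that starts at position i.
            int j = i;
            while (j < transactionIds.length && transactionIds[j] == transactionIds[i]) {
                j++;
            }
            int batch = j - i;
            // Close the current page if this batch would push it past the hint.
            if (currentPage > 0 && currentPage + batch > pageSizeHint) {
                pages.add(currentPage);
                currentPage = 0;
            }
            // A batch larger than the hint still lands on one page, undivided.
            currentPage += batch;
            i = j;
        }
        if (currentPage > 0) {
            pages.add(currentPage);
        }
        return pages;
    }
}
```

With a hint of 2 and a feed whose second transaction touched four items, the middle page in this model holds four items, which is why consumers should treat the hint as a preference and not as a hard upper bound.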
The following example obtains, in latest version mode, a FeedIterator that returns entity objects (in this case, User objects):
FeedIterator<User> iteratorWithPOCOs = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
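Tip
In versions earlier than 3.34.0, you can use latest version mode by setting ChangeFeedMode.Incremental. Incremental and LatestVersion both refer to the latest version mode of the change feed, and applications that use either one see the same behavior.
All versions and deletes mode is currently in preview and can be used with preview .NET SDK version 3.32.0-preview or later. The following example obtains, in all versions and deletes mode, a FeedIterator that returns User objects: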
FeedIterator<ChangeFeedItem<User>> iteratorWithPOCOs = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
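Consume the change feed via a stream
FeedIterator has two options for both change feed modes. In addition to the examples that return entity objects, you can obtain the response with Stream support. Streams let you read data without deserializing it first, which saves client resources.
The following example obtains, in latest version mode, a FeedIterator that returns a Stream: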
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
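Consume the changes for an entire container
If you don't supply a FeedRange to FeedIterator, you can process the change feed of an entire container at your own pace. The following example uses latest version mode to read all changes, starting from the current time: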
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
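Because the change feed is effectively an infinite list of items that includes all future writes and updates, the value of HasMoreResults is always true. When you try to read the change feed and no new changes are available, you receive a response with the NotModified status. The preceding example handles this case by waiting five seconds before checking for changes again.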
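Because the pull model leaves the polling pace to the caller, the fixed five-second wait is only one possible policy. As a hedged alternative, the sketch below computes a capped exponential backoff from the number of consecutive empty (NotModified) polls. It is plain Java with no SDK dependency; the class name, method name, and parameters are hypothetical and chosen only for this illustration.

```java
/** Hypothetical helper that decides how long to wait before polling the change feed again. */
class PollDelaySketch {

    /**
     * @param consecutiveEmptyPolls how many polls in a row returned no changes;
     *                              the caller resets this to 0 when changes arrive
     * @param baseMillis            wait after the first empty poll (must be positive)
     * @param maxMillis             upper bound for the wait (must be at least baseMillis)
     * @return milliseconds to wait before the next poll
     */
    static long delayMillis(int consecutiveEmptyPolls, long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        if (consecutiveEmptyPolls <= 0) {
            return 0; // changes were just received: read the next page immediately
        }
        long delay = baseMillis;
        // Double the wait once per additional empty poll, stopping at the cap.
        for (int k = 1; k < consecutiveEmptyPolls && delay < maxMillis; k++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

A reading loop would keep a counter, increment it on every NotModified response, reset it to zero when items arrive, and sleep for the returned duration. The trade-off is lower request volume on an idle container at the cost of higher latency for the first change after a quiet period.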
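Consume the changes for a partition key
In some cases, you might want to process only the changes for a specific partition key. You can obtain a FeedIterator for a specific partition key and process the changes the same way that you do for an entire container.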
FeedIterator<User> iteratorForThePartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue"))), ChangeFeedMode.LatestVersion);
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
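Use FeedRange for parallelization
In the change feed processor, work is automatically spread across multiple consumers. In the change feed pull model, you can use FeedRange to parallelize the processing of the change feed. A FeedRange represents a range of partition key values.
The following example shows how to get the list of ranges for your container: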
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
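When you obtain the list of FeedRange values for your container, you get one FeedRange per physical partition.
By using a FeedRange, you can create a FeedIterator that parallelizes the processing of the change feed across multiple machines or threads. Unlike the previous examples, which obtained a FeedIterator for the entire container or for a single partition key, you can use FeedRanges to obtain multiple FeedIterators that process the change feed in parallel.
To use FeedRanges, you need an orchestrator process that obtains the FeedRanges and distributes them to those machines. This distribution might be:
- Using FeedRange.ToJsonString and distributing this string value. Consumers can pass this value to FeedRange.FromJsonString.
- If the distribution is in-process, passing the FeedRange object reference.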
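The orchestrator's distribution step can be as simple as a round-robin assignment of serialized ranges to workers. The following plain-Java sketch assumes the ranges have already been turned into strings (for example, the output of FeedRange.ToJsonString); the class name, method name, and the placeholder strings in the usage note are hypothetical, and a real orchestrator would also need to deliver the strings to the workers and handle partition splits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical orchestrator helper: assigns serialized feed ranges to workers round-robin. */
class FeedRangeDistributionSketch {

    /**
     * @param serializedRanges feed ranges serialized as strings, in any order
     * @param workerCount      number of consumers (machines or threads), must be positive
     * @return one list per worker; element w holds the ranges assigned to worker w
     */
    static List<List<String>> assign(List<String> serializedRanges, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        List<List<String>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        // Range i goes to worker i mod workerCount, so sizes differ by at most one.
        for (int i = 0; i < serializedRanges.size(); i++) {
            perWorker.get(i % workerCount).add(serializedRanges.get(i));
        }
        return perWorker;
    }
}
```

For five placeholder ranges "r0" through "r4" and two workers, worker 0 receives r0, r2, and r4, and worker 1 receives r1 and r3. Each worker would then deserialize its strings and create one iterator per range.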
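The following example shows how to read from the beginning of the container's change feed by using two hypothetical separate machines that read in parallel:
Machine 1: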
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Machine 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorB.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
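Save continuation tokens
You can save the position of your FeedIterator by obtaining the continuation token. A continuation token is a string value that keeps track of the last changes your FeedIterator processed and lets the FeedIterator resume from that point later. If a continuation token is specified, it takes precedence over the start time and start-from-beginning values. The following code reads the change feed generated since the container was created. When no more changes are available, it keeps the continuation token so that consumption of the change feed can resume later.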
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
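When you use latest version mode, the FeedIterator continuation token never expires as long as the Azure Cosmos DB container still exists. When you use all versions and deletes mode, the FeedIterator continuation token is valid as long as the changes occurred within the retention window for continuous backups.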
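Since a continuation token is just a string, persisting it "manually" can mean writing it to any durable store. As a minimal sketch, the plain-Java class below keeps the token in a local file so that a restarted consumer can resume where it stopped. The class and method names are hypothetical, a local file is an assumption made for illustration, and a production consumer would more likely use a shared store and an atomic write.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical file-backed store for a single change feed continuation token. */
class ContinuationTokenFileStore {

    /** Writes the token to the file, replacing any previously saved token. */
    static void save(Path file, String token) {
        try {
            Files.write(file, token.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the saved token, or empty if nothing has been saved yet. */
    static Optional<String> load(Path file) {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Removes the saved token so that the next run starts from its configured start point. */
    static void delete(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On startup, a consumer would call load: if a token is present, it resumes from the continuation token; otherwise it falls back to its chosen start position. Saving the token only after a page has been fully processed means a crash replays that page instead of skipping it.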
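To process the change feed by using the pull model with the Java SDK, create an instance of Iterator<FeedResponse<JsonNode>> responseIterator. When you create CosmosChangeFeedRequestOptions, you must specify where to start reading the change feed and pass the FeedRange parameter that you want to use. The FeedRange is a range of partition key values that specifies which items can be read from the change feed.
If you want to read the change feed in all versions and deletes mode, you must also specify allVersionsAndDeletes() when you create CosmosChangeFeedRequestOptions. All versions and deletes mode doesn't support processing the change feed from the beginning or from a point in time. You must process changes either from now or from a continuation token. All versions and deletes mode is currently in preview and is available in Java SDK version 4.42.0 or later.
Consume the changes for an entire container
If you specify FeedRange.forFullRange(), you can process the change feed for an entire container at your own pace. You can optionally specify a value in byPage(). When set, this value sets the maximum number of items received per page.
Note
All of the following code snippets are taken from samples in GitHub. Samples are available for both latest version mode and all versions and deletes mode.
The following example shows how to obtain a responseIterator value in latest version mode: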
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.changefeedpull;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class SampleChangeFeedPullModel {
public static CosmosAsyncClient clientAsync;
private CosmosAsyncContainer container;
private CosmosAsyncDatabase database;
public static final String DATABASE_NAME = "db";
public static final String COLLECTION_NAME = "ChangeFeedPull";
public static final String PARTITION_KEY_FIELD_NAME = "pk";
protected static Logger logger = LoggerFactory.getLogger(SampleChangeFeedPullModel.class);
public static void main(String[] args) {
SampleChangeFeedPullModel p = new SampleChangeFeedPullModel();
try {
logger.info("Starting ASYNC main");
p.ChangeFeedPullDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
e.printStackTrace();
logger.error(String.format("Cosmos getStarted failed with %s", e));
} finally {
logger.info("Closing the client");
p.shutdown(p.container, p.database);
}
}
public void ChangeFeedPullDemo() {
clientAsync = this.getCosmosAsyncClient();
Resources resources = new Resources(PARTITION_KEY_FIELD_NAME, clientAsync, DATABASE_NAME, COLLECTION_NAME);
this.container = resources.container;
this.database = resources.database;
resources.insertDocuments(10, 20);
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed with all feed ranges on this machine...");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <FeedResponseIterator>
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
// </FeedResponseIterator>
// <AllFeedRanges>
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </AllFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished Reading change feed using all feed ranges!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <GetFeedRanges>
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
// </GetFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Simulate processing change feed on two separate machines");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 1....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine1>
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine1>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 1!");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 2....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine2>
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine2>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 2!");
logger.info("*************************************************************");
logger.info("*************************************************************");
//grab first pk in keySet()
Set<String> keySet = resources.partitionKeyToDocuments.keySet();
String partitionKey="";
for (String string : keySet) {
partitionKey = string;
break;
}
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <PartitionKeyProcessing>
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
// </PartitionKeyProcessing>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
}
public CosmosAsyncClient getCosmosAsyncClient() {
return new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION)
.buildAsyncClient();
}
public void close() {
clientAsync.close();
}
private void shutdown(CosmosAsyncContainer container, CosmosAsyncDatabase database) {
try {
// To allow for the sequence to complete after subscribe() calls
Thread.sleep(5000);
// Clean shutdown
logger.info("Deleting Cosmos DB resources");
logger.info("-Deleting container...");
if (container != null)
container.delete().subscribe();
logger.info("-Deleting database...");
if (database != null)
database.delete().subscribe();
logger.info("-Closing the client...");
} catch (InterruptedException err) {
err.printStackTrace();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client. See stack "
+ "trace below.");
err.printStackTrace();
}
clientAsync.close();
logger.info("Done.");
}
}
The following example shows how to obtain a responseIterator in all versions and deletes mode:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.changefeedpull;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class SampleChangeFeedPullModelForAllVersionsAndDeletesMode {
public static CosmosAsyncClient clientAsync;
private CosmosAsyncContainer container;
private CosmosAsyncDatabase database;
public static final String DATABASE_NAME = "db";
public static final String COLLECTION_NAME = "ChangeFeedPullForAllVersionsAndDeletes";
public static final String PARTITION_KEY_FIELD_NAME = "pk";
protected static Logger logger = LoggerFactory.getLogger(SampleChangeFeedPullModelForAllVersionsAndDeletesMode.class);
public static void main(String[] args) {
SampleChangeFeedPullModelForAllVersionsAndDeletesMode p = new SampleChangeFeedPullModelForAllVersionsAndDeletesMode();
try {
logger.info("Starting ASYNC main");
p.changeFeedAllVersionsAndDeletesPullDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
e.printStackTrace();
logger.error(String.format("Cosmos getStarted failed with %s", e));
} finally {
logger.info("Closing the client");
p.shutdown(p.container, p.database);
}
}
public void changeFeedAllVersionsAndDeletesPullDemo() {
clientAsync = this.getCosmosAsyncClient();
Resources resources = new Resources(PARTITION_KEY_FIELD_NAME, clientAsync, DATABASE_NAME, COLLECTION_NAME);
this.container = resources.container;
this.database = resources.database;
resources.insertDocuments(5, 10);
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed with all feed ranges on this machine...");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <FeedResponseIterator>
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
// </FeedResponseIterator>
// <AllFeedRanges>
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken())
.allVersionsAndDeletes();
// Insert, update and delete documents to get them in AllVersionsAndDeletes Change feed
resources.insertDocuments(5, 10);
resources.updateDocuments(5, 10);
resources.deleteDocuments(5, 10);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
i++;
if (i > 2) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </AllFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished Reading change feed using all feed ranges!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <GetFeedRanges>
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
// </GetFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Simulate processing change feed on two separate machines");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 1....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine1>
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(range1)
.allVersionsAndDeletes();
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken())
.allVersionsAndDeletes();
// Insert, update and delete documents to get them in AllVersionsAndDeletes Change feed
resources.insertDocuments(5, 10);
resources.updateDocuments(5, 10);
resources.deleteDocuments(5, 10);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
machine1index++;
if (machine1index > 2) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine1>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 1!");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 2....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine2>
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(range2)
.allVersionsAndDeletes();
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken())
.allVersionsAndDeletes();
// Insert, update and delete documents to get them in AllVersionsAndDeletes Change feed
resources.insertDocuments(5, 10);
resources.updateDocuments(5, 10);
resources.deleteDocuments(5, 10);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
machine2index++;
if (machine2index > 2) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine2>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 2!");
logger.info("*************************************************************");
logger.info("*************************************************************");
//grab first pk in keySet()
Set<String> keySet = resources.partitionKeyToDocuments.keySet();
String partitionKey="";
for (String string : keySet) {
partitionKey = string;
break;
}
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <PartitionKeyProcessing>
options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)))
.allVersionsAndDeletes();
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken())
.allVersionsAndDeletes();
// Insert, update and delete documents to get them in AllVersionsAndDeletes Change feed
resources.insertDocuments(5, 10);
resources.updateDocuments(5, 10);
resources.deleteDocuments(5, 10);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
pkIndex++;
if (pkIndex > 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
// </PartitionKeyProcessing>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
}
public CosmosAsyncClient getCosmosAsyncClient() {
return new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION)
.buildAsyncClient();
}
public void close() {
clientAsync.close();
}
private void shutdown(CosmosAsyncContainer container, CosmosAsyncDatabase database) {
try {
// To allow for the sequence to complete after subscribe() calls
Thread.sleep(5000);
// Clean shutdown
logger.info("Deleting Cosmos DB resources");
logger.info("-Deleting container...");
if (container != null)
container.delete().subscribe();
logger.info("-Deleting database...");
if (database != null)
database.delete().subscribe();
logger.info("-Closing the client...");
} catch (InterruptedException err) {
err.printStackTrace();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client. See stack "
+ "trace below.");
err.printStackTrace();
}
clientAsync.close();
logger.info("Done.");
}
}
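You can then iterate over the results. Because the change feed is effectively an infinite list of items that includes all future writes and updates, the value of responseIterator.hasNext() is always true. The following example uses latest version mode to read all changes, starting from the beginning. Each iteration keeps a continuation token after it processes all events, so that reading picks up from the last processed point in the change feed by using createForProcessingFromContinuation: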
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.changefeedpull;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class SampleChangeFeedPullModel {
public static CosmosAsyncClient clientAsync;
private CosmosAsyncContainer container;
private CosmosAsyncDatabase database;
public static final String DATABASE_NAME = "db";
public static final String COLLECTION_NAME = "ChangeFeedPull";
public static final String PARTITION_KEY_FIELD_NAME = "pk";
protected static Logger logger = LoggerFactory.getLogger(SampleChangeFeedPullModel.class);
public static void main(String[] args) {
SampleChangeFeedPullModel p = new SampleChangeFeedPullModel();
try {
logger.info("Starting ASYNC main");
p.ChangeFeedPullDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
e.printStackTrace();
logger.error(String.format("Cosmos getStarted failed with %s", e));
} finally {
logger.info("Closing the client");
p.shutdown(p.container, p.database);
}
}
public void ChangeFeedPullDemo() {
clientAsync = this.getCosmosAsyncClient();
Resources resources = new Resources(PARTITION_KEY_FIELD_NAME, clientAsync, DATABASE_NAME, COLLECTION_NAME);
this.container = resources.container;
this.database = resources.database;
resources.insertDocuments(10, 20);
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed with all feed ranges on this machine...");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <FeedResponseIterator>
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
// </FeedResponseIterator>
// <AllFeedRanges>
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </AllFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished Reading change feed using all feed ranges!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <GetFeedRanges>
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
// </GetFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Simulate processing change feed on two separate machines");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 1....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine1>
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine1>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 1!");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 2....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine2>
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine2>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 2!");
logger.info("*************************************************************");
logger.info("*************************************************************");
//grab first pk in keySet()
Set<String> keySet = resources.partitionKeyToDocuments.keySet();
String partitionKey="";
for (String string : keySet) {
partitionKey = string;
break;
}
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <PartitionKeyProcessing>
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
// </PartitionKeyProcessing>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
}
public CosmosAsyncClient getCosmosAsyncClient() {
return new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION)
.buildAsyncClient();
}
public void close() {
clientAsync.close();
}
private void shutdown(CosmosAsyncContainer container, CosmosAsyncDatabase database) {
try {
// To allow for the sequence to complete after subscribe() calls
Thread.sleep(5000);
// Clean shutdown
logger.info("Deleting Cosmos DB resources");
logger.info("-Deleting container...");
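if (container != null)
container.delete().block(); // block so the delete finishes before the client is closed
logger.info("-Deleting database...");
if (database != null)
database.delete().block(); // block so the delete finishes before the client is closed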
logger.info("-Closing the client...");
} catch (InterruptedException err) {
err.printStackTrace();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client. See stack "
+ "trace below.");
err.printStackTrace();
}
clientAsync.close();
logger.info("Done.");
}
}
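The loops in the preceding sample capture a continuation token after each page is handled. The following is a minimal sketch of why that ordering matters, written without the Azure Cosmos DB SDK so that it runs on its own. `CheckpointSketch`, `Page`, `readPage`, and `drain` are hypothetical names, and the token is modeled as a simple list offset rather than a real continuation token.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: an in-memory stand-in for a change feed, not the Cosmos DB SDK. */
final class CheckpointSketch {

    /** One page of changes plus the token that points just past it. */
    static final class Page {
        final List<String> items;
        final int continuation;

        Page(List<String> items, int continuation) {
            this.items = items;
            this.continuation = continuation;
        }
    }

    /** Hypothetical feed read: returns up to pageSize changes starting at the given token. */
    static Page readPage(List<String> feed, int token, int pageSize) {
        int end = Math.min(feed.size(), token + pageSize);
        return new Page(new ArrayList<>(feed.subList(token, end)), end);
    }

    /**
     * Processes at most maxPages pages starting from savedToken and returns the
     * token to persist. The token advances only after a page is fully handled,
     * so a failure in the middle of a page leads to a re-read, not a lost change.
     */
    static int drain(List<String> feed, int savedToken, int pageSize, int maxPages, List<String> sink) {
        int token = savedToken;
        for (int page = 0; page < maxPages; page++) {
            Page p = readPage(feed, token, pageSize);
            if (p.items.isEmpty()) {
                break; // no new changes: the caller decides when to check again
            }
            sink.addAll(p.items);   // "process" the page
            token = p.continuation; // checkpoint only after processing
        }
        return token;
    }

    public static void main(String[] args) {
        List<String> feed = Arrays.asList("c1", "c2", "c3", "c4", "c5");
        List<String> processed = new ArrayList<>();
        int token = drain(feed, 0, 2, 1, processed);  // first run handles a single page
        token = drain(feed, token, 2, 10, processed); // a later run resumes from the saved token
        System.out.println(processed + " token=" + token);
    }
}
```

In a real application, the saved value would be the string returned by `response.getContinuationToken()`, stored somewhere durable and passed to `createForProcessingFromContinuation` on the next run.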
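Consume the changes for a partition key
In some cases, you might want to process only the changes for a specific partition key. You can process the changes for a specific partition key in the same way that you process them for an entire container. Here's an example that uses latest version mode: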
// Assumes `container` is an initialized CosmosAsyncContainer and `partitionKey`
// holds the partition key value whose changes you want to read.
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
        .createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));

Iterator<FeedResponse<JsonNode>> responseIterator = container
        .queryChangeFeed(options, JsonNode.class)
        .byPage()
        .toIterable()
        .iterator();

int pkIndex = 0;
while (responseIterator.hasNext()) {
    FeedResponse<JsonNode> response = responseIterator.next();
    List<JsonNode> results = response.getResults();
    logger.info("Got " + results.size() + " item(s) retrieved");

    // Capture the continuation token only after every item in the page is processed,
    // so that a later read can resume from this point.
    options = CosmosChangeFeedRequestOptions
            .createForProcessingFromContinuation(response.getContinuationToken());

    pkIndex++;
    if (pkIndex >= 5) {
        // artificially breaking out of loop - not required in a real app
        System.out.println("breaking....");
        break;
    }
}
When you get the list of FeedRanges for your container, you get one FeedRange per physical partition.
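To make the idea of a range of partition key values concrete, here is a small sketch that models feed ranges as contiguous slices of a hash space. It is only a mental model: `HashRangeSketch`, `Range`, `split`, and `ownerOf` are hypothetical names, the size of the hash space is arbitrary, and `String.hashCode` stands in for whatever hashing the service uses internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: models feed ranges as slices of a hash space; not the Cosmos DB SDK. */
final class HashRangeSketch {

    static final int HASH_SPACE = 1024; // arbitrary size chosen for the illustration

    /** A half-open slice [min, max) of the hash space. */
    static final class Range {
        final int min;
        final int max;

        Range(int min, int max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(int hash) {
            return hash >= min && hash < max;
        }

        @Override
        public String toString() {
            return "[" + min + "," + max + ")";
        }
    }

    /** Splits the hash space into count contiguous, non-overlapping ranges. */
    static List<Range> split(int count) {
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int min = (int) ((long) HASH_SPACE * i / count);
            int max = (int) ((long) HASH_SPACE * (i + 1) / count);
            ranges.add(new Range(min, max));
        }
        return ranges;
    }

    /** Maps a partition key to a position in the hash space (stand-in hash function). */
    static int hash(String partitionKey) {
        return Math.floorMod(partitionKey.hashCode(), HASH_SPACE);
    }

    /** Returns the index of the single range that owns the given partition key. */
    static int ownerOf(String partitionKey, List<Range> ranges) {
        int h = hash(partitionKey);
        for (int i = 0; i < ranges.size(); i++) {
            if (ranges.get(i).contains(h)) {
                return i;
            }
        }
        throw new IllegalStateException("ranges do not cover the hash space");
    }

    public static void main(String[] args) {
        List<Range> ranges = split(3); // pretend the container has three physical partitions
        for (String pk : new String[] {"pk-1", "pk-2", "pk-3"}) {
            System.out.println(pk + " -> range " + ranges.get(ownerOf(pk, ranges)));
        }
    }
}
```

In this model, every partition key belongs to exactly one range, and the ranges together cover the whole space. That is the property that lets separate readers each take one range without missing or duplicating changes.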
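By using a FeedRange, you can parallelize the processing of the change feed across multiple machines or threads. Unlike the previous examples, which showed how to process changes for the entire container or for a single partition key, you can use FeedRanges to process the change feed in parallel.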
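The following sketch illustrates the threading variant of this idea with plain Java concurrency utilities. It does not call the Azure Cosmos DB SDK: each inner list stands in for the changes of one feed range, and `ParallelRangeReadSketch`, `readRange`, and `readAll` are hypothetical names chosen for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative only: one reader thread per range; not the Cosmos DB SDK. */
final class ParallelRangeReadSketch {

    /** Reads one range page by page and returns the changes in the order they were read. */
    static List<String> readRange(List<String> rangeChanges, int pageSize) {
        List<String> seen = new ArrayList<>();
        for (int start = 0; start < rangeChanges.size(); start += pageSize) {
            int end = Math.min(rangeChanges.size(), start + pageSize);
            seen.addAll(rangeChanges.subList(start, end)); // "process" one page
        }
        return seen;
    }

    /** Runs one reader per range on its own thread; result i belongs to range i. */
    static List<List<String>> readAll(List<List<String>> ranges, int pageSize) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ranges.size()));
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (List<String> range : ranges) {
                futures.add(pool.submit(() -> readRange(range, pageSize)));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                results.add(future.get()); // wait for each reader to finish
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading ranges", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a range reader failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> ranges = Arrays.asList(
                Arrays.asList("a1", "a2", "a3"),
                Arrays.asList("b1", "b2"));
        System.out.println(readAll(ranges, 2));
    }
}
```

Because each reader owns a disjoint range, the readers need no coordination with each other. Order is preserved within a range, but nothing is implied about ordering across ranges.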
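To use FeedRanges, you need an orchestrator process that obtains the FeedRanges and distributes them to those machines. This distribution might be:
- Using FeedRange.toString() and distributing this string value.
- If the distribution is in-process, passing the FeedRange object reference.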
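The distribution step described above can be sketched as a simple round-robin assignment of serialized ranges to workers. This is one possible strategy, not a prescribed one. `FeedRangeDistributionSketch` and `assign` are hypothetical names, and the strings `range-0` through `range-2` are placeholders for the values that `FeedRange.toString()` would return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: round-robin assignment of serialized feed ranges to workers. */
final class FeedRangeDistributionSketch {

    /**
     * Assigns each serialized range to exactly one worker, round-robin.
     * Element w of the result holds the strings that worker w should read.
     */
    static List<List<String>> assign(List<String> serializedRanges, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        List<List<String>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (int i = 0; i < serializedRanges.size(); i++) {
            perWorker.get(i % workerCount).add(serializedRanges.get(i));
        }
        return perWorker;
    }

    public static void main(String[] args) {
        // Placeholder strings; a real orchestrator would send FeedRange.toString() values.
        List<String> serialized = Arrays.asList("range-0", "range-1", "range-2");
        List<List<String>> plan = assign(serialized, 2);
        for (int w = 0; w < plan.size(); w++) {
            System.out.println("worker " + w + " reads " + plan.get(w));
        }
    }
}
```

On the receiving side, each worker would turn its strings back into FeedRange objects before creating its change feed request options. The Java SDK is assumed here to provide `FeedRange.fromString` for that step; confirm this against the SDK version you use.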
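The following example uses latest version mode. It shows how to read from the beginning of the container's change feed by using two hypothetical separate machines that read in parallel:
Machine 1: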
// Assumes `container` is an initialized CosmosAsyncContainer.
// Get the container's feed ranges (one per physical partition).
Mono<List<FeedRange>> feedranges = container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();

// Machine 1 reads the first feed range from the beginning of the change feed.
FeedRange range1 = feedRangeList.get(0);
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
        .createForProcessingFromBeginning(range1);

Iterator<FeedResponse<JsonNode>> responseIterator = container
        .queryChangeFeed(options, JsonNode.class)
        .byPage()
        .toIterable()
        .iterator();

int machine1index = 0;
while (responseIterator.hasNext()) {
    FeedResponse<JsonNode> response = responseIterator.next();
    List<JsonNode> results = response.getResults();
    logger.info("Got " + results.size() + " item(s) retrieved");

    // Capture the continuation token only after every item in the page is processed,
    // so that a later read can resume from this point.
    options = CosmosChangeFeedRequestOptions
            .createForProcessingFromContinuation(response.getContinuationToken());

    machine1index++;
    if (machine1index >= 5) {
        // artificially breaking out of loop - not required in a real app
        System.out.println("breaking....");
        break;
    }
}
Machine 2:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.examples.changefeedpull;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.examples.common.AccountSettings;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class SampleChangeFeedPullModel {
public static CosmosAsyncClient clientAsync;
private CosmosAsyncContainer container;
private CosmosAsyncDatabase database;
public static final String DATABASE_NAME = "db";
public static final String COLLECTION_NAME = "ChangeFeedPull";
public static final String PARTITION_KEY_FIELD_NAME = "pk";
protected static Logger logger = LoggerFactory.getLogger(SampleChangeFeedPullModel.class);
public static void main(String[] args) {
SampleChangeFeedPullModel p = new SampleChangeFeedPullModel();
try {
logger.info("Starting ASYNC main");
p.ChangeFeedPullDemo();
logger.info("Demo complete, please hold while resources are released");
} catch (Exception e) {
e.printStackTrace();
logger.error(String.format("Cosmos getStarted failed with %s", e));
} finally {
logger.info("Closing the client");
p.shutdown(p.container, p.database);
}
}
public void ChangeFeedPullDemo() {
clientAsync = this.getCosmosAsyncClient();
Resources resources = new Resources(PARTITION_KEY_FIELD_NAME, clientAsync, DATABASE_NAME, COLLECTION_NAME);
this.container = resources.container;
this.database = resources.database;
resources.insertDocuments(10, 20);
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed with all feed ranges on this machine...");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <FeedResponseIterator>
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
// </FeedResponseIterator>
// <AllFeedRanges>
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </AllFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished Reading change feed using all feed ranges!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <GetFeedRanges>
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
// </GetFeedRanges>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Simulate processing change feed on two separate machines");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 1....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine1>
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine1>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 1!");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Start reading from machine 2....");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <Machine2>
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
// </Machine2>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading feed ranges on machine 2!");
logger.info("*************************************************************");
logger.info("*************************************************************");
//grab first pk in keySet()
Set<String> keySet = resources.partitionKeyToDocuments.keySet();
String partitionKey="";
for (String string : keySet) {
partitionKey = string;
break;
}
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
// <PartitionKeyProcessing>
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " item(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
// </PartitionKeyProcessing>
logger.info("*************************************************************");
logger.info("*************************************************************");
logger.info("Finished reading change feed from logical partition key!");
logger.info("*************************************************************");
logger.info("*************************************************************");
}
public CosmosAsyncClient getCosmosAsyncClient() {
return new CosmosClientBuilder()
.endpoint(AccountSettings.HOST)
.key(AccountSettings.MASTER_KEY)
.contentResponseOnWriteEnabled(true)
.consistencyLevel(ConsistencyLevel.SESSION)
.buildAsyncClient();
}
public void close() {
clientAsync.close();
}
private void shutdown(CosmosAsyncContainer container, CosmosAsyncDatabase database) {
try {
// To allow for the sequence to complete after subscribe() calls
Thread.sleep(5000);
// Clean shutdown
logger.info("Deleting Cosmos DB resources");
logger.info("-Deleting container...");
if (container != null)
container.delete().subscribe();
logger.info("-Deleting database...");
if (database != null)
database.delete().subscribe();
logger.info("-Closing the client...");
} catch (InterruptedException err) {
err.printStackTrace();
} catch (Exception err) {
logger.error("Deleting Cosmos DB resources failed, will still attempt to close the client. See stack "
+ "trace below.");
err.printStackTrace();
}
clientAsync.close();
logger.info("Done.");
}
}
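To process the change feed by using the pull model, create an instance of ChangeFeedPullModelIterator. When you initially create ChangeFeedPullModelIterator, you must specify a required changeFeedStartFrom value inside ChangeFeedIteratorOptions. This value consists of both the starting position for reading changes and the resource (a partition key or a FeedRange) for which changes are fetched.
Note
If no changeFeedStartFrom value is specified, the change feed is fetched for the entire container from Now().
Currently, the JS SDK supports only latest version mode, which is selected by default.
Optionally, you can use maxItemCount in ChangeFeedIteratorOptions to set the maximum number of items received per page.
The following example shows how to obtain an iterator in latest version mode that returns entity objects:
const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Now()
};

const iterator = container.items.getChangeFeedIterator(options);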
Consume the changes for an entire container
If you don't supply a PartitionKey or FeedRange parameter inside ChangeFeedStartFrom, you can process an entire container's change feed at your own pace. The following example reads all changes, starting from the beginning of the change feed:
async function waitFor(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};

const iterator = container.items.getChangeFeedIterator(options);

let timeout = 0;

while (iterator.hasMoreResults) {
    const response = await iterator.readNext();
    if (response.statusCode === StatusCodes.NotModified) {
        // No new changes: wait before checking again.
        timeout = 5000;
    }
    else {
        console.log("Result found", response.result);
        timeout = 0;
    }
    await waitFor(timeout);
}
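Because the change feed is effectively an infinite list of items that encompasses all future writes and updates, the value of hasMoreResults is always true. When you try to read the change feed and no new changes are available, you receive a response with NotModified status. In the preceding example, this case is handled by waiting five seconds before rechecking for changes.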
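A fixed five-second wait is only one way to handle NotModified. As a rough sketch of an alternative, the helper below picks the next wait time and backs off while the feed stays idle. The name nextPollDelay, the constant NOT_MODIFIED, and the default values are assumptions made for illustration; none of them are part of the SDK.

```typescript
// HTTP 304, the status returned when there is nothing new to read.
const NOT_MODIFIED = 304;

// Hypothetical helper (not part of the SDK): returns the next wait in milliseconds.
// - Changes were returned: poll again immediately (0 ms).
// - No changes: start at baseMs, then double on each consecutive idle read, capped at maxMs.
function nextPollDelay(
    statusCode: number,
    previousDelayMs: number,
    baseMs: number = 5000,
    maxMs: number = 60000
): number {
    if (statusCode !== NOT_MODIFIED) {
        return 0;
    }
    if (previousDelayMs <= 0) {
        return baseMs;
    }
    return Math.min(previousDelayMs * 2, maxMs);
}
```

In the loop shown earlier, both timeout assignments could be replaced by a single `timeout = nextPollDelay(response.statusCode, timeout);` before the call to waitFor. Whether backing off is appropriate depends on how quickly the application must react to new changes.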
Consume the changes for a partition key
In some cases, you might want to process only the changes for a specific partition key. You can obtain an iterator for a specific partition key and process the changes the same way that you can for an entire container.
async function waitFor(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};

const iterator = container.items.getChangeFeedIterator(options);

let timeout = 0;

while (iterator.hasMoreResults) {
    const response = await iterator.readNext();
    if (response.statusCode === StatusCodes.NotModified) {
        // No new changes: wait before checking again.
        timeout = 5000;
    }
    else {
        console.log("Result found", response.result);
        timeout = 0;
    }
    await waitFor(timeout);
}
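Use FeedRange for parallelization
In the change feed pull model, you can use FeedRange to parallelize the processing of the change feed. A FeedRange represents a range of partition key values.
The following example shows how to get a list of ranges for your container:
const ranges = await container.getFeedRanges();
When you obtain a list of FeedRange values for your container, you get one FeedRange per physical partition.
By using a FeedRange, you can create an iterator to parallelize the processing of the change feed across multiple machines or threads. Unlike the previous examples, which showed how to obtain a change feed iterator for the entire container or for a single partition key, you can use FeedRange values to obtain multiple iterators that process the change feed in parallel.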
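When there are more FeedRange values than machines, each machine has to take several ranges. One simple way to divide them is round-robin assignment, sketched below. The helper assignRanges is hypothetical and not part of the SDK; it is generic over the element type, so the same code works for the values returned by getFeedRanges() or for plain strings in a test.

```typescript
// Hypothetical helper (not part of the SDK): splits a list of feed ranges
// across a fixed number of workers in round-robin order.
function assignRanges<T>(ranges: readonly T[], workerCount: number): T[][] {
    if (!Number.isInteger(workerCount) || workerCount < 1) {
        throw new Error("workerCount must be a positive integer");
    }
    // One (initially empty) bucket per worker.
    const assignments: T[][] = Array.from({ length: workerCount }, () => [] as T[]);
    ranges.forEach((range, index) => {
        assignments[index % workerCount].push(range);
    });
    return assignments;
}
```

Each worker would then create one iterator per assigned range. Note that this sketch produces a static assignment: if the number of physical partitions changes later, the list of ranges would need to be fetched and divided again.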
The following example shows how to read from the beginning of the container's change feed by using two hypothetical separate machines that read in parallel:
Machine 1:
async function waitFor(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};

const iterator = container.items.getChangeFeedIterator(options);

let timeout = 0;

while (iterator.hasMoreResults) {
    const response = await iterator.readNext();
    if (response.statusCode === StatusCodes.NotModified) {
        // No new changes: wait before checking again.
        timeout = 5000;
    }
    else {
        console.log("Result found", response.result);
        timeout = 0;
    }
    await waitFor(timeout);
}
Machine 2:
async function waitFor(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};

const iterator = container.items.getChangeFeedIterator(options);

let timeout = 0;

while (iterator.hasMoreResults) {
    const response = await iterator.readNext();
    if (response.statusCode === StatusCodes.NotModified) {
        // No new changes: wait before checking again.
        timeout = 5000;
    }
    else {
        console.log("Result found", response.result);
        timeout = 0;
    }
    await waitFor(timeout);
}
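Save continuation tokens
You can save the position of your iterator by obtaining the continuation token. A continuation token is a string value that keeps track of the last changes processed by your change feed iterator and allows the iterator to resume at that point later. The continuation token, if specified, takes precedence over the start time and start-from-beginning values. The following code reads through the change feed since container creation. When no more changes are available, it keeps a continuation token so that change feed consumption can be resumed later.
const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};

const iterator = container.items.getChangeFeedIterator(options);

let continuation = "";

while (iterator.hasMoreResults) {
    const response = await iterator.readNext();
    if (response.statusCode === StatusCodes.NotModified) {
        // No more changes for now: keep the token and stop reading.
        continuation = response.continuationToken;
        break;
    }
    else {
        console.log("Result found", response.result);
    }
}

// For checking any new changes using the continuation token
const continuationOptions = {
    changeFeedStartFrom: ChangeFeedStartFrom.Continuation(continuation)
};

const newIterator = container.items.getChangeFeedIterator(continuationOptions);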
As long as the Azure Cosmos DB container still exists, a continuation token never expires.
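Because a token stays valid for the lifetime of the container, an application can store it and decide at startup where to resume. The following sketch shows one possible shape for that decision. Everything in it is hypothetical and for illustration only: CheckpointStore uses an in-memory Map as a stand-in for durable storage, and StartPosition merely describes the choice that, in the SDK, would be expressed through ChangeFeedStartFrom.

```typescript
// Describes where a new iterator should start. These shapes are illustrative only.
type StartPosition =
    | { kind: "continuation"; token: string }
    | { kind: "beginning" };

// Hypothetical checkpoint store keyed by a consumer name. The Map stands in
// for durable storage such as a file or a database row.
class CheckpointStore {
    private readonly tokens = new Map<string, string>();

    save(consumer: string, token: string): void {
        this.tokens.set(consumer, token);
    }

    load(consumer: string): string | undefined {
        return this.tokens.get(consumer);
    }
}

// A saved token takes precedence; without one, start from the beginning.
function resolveStart(store: CheckpointStore, consumer: string): StartPosition {
    const token = store.load(consumer);
    return token ? { kind: "continuation", token } : { kind: "beginning" };
}
```

A consumer would call save after it finishes processing a page, and resolveStart once when it starts, so that a restart resumes after the last processed page rather than rereading the whole feed.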
Use an async iterator
You can also consume the change feed by using a JavaScript async iterator. Here's an example that uses an async iterator:
async function waitFor(milliseconds: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, milliseconds));
}

const options = {
    changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};

let timeout = 0;

for await (const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
    if (result.statusCode === StatusCodes.NotModified) {
        // No new changes: wait before checking again.
        timeout = 5000;
    }
    else {
        console.log("Result found", result.result);
        timeout = 0;
    }
    await waitFor(timeout);
}