可通过更改流 API 在 Azure Cosmos DB 的 API for MongoDB 中获取更改源支持。 通过使用更改流 API,你的应用程序可以获取对集合或对单个分片中的项的更改。 以后,可以根据结果采取进一步的措施。 对集合中的项所做的更改将按照其修改时间的顺序进行捕获,并按分片键保证排序顺序。
注意
若要使用更改流,请使用 3.6 或更高版本的服务器创建 Azure Cosmos DB 的用于 MongoDB 的 API 帐户。 如果对较早版本运行更改流示例,可能会看到“无法识别的管道阶段名称: $changeStream”错误。
Bson match = Aggregates.match(Filters.in("operationType", asList("update", "replace", "insert")));
// Pick the field you are most interested in
Bson project = Aggregates.project(fields(include("_id", "ns", "documentKey", "fullDocument")));
// This variable is for second example
BsonDocument resumeToken = null;
// Now time to build the pipeline
List<Bson> pipeline = Arrays.asList(match, project);
//#1 Simple example to seek changes
// Create cursor with update_lookup
MongoChangeStreamCursor<ChangeStreamDocument<org.bson.Document>> cursor = collection.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP).cursor();
Document document = new Document("name", "doc-in-step-1-" + Math.random());
collection.insertOne(document);
while (cursor.hasNext()) {
// There you go, we got the change document.
ChangeStreamDocument<Document> csDoc = cursor.next();
// Let is pick the token which will help us resuming
// You can save this token in any persistent storage and retrieve it later
resumeToken = csDoc.getResumeToken();
//Printing the token
System.out.println(resumeToken);
//Printing the document.
System.out.println(csDoc.getFullDocument());
//This break is intentional but in real project feel free to remove it.
break;
}
cursor.close();