更改流是从数据库流到应用程序的数据库更改实时流。 借助此功能,可以通过订阅数据库更改来生成反应式应用程序,无需持续进行轮询来检测更改。
此示例代码会在 exampleCollection
集合上启动更改流,以持续监视任何更改。 检测到更改后,它会检索更改事件,并采用 JSON 格式打印它。
// Open a change stream
const changeStream = db.exampleCollection.watch();
// Listen for changes
while (changeStream.hasNext())
{
const change = changeStream.next();
printjson(change);
}
package com.example;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
import org.json.JSONObject;
public class ChangeStreamExample {
public static void main(String[] args) {
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<pwd>@<clustername>.global.mongocluster.cosmos.azure.com/?tls=true");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("test");
// Open a change stream
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
// Listen for changes
try {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// Create a JSONObject from the fields of the ChangeStreamDocument
JSONObject json = new JSONObject();
// Add fields to the JSON object
json.put("operationType", change.getOperationType().getValue());
json.put("namespace", change.getNamespace().getFullName());
if (change.getDocumentKey() != null) {
json.put("documentKey", change.getDocumentKey().toJson().toString());
}
if (change.getResumeToken() != null) {
json.put("resumeToken", change.getResumeToken().toJson());
}
if (change.getFullDocument() != null) {
json.put("fullDocument", change.getFullDocument().toJson());
}
if (change.getWallTime() != null) {
json.put("wallTime", change.getWallTime().toString());
}
// Pretty-print the JSON object with 4-space indent
System.out.println(json.toString(4));
}
}
catch (Exception e) {
System.err.println("Error processing change stream: " + e);
}
finally {
cursor.close();
mongoClient.close();
}
}
}
重要
通过在打开游标时为 resumeAfter
指定恢复标记,可恢复更改流。 但预计有足够的历史记录来定位与该标记关联的操作。 在 _id
字段中的更改流中观察到的文档表示可恢复标记。
cursor = db.exampleCollection.watch(resume_after=resume_token)
让我们通过示例来了解更改流输出。
在此更改流事件中,我们看到一条新记录 inserted
到 cs
数据库中的 exampleCollection
集合中,事件详细信息包括新添加的文档的完整内容。
{
"_id": { "_data": "AeARBpQ/AAAA" }, // "resume_token"
"operationType": "insert",
"fullDocument": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" },
"employee_id": "17986",
"name": "John",
"position": "Software Engineer",
"department": "IT",
"rating": 4
},
"ns": { "db": "cs", "coll": "exampleCollection" },
"documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
在此更新事件中,John 的 position
和 rating
被修改。 更改流反映 exampleCollection
集合中的 update
以及文档的更新后状态。
{
"_id": { "_data": "AWACAKM/AAAA" },
"operationType": "update",
"fullDocument": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" },
"employee_id": "17986",
"name": "John",
"position": "SSE",
"department": "IT",
"rating": 5
},
"ns": {
"db": "cs", "coll": "exampleCollection"
},
"documentKey": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" }
}
}
更改流事件指示文档是来自 cs
数据库中的 exampleCollection
集合的 deleted
。 该事件会捕获已删除文档的唯一标识符。
{
"_id": { "_data": "ASgBAJs/AAAA" },
"operationType": "delete",
"ns": { "db": "cs", "coll": "exampleCollection" },
"documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
通过在配置期间指定一个或多个管道阶段的数组来自定义更改流输出。 支持的运算符包括以下内容。
$addFields
$match
$project
$set
$unset
Replace
事件尚不受支持。
pre-image
仍是一个不受支持的选项。
- 在当前状态下发生故障转移事件后,需要重新初始化更改流游标。
- 尚不支持过去时间线的历史更改流事件。
Update
事件尚不支持更新说明。
- 多分片群集上的更改流事件尚不受支持。
- 分片集合上的更改流尚不受支持。
- 尚不支持
showexpandedevents
。 它包括 createIndex
、dropIndex
、createCollection
、rename
等。
$changestream
作为另一个阶段的嵌套管道尚不受支持。