更改流是从数据库流到应用程序的数据库更改的实时流。 借助此功能,你可以通过订阅数据库更改来生成反应式应用程序,而无需持续轮询来检测更改。
注释
Azure DocumentDB 目前支持单分片群集中的更改流。 我们正在努力在不久的将来为多分片群集提供变更流支持。
Azure DocumentDB 对更改流施加 400 MB 的限制。 请提交支持请求以增加此限制。
启用更改流
可以使用 Azure CLI 或 ARM 模板启用或禁用此功能。 即将添加门户支持。
通过 CLI 在群集上启用更改流的步骤
- 登录到 Azure CLI
az login
- 检索群集上功能标志的当前设置。 这可确保在添加新功能时保留任何现有标志。
az resource show --ids "/subscriptions/<sub id>/resourceGroups/<resource group name>/providers/Microsoft.DocumentDB/mongoClusters/<resource name of your Azure DocumentDB cluster>" --api-version 2024-10-01-preview
- 发送 PATCH 请求以启用该功能。
az resource patch --ids "/subscriptions/<subscription_id>/resourceGroups/<resource_group_name>/providers/Microsoft.DocumentDB/mongoClusters/<cluster-name>" --api-version 2024-10-01-preview --properties "{\"previewFeatures\": [ \"ChangeStreams\"]}"
- 验证结果:
- 确保响应有效负载包括
"previewFeatures": ["ChangeStreams"]。
- 如果遇到错误“此群集不支持更改流”,请创建支持请求。
配置变更流
此示例代码在集合上 exampleCollection 启动更改流,持续监视任何更改。 检测到更改后,它会检索更改事件,并采用 JSON 格式打印该更改事件。
// Open a change stream
const changeStream = db.exampleCollection.watch();
// Listen for changes
while (changeStream.hasNext())
{
const change = changeStream.next();
printjson(change);
}
package com.example;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.json.JsonWriterSettings;
import org.json.JSONObject;
public class ChangeStreamExample {
public static void main(String[] args) {
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<pwd>@<clustername>.global.mongocluster.cosmos.azure.cn/?tls=true");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("test");
// Open a change stream
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();
// Listen for changes
try {
while (cursor.hasNext()) {
ChangeStreamDocument<Document> change = cursor.next();
// Create a JSONObject from the fields of the ChangeStreamDocument
JSONObject json = new JSONObject();
// Add fields to the JSON object
json.put("operationType", change.getOperationType().getValue());
json.put("namespace", change.getNamespace().getFullName());
if (change.getDocumentKey() != null) {
json.put("documentKey", change.getDocumentKey().toJson().toString());
}
if (change.getResumeToken() != null) {
json.put("resumeToken", change.getResumeToken().toJson());
}
if (change.getFullDocument() != null) {
json.put("fullDocument", change.getFullDocument().toJson());
}
if (change.getWallTime() != null) {
json.put("wallTime", change.getWallTime().toString());
}
// Pretty-print the JSON object with 4-space indent
System.out.println(json.toString(4));
}
}
catch (Exception e) {
System.err.println("Error processing change stream: " + e);
}
finally {
cursor.close();
mongoClient.close();
}
}
}
重要
通过在打开游标时指定恢复令牌到resumeAfter,可以恢复变更流。 不过,预计有足够的历史记录可以查找与令牌关联的操作。 在 _id 字段中观察到的更改流文档表示可恢复令牌。
cursor = db.exampleCollection.watch(resume_after=resume_token)
使用更改流监视数据库更改
让我们通过示例了解更改流输出。
在此更改流事件中,我们看到新记录已inserted进入数据库中的exampleCollectioncs集合中,事件详细信息包括新添加的文档的完整内容。
{
"_id": { "_data": "AeARBpQ/AAAA" }, // "resume_token"
"operationType": "insert",
"fullDocument": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" },
"employee_id": "17986",
"name": "John",
"position": "Software Engineer",
"department": "IT",
"rating": 4
},
"ns": { "db": "cs", "coll": "exampleCollection" },
"documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
在此次更新事件中,John 的 position 和 rating 已被修改。 更改流反映 update 集合中 exampleCollection 文档的更新后状态。
{
"_id": { "_data": "AWACAKM/AAAA" },
"operationType": "update",
"fullDocument": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" },
"employee_id": "17986",
"name": "John",
"position": "SSE",
"department": "IT",
"rating": 5
},
"ns": {
"db": "cs", "coll": "exampleCollection"
},
"documentKey": {
"_id": { "$oid": "66e6f63e6f49ecaabf794958" }
}
}
更改流事件指示文档来自deleted数据库中的exampleCollectioncs集合。 此事件捕捉已删除文档的唯一标识符。
{
"_id": { "_data": "ASgBAJs/AAAA" },
"operationType": "delete",
"ns": { "db": "cs", "coll": "exampleCollection" },
"documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
在 Change Stream 中个性化数据
通过在设置时指定一个或多个管道阶段的数组来定制变更流输出。 支持的运算符包括以下内容。
$addFields
$match
$project
$set
$unset
局限性
- 尚不支持在多分片群集上处理更改流事件。
- 更改流游标需要在当前状态的故障转移事件后重新初始化。
-
Update 事件尚不支持更新描述。
-
pre-image 是不支持的选项。
-
$changestream 作为其他阶段的一部分的嵌套管道尚不受支持。
相关内容