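Change streams are a real-time stream of database changes that flows from your database to your application. With this feature, you can build reactive applications by subscribing to database changes instead of continuously polling to detect them.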
Note
Change stream support for multi-shard clusters is currently in preview. To enable this feature on your cluster, create a support request.
Get started
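The example code starts a change stream on the exampleCollection collection and continuously monitors it for changes. When a change is detected, the code retrieves the change event and prints it in JSON format.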
from pymongo.errors import OperationFailure, PyMongoError

from mongo_utils import get_collection, load_mongo_config, load_resume_token, ping_server, save_resume_token


def main() -> None:
    client = None
    try:
        _, _, source_collection_name, destination_collection_name = load_mongo_config()
        client, database, collection = get_collection()
        destination_collection = None
        if destination_collection_name:
            destination_collection = database[destination_collection_name]
            print(
                f"Mirroring change events from '{source_collection_name}' to '{destination_collection_name}'."
            )
        else:
            print(
                f"Watching source collection '{source_collection_name}' with publish-only mode (no destination collection configured)."
            )
        ping_server(client)

        # Try to resume from last known token
        resume_token = load_resume_token()
        if resume_token:
            print(f"Resuming from last token: {resume_token.get('_data', '')[:20]}...")
            stream = collection.watch(resume_after=resume_token)
        else:
            print("Starting fresh change stream (no prior token found).")
            stream = collection.watch()

        print("Watching for changes...")
        with stream:
            for change in stream:
                print(f"Change: {change['operationType']} on {change['ns']['coll']}")
                print(f" Full doc: {change.get('fullDocument', {})}")
                if destination_collection is not None:
                    mirrored_event = {
                        "source": change.get("ns", {}),
                        "operationType": change.get("operationType"),
                        "documentKey": change.get("documentKey", {}),
                        "fullDocument": change.get("fullDocument", {}),
                        "resumeToken": change.get("_id", {}),
                    }
                    destination_collection.insert_one(mirrored_event)
                # Save token after each successful change
                save_resume_token(change["_id"])
                print()
    except OperationFailure as error:
        print("The change stream could not be opened on this collection.")
        print(error)
        print(
            "Check whether your Azure database deployment supports change streams for the configured aggregation properties."
        )
    except PyMongoError as error:
        print("Azure DocumentDB connection or driver error while watching for changes.")
        print(error)
    except KeyboardInterrupt:
        print("\nConsumer stopped by user.")
    finally:
        if client is not None:
            client.close()


if __name__ == "__main__":
    main()
package com.avijit.changestreams;

import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.Document;

/**
 * Consumer: Listens for change-stream events and persists the resume token.
 */
public final class DataConsumerApp {
    private DataConsumerApp() {
    }

    public static void main(String[] args) {
        MongoUtils.MongoContext context = null;
        try {
            context = MongoUtils.getContext();
            MongoUtils.pingServer(context.client());
            BsonDocument resumeToken = MongoUtils.loadResumeToken();
            ChangeStreamIterable<Document> watch = context.collection().watch();
            if (resumeToken != null) {
                System.out.printf("Resuming from last token: %s...%n", abbreviate(resumeToken.toJson()));
                watch = watch.resumeAfter(resumeToken);
            } else {
                System.out.println("Starting fresh change stream (no prior token found).");
            }
            System.out.println("Watching for changes...");
            try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = watch.cursor()) {
                while (cursor.hasNext()) {
                    ChangeStreamDocument<Document> change = cursor.next();
                    Document fullDocument = change.getFullDocument();
                    System.out.printf("Change: %s on %s%n", change.getOperationType().getValue(), change.getNamespace().getCollectionName());
                    System.out.printf(" Full doc: %s%n%n", fullDocument == null ? "{}" : fullDocument.toJson());
                    MongoUtils.saveResumeToken(change.getResumeToken());
                }
            }
        } catch (MongoException | IllegalStateException error) {
            System.out.println("Azure DocumentDB connection or driver error while watching for changes.");
            System.out.println(error.getMessage());
        } finally {
            if (context != null) {
                context.client().close();
            }
        }
    }

    private static String abbreviate(String tokenJson) {
        int maxLength = 24;
        if (tokenJson.length() <= maxLength) {
            return tokenJson;
        }
        return tokenJson.substring(0, maxLength);
    }
}
// Open a change stream
const changeStream = db.exampleCollection.watch();

// Listen for changes
while (changeStream.hasNext()) {
    const change = changeStream.next();
    printjson(change);
}
using MongoDB.Bson;
using MongoDB.Driver;

namespace ChangeStreamsDriver;

public static class DataConsumerApp
{
    public static async Task RunAsync(CancellationToken cancellationToken = default)
    {
        var context = MongoUtils.GetContext();
        await MongoUtils.PingServerAsync(context.Client, cancellationToken);
        var resumeToken = await MongoUtils.LoadResumeTokenAsync();
        var options = new ChangeStreamOptions
        {
            FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
            MaxAwaitTime = TimeSpan.FromSeconds(1)
        };
        if (resumeToken is not null)
        {
            options.ResumeAfter = resumeToken;
            Console.WriteLine($"Resuming from last token: {Abbreviate(resumeToken.ToJson())}...");
        }
        else
        {
            Console.WriteLine("Starting fresh change stream (no prior token found).");
        }
        Console.WriteLine("Watching for changes...");
        using var cursor = await context.Collection.WatchAsync(options, cancellationToken);
        while (!cancellationToken.IsCancellationRequested && await cursor.MoveNextAsync(cancellationToken))
        {
            foreach (var change in cursor.Current)
            {
                var operationType = change.OperationType;
                var collectionName = change.CollectionNamespace?.CollectionName ?? context.Collection.CollectionNamespace.CollectionName;
                var fullDoc = change.FullDocument?.ToJson() ?? "{}";
                Console.WriteLine($"Change: {operationType.ToString().ToLowerInvariant()} on {collectionName}");
                Console.WriteLine($" Full doc: {fullDoc}\n");
                await MongoUtils.SaveResumeTokenAsync(change.ResumeToken);
            }
        }
    }

    private static string Abbreviate(string tokenJson)
    {
        const int maxLength = 24;
        return tokenJson.Length <= maxLength ? tokenJson : tokenJson[..maxLength];
    }
}
require_relative 'mongo_utils'
require 'json'

# MongoDB Data Consumer - Watches for change stream events
class DataConsumerApp
  def self.run
    config = MongoUtils.load_mongo_config
    client = MongoUtils.create_client(config[:uri])
    begin
      collection = MongoUtils.get_collection(client, config[:db], config[:collection])

      # Load resume token if available
      resume_token = MongoUtils.load_resume_token
      if resume_token
        puts "Resuming from last token: #{resume_token['_data'][0..20]}..."
        stream = collection.watch([], resume_after: resume_token)
      else
        puts "Starting fresh change stream (no prior token found)."
        stream = collection.watch([])
      end
      puts "Watching for changes..."

      # Watch for change events
      stream.each do |change|
        operation_type = change['operationType']
        ns = change['ns']
        collection_name = ns['coll'] if ns
        full_document = change['fullDocument']
        puts "Event: #{operation_type} on #{collection_name}"
        puts " Document: #{full_document.inspect}"

        # Save the resume token after processing each event
        if change['_id']
          MongoUtils.save_resume_token(change['_id'])
        end
      end
      stream.close
    rescue Interrupt
      puts "\nConsumer stopped by user."
    ensure
      client.close
    end
  end
end

DataConsumerApp.run if __FILE__ == $0
import {
  getContext,
  pingServer,
  saveResumeToken,
  loadResumeToken,
  closeConnection,
} from '../utils/MongoUtils.js';

/**
 * Consumer: Listens for change-stream events and persists the resume token
 */
async function main() {
  try {
    const { model } = await getContext();
    await pingServer();
    const resumeToken = await loadResumeToken();
    const watchOptions = resumeToken ? { resumeAfter: resumeToken } : {};
    if (resumeToken) {
      console.log(`Resuming from last token: ${abbreviate(JSON.stringify(resumeToken))}...`);
    } else {
      console.log('Starting fresh change stream (no prior token found).');
    }
    console.log('Watching for changes...');
    const changeStream = model.watch([], watchOptions);
    while (true) {
      const change = await changeStream.next();
      const operationType = change.operationType;
      if ('ns' in change && change.ns) {
        const ns = change.ns as { db: string; coll?: string };
        const collectionName = ns.coll ?? 'unknown';
        const fullDocument = 'fullDocument' in change ? change.fullDocument ?? {} : {};
        console.log(`Change: ${operationType} on ${collectionName}`);
        console.log(` Full doc: ${JSON.stringify(fullDocument)}\n`);
      }
      if (change._id) {
        await saveResumeToken(change._id as Record<string, unknown>);
      }
    }
  } catch (error) {
    console.log('Azure DocumentDB connection or driver error while watching for changes.');
    console.log(error instanceof Error ? error.message : String(error));
    // Set the exit code instead of calling process.exit(), so the finally block still closes the connection.
    process.exitCode = 1;
  } finally {
    await closeConnection();
  }
}

/**
 * Abbreviate token JSON for display
 */
function abbreviate(tokenJson: string): string {
  const maxLength = 24;
  if (tokenJson.length <= maxLength) {
    return tokenJson;
  }
  return tokenJson.substring(0, maxLength);
}

main().catch(console.error);
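Monitor database changes with change streams
Let's look at change stream output through a few examples.
In this change stream event, a new record was inserted into the exampleCollection collection in the cs database. The event details include the full content of the newly added document.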
{
  "_id": { "_data": "AeARBpQ/AAAA" }, // "resume_token"
  "operationType": "insert",
  "fullDocument": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "employee_id": "17986",
    "name": "John",
    "position": "Software Engineer",
    "department": "IT",
    "rating": 4
  },
  "ns": { "db": "cs", "coll": "exampleCollection" },
  "documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
In this update event, John's position and rating were modified. The change stream reflects the post-update state of the document in the exampleCollection collection.
{
  "_id": { "_data": "AWACAKM/AAAA" },
  "operationType": "update",
  "fullDocument": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "employee_id": "17986",
    "name": "John",
    "position": "SSE",
    "department": "IT",
    "rating": 5
  },
  "ns": {
    "db": "cs", "coll": "exampleCollection"
  },
  "documentKey": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" }
  }
}
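An update event can also carry an updateDescription field that lists the fields that were updated or removed.
{
  "operationType": "update",
  "updateDescription": {
    "updatedFields": {
      "position": "SSE",
      "rating": 5
    },
    "removedFields": [],
    "truncatedArrays": []
  },
  "fullDocument": { ... }
}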
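A consumer that keeps its own copy of a document can apply the updatedFields and removedFields entries shown in the event above instead of re-reading the whole document. The following Python sketch is one possible way to do that. The function names are hypothetical, and the sketch assumes dotted paths point into nested documents only. It ignores array positions and truncatedArrays.

```python
from copy import deepcopy
from typing import Any


def _walk(root: dict[str, Any], path: str, create: bool):
    """Follow all but the last segment of a dotted path; return (parent, leaf)."""
    *parents, leaf = path.split(".")
    node = root
    for key in parents:
        if key not in node or not isinstance(node[key], dict):
            if not create:
                return None, leaf
            node[key] = {}
        node = node[key]
    return node, leaf


def apply_update_description(document: dict[str, Any], description: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `document` with an updateDescription applied to it."""
    result = deepcopy(document)

    # Set every updated field, creating intermediate documents when needed.
    for path, value in description.get("updatedFields", {}).items():
        parent, leaf = _walk(result, path, create=True)
        parent[leaf] = value

    # Drop removed fields; paths that do not exist are skipped silently.
    for path in description.get("removedFields", []):
        parent, leaf = _walk(result, path, create=False)
        if parent is not None:
            parent.pop(leaf, None)

    return result
```

The input document is copied rather than modified, so the caller can compare the two versions or discard the result if later processing fails.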
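This change stream event indicates that a document was deleted from the exampleCollection collection in the cs database. The event captures the unique identifier of the deleted document.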
{
  "_id": { "_data": "ASgBAJs/AAAA" },
  "operationType": "delete",
  "ns": { "db": "cs", "coll": "exampleCollection" },
  "documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
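The three event shapes above are enough to keep a simple copy of the collection in sync. The following Python sketch applies insert, update, replace, and delete events to an in-memory dictionary. The names apply_change and document_key_string are hypothetical, and the sketch assumes update events carry fullDocument, as in the examples.

```python
import json
from typing import Any


def document_key_string(event: dict[str, Any]) -> str:
    """Build a stable dictionary key from the event's documentKey."""
    return json.dumps(event["documentKey"], sort_keys=True, default=str)


def apply_change(mirror: dict[str, dict[str, Any]], event: dict[str, Any]) -> None:
    """Apply one change event to an in-memory mirror of the collection."""
    key = document_key_string(event)
    operation = event["operationType"]

    if operation in ("insert", "update", "replace"):
        full_document = event.get("fullDocument")
        if full_document is not None:
            mirror[key] = full_document
    elif operation == "delete":
        # Delete events carry only the documentKey, never the document body.
        mirror.pop(key, None)
    # Any other operation type is ignored in this sketch.
```

Keying on documentKey rather than on fullDocument matters for deletes, because a delete event identifies the document only by its key.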
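Resume a change stream
Resume a change stream by passing a resume token in resumeAfter when you open the cursor. The _id field of each change event serves as the resume token. Persist it, and pass it back when you reopen the stream.
Resume from the active change log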
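// Resume from the last stored or processed token.
// savedResumeToken is assumed to hold the token persisted by a previous run.
const stream = db.exampleCollection.watch([], { resumeAfter: savedResumeToken });
while (stream.hasNext()) {
    const event = stream.next();
    processEvent(event);
    // Record the token only after the event is processed, so a crash replays the event instead of skipping it.
    savedResumeToken = event._id;
}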
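The driver samples earlier in this article call load and save helpers for the resume token without showing them. The following Python sketch is one possible file-based version. The function signatures and the explicit path argument are assumptions, and the sketch assumes the token is JSON-serializable, like the { "_data": "..." } tokens in the example events.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Optional


def save_resume_token(token: dict[str, Any], path: Path) -> None:
    """Write the token to a temp file, then atomically swap it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(token, handle)
        os.replace(tmp_name, path)
    except BaseException:
        # Do not leave a half-written temp file behind.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


def load_resume_token(path: Path) -> Optional[dict[str, Any]]:
    """Return the stored token, or None when no usable token exists."""
    try:
        with path.open(encoding="utf-8") as handle:
            token = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return None
    return token if isinstance(token, dict) and token else None
```

Writing to a temporary file and renaming it means a crash in the middle of a save leaves the previous token intact, so the consumer replays a few events rather than losing its position.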
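Resume a stream from a historical checkpoint
Standard change streams are limited to the events retained in the currently active 400 MB change log. After rotation, older entries are archived and become inaccessible. Historical change streams remove this limit by transparently retrieving archived logs, which lets you replay a stream or resume it from a past timestamp.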
// Replay all events since 2026-05-01 00:00:00 UTC, at current timestamp 2026-06-01 00:00:00 UTC
const startTime = new Timestamp(Math.floor(new Date("2026-05-01").getTime() / 1000), 1);
const stream = db.exampleCollection.watch([], {
    startAtOperationTime: startTime
});
while (stream.hasNext()) {
    printjson(stream.next());
}
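Important
Change streams support resumability through the startAt, resumeAfter, and startAtOperationTime parameters. With point-in-time restore (PITR) log integration, stream resumption can reach back up to 35 days or to the cluster initialization point, whichever is earlier.
We recommend that you process historical checkpoints during low-traffic periods to minimize the impact of the extra resource consumption. Depending on how much data must be processed from the historical point in time, provision more storage. This applies to clusters with 128 GB of storage or less.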
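Before replaying from a past timestamp, it can help to confirm that the start time falls inside the 35-day window described above. The following Python sketch converts a datetime to the seconds value used for startAtOperationTime and checks it against that window. The helper names are hypothetical, the cluster initialization bound isn't modeled, and open_historical_stream assumes a PyMongo collection and isn't exercised here.

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=35)  # upper bound quoted in the note above


def operation_time_seconds(start: datetime) -> int:
    """Whole seconds since the Unix epoch for a timezone-aware datetime."""
    if start.tzinfo is None:
        raise ValueError("start must be timezone-aware")
    return int(start.timestamp())


def within_retention(start: datetime, now: datetime) -> bool:
    """True when `start` is not in the future and is at most RETENTION old."""
    return timedelta(0) <= now - start <= RETENTION


def open_historical_stream(collection, start: datetime):
    """Open a change stream at `start` on a PyMongo collection."""
    from bson.timestamp import Timestamp  # ships with PyMongo

    return collection.watch(
        start_at_operation_time=Timestamp(operation_time_seconds(start), 1)
    )


start = datetime(2026, 5, 1, tzinfo=timezone.utc)
now = datetime(2026, 6, 1, tzinfo=timezone.utc)
print(operation_time_seconds(start), within_retention(start, now))
```

The dates match the shell example: 2026-05-01 is 31 days before 2026-06-01, so the check passes.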
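Pre-images in change streams (preview)
Note
Pre-image support is currently in preview. To enable this feature on your cluster, create a support request.
By default, change events show the document as it is after the change. When pre-images are enabled, the system also records the full document as it was before the change and exposes it in each event as fullDocumentBeforeChange.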
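| Option | Behavior | When to use |
| --- | --- | --- |
| "off" (default) | Never includes the pre-image | Standard streams |
| "whenAvailable" | Includes the pre-image if it exists; otherwise silently omits it | Audit or change data capture (CDC) scenarios that can degrade gracefully |
| "required" | Includes the pre-image; raises an error if it's unavailable | When a missing pre-image must be flagged explicitly rather than skipped |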
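Important
Pre-images are available only for update, delete, and replace events.
Enabling pre-images adds storage and processing overhead, because the previous version of each modified document must be captured and retained for every eligible operation. The result is higher write amplification, more I/O consumption, and higher change stream processing latency. To limit the long-term resource impact, we don't recommend keeping pre-images enabled continuously in production unless the workload explicitly requires it.
Enable pre-images on a collection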
db.runCommand({
    collMod: "exampleCollection",
    changeStreamPreAndPostImages: { enabled: true }
});
Example event with pre-images enabled
{
  "_id": { "_data": "AWACAKM/AAAA" },
  "operationType": "update",
  "fullDocumentBeforeChange": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "name": "John",
    "position": "Software Engineer",
    "rating": 4
  },
  "fullDocument": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "name": "John",
    "position": "SSE",
    "rating": 5
  },
  "ns": { "db": "cs", "coll": "exampleCollection" },
  "documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}
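With both images in the event, a consumer can work out exactly which fields changed. The following Python sketch compares the top-level fields of fullDocumentBeforeChange and fullDocument. The function names are hypothetical. watch_with_pre_images assumes a PyMongo version that accepts full_document_before_change and isn't exercised here.

```python
from typing import Any, Optional


def changed_fields(event: dict[str, Any]) -> Optional[dict[str, tuple[Any, Any]]]:
    """Map each changed top-level field to a (before, after) pair.

    Returns None when the event carries no pre-image, which is what a
    "whenAvailable" consumer has to tolerate. A field that is missing on
    one side is reported as None on that side.
    """
    before = event.get("fullDocumentBeforeChange")
    if before is None:
        return None
    after = event.get("fullDocument") or {}

    changes = {}
    for field in sorted(before.keys() | after.keys()):
        old, new = before.get(field), after.get(field)
        if old != new:
            changes[field] = (old, new)
    return changes


def watch_with_pre_images(collection):
    """Open a stream that includes pre-images when the service has them."""
    return collection.watch(full_document_before_change="whenAvailable")
```

For the example event above, the result contains only position and rating, because _id and name are identical in both images.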
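Customize data in a change stream
Customize change stream output by passing an array of one or more pipeline stages when you open the stream. The following operators are supported.
- $addFields
- $match
- $project
- $set
- $unset
The following example keeps only insert operations for documents in the "IT" department.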
const pipeline = [
    { $match: { operationType: "insert", "fullDocument.department": "IT" } },
    { $project: { "fullDocument.name": 1, "fullDocument.position": 1 } }
];
const stream = db.exampleCollection.watch(pipeline);
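Because only a handful of stages are supported, it can be useful to check a pipeline on the client before opening the stream. The following Python sketch validates a pipeline against the operator list above and builds the same "IT" insert filter. The validate_pipeline helper is hypothetical, and the commented-out watch call assumes a PyMongo collection.

```python
from typing import Any

# Stage operators listed as supported in this article.
SUPPORTED_STAGES = frozenset({"$addFields", "$match", "$project", "$set", "$unset"})


def validate_pipeline(pipeline: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the pipeline unchanged, or raise ValueError on an unsupported stage."""
    for index, stage in enumerate(pipeline):
        if not isinstance(stage, dict) or len(stage) != 1:
            raise ValueError(f"stage {index} must be a dict with exactly one operator")
        (operator,) = stage
        if operator not in SUPPORTED_STAGES:
            raise ValueError(f"stage {index} uses unsupported operator {operator}")
    return pipeline


it_inserts = validate_pipeline([
    {"$match": {"operationType": "insert", "fullDocument.department": "IT"}},
    {"$project": {"fullDocument.name": 1, "fullDocument.position": 1}},
])

# With a PyMongo collection, the validated pipeline is passed straight to watch():
# stream = collection.watch(it_inserts)
```

Failing early on the client gives a clearer error than a rejected stream, but the service remains the authority on what it accepts.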
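Limitations
- Converting a cluster topology from single-shard to multi-shard breaks change stream resumability.
- Multi-shard deployments don't preserve global event ordering.
- After a failover event, change stream cursors must be reinitialized. The resume capability is retained.
- For update events in an aggregation pipeline, UpdateDescription isn't supported. However, update operators are supported.
- Using $changestream as a nested pipeline inside another stage isn't supported.
- Historical processing of logs doesn't support pre-images, because the additional pre-image information wasn't tracked earlier.
- Change streams don't support showExpandedEvents.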
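Because a failover requires the cursor to be reopened while the resume token stays valid, a consumer can wrap its loop in a small reconnect routine. The following Python sketch shows one possible shape. The function name, its parameters, and the retry policy are assumptions, and the driver is kept out of the sketch by passing in a callable that opens the stream.

```python
import time
from typing import Any, Callable, Iterable, Optional

Token = dict[str, Any]


def consume_with_reconnect(
    open_stream: Callable[[Optional[Token]], Iterable[dict[str, Any]]],
    handle: Callable[[dict[str, Any]], None],
    *,
    resume_token: Optional[Token] = None,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError,),
    max_reconnects: int = 5,
    delay_seconds: float = 1.0,
) -> Optional[Token]:
    """Process events, reopening the stream from the last token after a failure."""
    reconnects = 0
    while True:
        try:
            for event in open_stream(resume_token):
                handle(event)
                # Advance the token only after the event was handled.
                resume_token = event["_id"]
                reconnects = 0
            return resume_token  # the stream ended without an error
        except retry_on:
            reconnects += 1
            if reconnects > max_reconnects:
                raise
            time.sleep(delay_seconds)
```

With PyMongo, open_stream could be `lambda token: collection.watch(resume_after=token)` and retry_on could be `(PyMongoError,)`. In a real consumer, the token would also be persisted after each event so that a process restart resumes from the same point.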
Related content