Azure DocumentDB 中的变更流

更改流是从数据库流到应用程序的数据库更改的实时流。 借助此功能,你可以通过订阅数据库更改来生成反应式应用程序,而无需持续轮询来检测更改。

注释

对多分片集群的变更流支持目前处于预览阶段。 若要在群集上启用此功能, 请创建支持请求

开始

示例代码在集合上 exampleCollection 启动更改流,持续监视任何更改。 检测到更改后,它会检索更改事件,并采用 JSON 格式打印该更改事件。

from pymongo.errors import OperationFailure, PyMongoError
from mongo_utils import get_collection, load_mongo_config, load_resume_token, ping_server, save_resume_token

def main() -> None:
    client = None
    try:
        _, _, source_collection_name, destination_collection_name = load_mongo_config()
        client, database, collection = get_collection()
        destination_collection = None
        if destination_collection_name:
            destination_collection = database[destination_collection_name]
            print(
                f"Mirroring change events from '{source_collection_name}' to '{destination_collection_name}'."
            )
        else:
            print(
                f"Watching source collection '{source_collection_name}' with publish-only mode (no destination collection configured)."
            )
        ping_server(client)

        # Try to resume from last known token
        resume_token = load_resume_token()
        if resume_token:
            print(f"Resuming from last token: {resume_token.get('_data', '')[:20]}...")
            stream = collection.watch(resume_after=resume_token)
        else:
            print("Starting fresh change stream (no prior token found).")
            stream = collection.watch()

        print("Watching for changes...")
        with stream:
            for change in stream:
                print(f"Change: {change['operationType']} on {change['ns']['coll']}")
                print(f"  Full doc: {change.get('fullDocument', {})}")

                if destination_collection is not None:
                    mirrored_event = {
                        "source": change.get("ns", {}),
                        "operationType": change.get("operationType"),
                        "documentKey": change.get("documentKey", {}),
                        "fullDocument": change.get("fullDocument", {}),
                        "resumeToken": change.get("_id", {}),
                    }
                    destination_collection.insert_one(mirrored_event)

                # Save token after each successful change
                save_resume_token(change["_id"])
                print()
    except OperationFailure as error:
        print("The change stream could not be opened on this collection.")
        print(error)
        print(
            "Check whether your Azure database deployment supports change streams for the configured aggregation properties."
        )
    except PyMongoError as error:
        print("Azure DocumentDB connection or driver error while watching for changes.")
        print(error)
    except KeyboardInterrupt:
        print("\nConsumer stopped by user.")
    finally:
        if client is not None:
            client.close()

if __name__ == "__main__":
    main()

使用更改流监控数据库变更

让我们通过示例了解更改流输出。

在此更改流事件中,我们看到新记录已inserted进入数据库中的exampleCollectioncs集合中,事件详细信息包括新添加的文档的完整内容。

{
  "_id": { "_data": "AeARBpQ/AAAA" }, // "resume_token"
  "operationType": "insert",
  "fullDocument": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "employee_id": "17986",
    "name": "John",
    "position": "Software Engineer",
    "department": "IT",
    "rating": 4
  },
  "ns": { "db": "cs", "coll": "exampleCollection" },
  "documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}

恢复变更流

通过在打开游标时指定“恢复令牌” resumeAfter 来恢复更改流。 每个变更事件的 _id 字段充当恢复令牌。 将它持久化存储,并在重新打开流时将它传回。

从活动更改日志恢复

// Resume from the last stored\processed token
const stream = db.exampleCollection.watch([], {resumeAfter: savedResumeToken});

while (stream.hasNext()) {
  const event = stream.next();
  savedResumeToken = event._id;
  processEvent(event);
}

从历史检查点恢复流数据

标准更改流仅限于当前活动的 400 MB 更改日志内保留的事件。 轮换后,旧条目将存档并变得不可访问。 历史更改流通过以透明方式检索存档的日志、启用流重播或从过去的时间戳恢复来消除此限制。

// Replay all events since 2026-05-01 00:00:00 UTC, at current timestamp 2026-06-01 00:00:00 UTC
const startTime = new Timestamp(Math.floor(new Date("2026-05-01").getTime() / 1000), 1);

const stream = db.exampleCollection.watch([], {
  startAtOperationTime: startTime
});

while (stream.hasNext()) {
  printjson(stream.next());
}

重要

变更流支持通过 startAtresumeAfterstartAtOperationTime 参数实现可恢复性。 集成时间点还原(PITR)日志后,流式恢复最长可延长至 35 天或集群初始化时间点(以较早者为准)。

建议在低流量期间处理历史检查点,以最大程度地减少额外资源消耗的影响。 根据从历史时间点开始需要处理的数据量,预留更多存储空间(适用于存储容量为 128 GB 或以下的集群)。

变更流中的变更前映像(预览版)

注释

原像支持当前处于预览阶段。 若要在群集上启用此功能, 请创建支持请求

默认情况下,更改事件会显示更改后的文档。 启用前镜像会使系统在更改之前也记录完整文档,并在每个事件中以 fullDocumentBeforeChange 的形式提供。

选项 Behavior 何时使用
"off"(默认值) 从不包含预映像 标准流
"whenAvailable" 如有则包含;否则静默省略 具有优雅降级能力的审计/变更数据捕获(CDC)场景
"required" 包含;如果不可用,则引发错误 在必须明确标示缺失的预映像而不是将其跳过时使用

重要

原像仅适用于 updatedeletereplace 事件。

启用预映像会增加存储和处理开销,因为必须为每个符合条件的操作捕获和保留以前版本的已修改文档,这会导致更高的写入放大、额外的 I/O 消耗和更改流处理延迟增加。 为了最大程度地减少长期资源影响,除非工作负荷明确要求,否则不建议在生产环境中持续启用。

在集合上启用预映像

db.runCommand({
  collMod: "exampleCollection",
  changeStreamPreAndPostImages: { enabled: true }
});

启用了预映像的示例事件

{
  "_id": { "_data": "AWACAKM/AAAA" },
  "operationType": "update",
  "fullDocumentBeforeChange": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "name": "John",
    "position": "Software Engineer",
    "rating": 4
  },
  "fullDocument": {
    "_id": { "$oid": "66e6f63e6f49ecaabf794958" },
    "name": "John",
    "position": "SSE",
    "rating": 5
  },
  "ns": { "db": "cs", "coll": "exampleCollection" },
  "documentKey": { "_id": { "$oid": "66e6f63e6f49ecaabf794958" } }
}

在变更流中定制数据

通过在设置时指定一个或多个管道阶段的数组来定制变更流输出。 支持的运算符包括以下内容。

  • $addFields
  • $match
  • $project
  • $set
  • $unset

该示例允许仅筛选与“IT”部门相关的 insert 操作。

const pipeline = [
  { $match: { operationType: "insert", "fullDocument.department": "IT" } },
  { $project: { "fullDocument.name": 1, "fullDocument.position": 1 } }
];

const stream = db.exampleCollection.watch(pipeline);

局限性

  • 集群拓扑从单分片转换为多分片会破坏变更流的续传能力。
  • 多分片部署不会保留全局事件排序。
  • 在故障转移事件后,必须重新初始化更改流游标,同时恢复功能仍保留。
  • 对于聚合管道中的 update 事件,不支持 UpdateDescription。 不过,支持使用更新运算符。
  • 不支持将 $changestream 用作另一阶段中的嵌套管道。
  • 对日志进行历史处理不支持预映像,因为之前未跟踪其他预映像信息。
  • 变更流不支持 showExpandedEvents