Azure Cosmos DB's API for MongoDB 中的更改流Change streams in Azure Cosmos DB's API for MongoDB

适用于: Azure Cosmos DB API for MongoDB

可以使用更改流 API 获取 Azure Cosmos DB's API for MongoDB 中的更改源支持。Change feed support in Azure Cosmos DB's API for MongoDB is available by using the change streams API. 应用程序可以使用更改流 API 获取对集合或者对单个分片中的项所做的更改。By using the change streams API, your applications can get the changes made to the collection or to the items in a single shard. 以后,可以根据结果采取进一步的措施。Later you can take further actions based on the results. 对集合中的项所做的更改将按照其修改时间的顺序进行捕获,并按分片键保证排序顺序。Changes to the items in the collection are captured in the order of their modification time and the sort order is guaranteed per shard key.

备注

若要使用更改流,请使用 3.6 版的 Azure Cosmos DB API for MongoDB 或更高版本来创建帐户。To use change streams, create the account with version 3.6 of Azure Cosmos DB's API for MongoDB, or a later version. 如果针对早期版本运行更改流示例,可能会出现 Unrecognized pipeline stage name: $changeStream 错误。If you run the change stream examples against an earlier version, you might see the Unrecognized pipeline stage name: $changeStream error.

示例Examples

下面的示例演示如何获取集合中所有项的更改流。The following example shows how to get change streams on all the items in the collection. 此示例在插入、更新或替换项时创建光标来监视项。This example creates a cursor to watch items when they are inserted, updated, or replaced. 若要获取更改流,需要使用 $match 阶段、$project 阶段和 fullDocument 选项。The $match stage, $project stage, and fullDocument option are required to get the change streams. 当前不支持监视使用更改流删除操作。Watching for delete operations using change streams is currently not supported. 作为一种替代方法,你可以对要删除的项添加软标记。As a workaround, you can add a soft marker on the items that are being deleted. 例如,可以在名为“deleted”的项中添加属性。For example, you can add an attribute in the item called "deleted." 如果要删除该项,可以将“deleted”设置为 true 并在该项上设置 TTL。When you'd like to delete the item, you can set "deleted" to true and set a TTL on the item. 由于将“deleted”更新为 true 是一项更新,此更改将显示在更改流中。Since updating "deleted" to true is an update, this change will be visible in the change stream.

var cursor = db.coll.watch(
    [
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
    ],
    { fullDocument: "updateLookup" });

while (!cursor.isExhausted()) {
    if (cursor.hasNext()) {
        printjson(cursor.next());
    }
}

单个分片中的更改Changes within a single shard

以下示例演示如何获取对单个分片中的项所做的更改。The following example shows how to get changes to the items within a single shard. 此示例获取项的更改,这些项的分片键等于“a”,分片键值等于“1”。This example gets the changes of items that have shard key equal to "a" and the shard key value equal to "1". 可以让不同的客户端从不同的分片并行读取更改。It is possible to have different clients reading changes from different shards in parallel.

var cursor = db.coll.watch(
    [
        { 
            $match: { 
                $and: [
                    { "fullDocument.a": 1 }, 
                    { "operationType": { $in: ["insert", "update", "replace"] } }
                ]
            }
        },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
    ],
    { fullDocument: "updateLookup" });

当前限制Current limitations

使用更改流时,以下限制适用:The following limitations are applicable when using change streams:

  • 输出文档中尚不支持 operationTypeupdateDescription 属性。The operationType and updateDescription properties are not yet supported in the output document.
  • 目前支持 insertupdatereplace 操作类型。The insert, update, and replace operations types are currently supported. 但是,尚不支持删除操作或其他事件。However, the delete operation or other events are not yet supported.

由于存在这些限制,需要按前面示例中所示指定 $match 阶段、$project 阶段和 fullDocument 选项。Due to these limitations, the $match stage, $project stage, and fullDocument options are required as shown in the previous examples.

与 Azure Cosmos DB SQL API 中的更改源不同,没有单独的更改源处理器库来使用更改流,也不需要使用租用容器。Unlike the change feed in Azure Cosmos DB's SQL API, there is not a separate Change Feed Processor Library to consume change streams or a need for a leases container. 目前不支持使用 Azure Functions 触发器来处理更改流。There is not currently support for Azure Functions triggers to process change streams.

错误处理。Error handling

使用更改流时,支持以下错误代码和消息:The following error codes and messages are supported when using change streams:

  • HTTP 错误代码 16500 - 当更改流受到限制时,它会返回一个空页。HTTP error code 16500 - When the change stream is throttled, it returns an empty page.

  • NamespaceNotFound (OperationType 失效) - 如果对不存在的集合运行更改流,或删除了集合,则会返回 NamespaceNotFound 错误。NamespaceNotFound (OperationType Invalidate) - If you run change stream on the collection that does not exist or if the collection is dropped, then a NamespaceNotFound error is returned. 由于 operationType 属性无法在输出文档中返回,因此会返回 NamespaceNotFound 错误,而不是 operationType Invalidate 错误。Because the operationType property can't be returned in the output document, instead of the operationType Invalidate error, the NamespaceNotFound error is returned.

后续步骤Next steps