更改 Azure Cosmos DB 的 API for MongoDB 中的流
适用对象: MongoDB
可通过更改流 API 在 Azure Cosmos DB 的 API for MongoDB 中获取更改源支持。 通过使用更改流 API,你的应用程序可以获取对集合或对单个分片中的项的更改。 以后,可以根据结果采取进一步的措施。 对集合中的项所做的更改将按照其修改时间的顺序进行捕获,并按分片键保证排序顺序。
注意
若要使用更改流,请使用 3.6 或更高版本的服务器创建 Azure Cosmos DB 的用于 MongoDB 的 API 帐户。 如果对较早版本运行更改流示例,可能会看到“无法识别的管道阶段名称: $changeStream”错误。
示例
下面的示例演示如何获取集合中所有项的更改流。 此示例在插入、更新或替换项时创建光标来监视项。 若要获取更改流,需要使用 $match
阶段、$project
阶段和 fullDocument
选项。 当前不支持监视使用更改流删除操作。 作为一种替代方法,你可以对要删除的项添加软标记。 例如,可以在名为“deleted”的项中添加属性。如果要删除该项,可以将“deleted”设置为 true
并在该项上设置 TTL。 由于将“deleted”更新为 true
是一项更新,此更改将显示在更改流中。
var cursor = db.coll.watch(
[
{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
],
{ fullDocument: "updateLookup" });
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
printjson(cursor.next());
}
}
单个分片中的更改
以下示例演示如何获取对单个分片中的项所做的更改。 此示例获取项的更改,这些项的分片键等于“a”,分片键值等于“1”。 可以让不同的客户端从不同的分片并行读取更改。
var cursor = db.coll.watch(
[
{
$match: {
$and: [
{ "fullDocument.a": 1 },
{ "operationType": { $in: ["insert", "update", "replace"] } }
]
}
},
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
],
{ fullDocument: "updateLookup" });
缩放更改流
大规模使用更改流时,最好均匀分布负载。 利用 GetChangeStreamTokens 自定义命令在物理分片/分区之间分布负载。
当前限制
使用更改流时,以下限制适用:
- 输出文档中尚不支持
operationType
和updateDescription
属性。 - 目前支持
insert
、update
和replace
操作类型。 但是,尚不支持删除操作或其他事件。
由于存在这些限制,需要按前面示例中所示指定 $match 阶段、$project 阶段和 fullDocument 选项。
与 Azure Cosmos DB 的 API for NoSQL 中的更改源不同,没有单独的更改源处理器库来使用更改流,或无需使用租用容器。 目前不支持使用 Azure Functions 触发器来处理更改流。
错误处理。
使用更改流时,支持以下错误代码和消息:
HTTP 错误代码 16500 - 当更改流受到限制时,它会返回一个空页。
NamespaceNotFound (OperationType 失效) - 如果对不存在的集合运行更改流,或删除了集合,则会返回
NamespaceNotFound
错误。 由于operationType
属性无法在输出文档中返回,因此会返回NamespaceNotFound
错误,而不是operationType Invalidate
错误。