Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
适用对象:
Mongodb
重要
你是否正在寻找一种数据库解决方案,以应对需要高扩展性、99.999% 可用性服务级别协议(SLA)、即时自动扩展和跨多个区域的自动故障转移的场景? 请考虑使用 Azure Cosmos DB for NoSQL。
在 Azure Cosmos DB 的 MongoDB API 中,可通过更改流 API 获取更改源支持。 通过使用更改流 API,你的应用程序可以获取对集合或对单个分片中的项的更改。 以后,可以根据结果采取进一步的措施。 对集合中项的更改将按照其更改时间的顺序捕获,并按每个分片键保证排序顺序。
注释
若要使用更改流,请使用 3.6 或更高版本的服务器创建 Azure Cosmos DB 的用于 MongoDB 的 API 帐户。 如果对较早版本运行更改流示例,可能会看到“无法识别的管道阶段名称: $changeStream”错误。
示例
下面的示例演示如何获取集合中所有项的更改流。 此示例在插入、更新或替换项时创建光标来监视项。 若要获取更改流,需要使用 $match 阶段、$project 阶段和 fullDocument 选项。 当前不支持通过更改流监控删除操作。 作为一种替代方法,你可以对要删除的项添加软标记。 例如,可以在名为“deleted”的项中添加属性。如果要删除该项,可以将“deleted”设置为 true 并在该项上设置 TTL。 由于将“deleted”更新为 true 是一项更新,此更改将显示在更改流中。
var cursor = db.coll.watch(
[
{ $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
],
{ fullDocument: "updateLookup" });
while (!cursor.isExhausted()) {
if (cursor.hasNext()) {
printjson(cursor.next());
}
}
单个分片中的更改
以下示例演示如何获取对单个分片中的项所做的更改。 此示例获取分片键等于“a”且分片键值等于“1”的项的更改。 可以让不同的客户端从不同的分片并行读取更改。
var cursor = db.coll.watch(
[
{
$match: {
$and: [
{ "fullDocument.a": 1 },
{ "operationType": { $in: ["insert", "update", "replace"] } }
]
}
},
{ $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1} }
],
{ fullDocument: "updateLookup" });
扩展变更流
大规模使用更改流时,最好均匀分布负载。 利用 GetChangeStreamTokens 自定义命令在物理分片/分区之间分布负载。
当前限制
使用更改流时,以下限制适用:
- 输出文档中尚不支持
operationType和updateDescription属性。 - 目前支持
insert、update和replace操作类型。 但是,尚不支持删除操作或其他事件。
由于存在这些限制,需要按前面示例中所示指定 $match 阶段、$project 阶段和 fullDocument 选项。
与 Azure Cosmos DB 的 NoSQL API 中的更改源不同,这里没有单独的更改源处理器库来处理更改流,也不需要租用容器。 目前不支持使用 Azure Functions 触发器来处理更改流。
错误处理
使用更改流时,支持以下错误代码和消息:
HTTP 错误代码 16500 - 当更改流受到限制时,它会返回一个空页。
NamespaceNotFound (OperationType 失效) - 如果对不存在的集合运行更改流,或删除了集合,则会返回
NamespaceNotFound错误。 由于operationType属性无法在输出文档中返回,因此会返回operationType Invalidate错误,而不是NamespaceNotFound错误。