重要
你是否正在寻找一种数据库解决方案,以应对需要高扩展性、99.999% 可用性服务级别协议(SLA)、即时自动扩展和跨多个区域的自动故障转移的场景? 请考虑使用 Azure Cosmos DB for NoSQL。
聚合管道允许你在用于 MongoDB 的 Azure Cosmos DB 中对集合执行高级数据分析和转换。 管道是一系列阶段,每个阶段都会筛选、重塑或计算流经该管道的文档上的值。 本文介绍常见的单集合管道模式,解释使用 $lookup 进行跨集合聚合的限制,并在原生聚合无法满足需求时提出变通方案。
先决条件
- Azure Cosmos DB for MongoDB 帐户。 如果没有帐户, 请创建一个帐户。
- 连接到帐户的 MongoDB 兼容的客户端或 shell。
基本语法
db.<collection>.aggregate([
{ <stage1> },
{ <stage2> },
// ...
{ <stageN> }
])
每个阶段接收上一阶段的文档输出,并将自己的输出传递到下一阶段。
单集合管道示例
本节中的示例使用一个 sales 集合,其中的文档采用以下格式:
{
"_id": "ord-001",
"date": "2024-03-15",
"category": "electronics",
"item": "laptop",
"price": 1200,
"quantity": 2,
"tags": ["sale", "featured"]
}
使用$match筛选文档
使用 $match 仅传递满足某个条件的文档。 尽可能将 $match 放置在管道的早期阶段,以便它能够使用索引并减少后续阶段需要处理的文档数量。
db.sales.aggregate([
{ $match: { category: "electronics", price: { $gt: 500 } } }
])
使用$group分组和计算总计
用于$group按字段对文档进行分组,并应用累加器表达式,例如$sum,$avg$min和$max。
db.sales.aggregate([
{
$group: {
_id: "$category",
totalRevenue: { $sum: { $multiply: ["$price", "$quantity"] } },
orderCount: { $sum: 1 },
avgPrice: { $avg: "$price" }
}
}
])
使用 $project 调整输出结构
用于 $project 在输出文档中包括、排除或计算字段。 将字段设为 1 以包含它,设为 0 以排除它,或提供一个表达式来计算新值。
db.sales.aggregate([
{
$project: {
item: 1,
price: 1,
revenue: { $multiply: ["$price", "$quantity"] },
discounted: { $cond: [{ $gt: ["$price", 1000] }, true, false] }
}
}
])
使用$sort对结果进行排序
用于 $sort 按一个或多个字段对文档进行排序。 用于 1 升序和 -1 降序。
db.sales.aggregate([
{ $sort: { price: -1, date: 1 } }
])
使用$limit和$skip限制结果
用于 $limit 限制输出文档的数量。 使用 $skip 可跳过指定数量的文档,与 $sort 结合使用时,这对于分页非常有用。
db.sales.aggregate([
{ $sort: { date: -1 } },
{ $skip: 20 },
{ $limit: 10 }
])
使用 $unwind 展开数组
用于 $unwind 析构数组字段,以便每个元素生成单独的输出文档。
db.sales.aggregate([
{ $unwind: "$tags" },
{ $group: { _id: "$tags", count: { $sum: 1 } } },
{ $sort: { count: -1 } }
])
使用$addFields添加计算字段
使用 $addFields 可向文档添加新字段,而无需重新指定所有现有字段;而 $project 则需要这样做。
db.sales.aggregate([
{
$addFields: {
totalValue: { $multiply: ["$price", "$quantity"] },
year: { $substr: ["$date", 0, 4] }
}
}
])
多阶段管道示例
以下管道组合了几个阶段,按 2024 年第一季度的总收入查找前五个产品类别:
db.sales.aggregate([
// Stage 1: filter to Q1 2024
{ $match: { date: { $gte: "2024-01-01", $lt: "2024-04-01" } } },
// Stage 2: compute per-document revenue
{ $addFields: { revenue: { $multiply: ["$price", "$quantity"] } } },
// Stage 3: group by category and sum revenue
{ $group: { _id: "$category", totalRevenue: { $sum: "$revenue" } } },
// Stage 4: sort by revenue descending
{ $sort: { totalRevenue: -1 } },
// Stage 5: return top 5 only
{ $limit: 5 },
// Stage 6: rename _id for readability
{ $project: { _id: 0, category: "$_id", totalRevenue: 1 } }
])
使用 $lookup 的跨集合聚合
该 $lookup 阶段对当前集合(本地)与同一数据库中的另一个集合(外部)执行左外连接。 以下示例使用匹配字段联接 orders 到 customers 集合。
db.orders.aggregate([
{
$lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customerInfo"
}
},
{ $unwind: "$customerInfo" },
{
$project: {
orderId: "$_id",
customerName: "$customerInfo.name",
total: 1
}
}
])
Azure Cosmos DB for MongoDB 中 $lookup 的限制
mongoDB 的 Azure Cosmos DB 部分支持 $lookup。 以下限制适用于所有受支持的服务器版本:
不支持不相关的子查询。 MongoDB 3.6 中引入的、使用
let和pipeline字段的扩展$lookup语法不受支持。 使用这些字段返回错误“let不受支持”。请改用基本from/localField/foreignField/as窗体。// This syntax is NOT supported: { $lookup: { from: "inventory", let: { ordItem: "$item" }, pipeline: [ { $match: { $expr: { $eq: ["$sku", "$$ordItem"] } } } ], as: "inventoryDocs" } }$graphLookupAPI 版本 4.2、6.0 和 7.0 不支持。 如果工作负荷需要递归查询或图形遍历查询,请参阅 跨集合方案的解决方法。这两个集合必须驻留在同一数据库中。 不支持跨数据库
$lookup。性能注意事项。 由于 Azure Cosmos DB for MongoDB 是一个分布式系统,因此,跨大型集合或位于不同逻辑分区中的集合的
$lookup联接可能会消耗大量请求单位 (RU)。 使用 容量规划器 评估 RU 消耗量,并考虑不规范化数据以减少联接频率。
跨集合聚合限制
针对 MongoDB 的Azure Cosmos DB针对高吞吐量单集合访问模式进行优化。 如果需要跨多个集合聚合数据,请记住以下约束:
| 能力 | Support |
|---|---|
基本 $lookup (localField / foreignField) |
✅ 是的 |
$lookup 使用 let 和 pipeline(非相关子查询) |
✖️ 不 |
$graphLookup (图遍历) |
✖️ 否(版本 4.2–7.0) |
$lookup 跨数据库 |
✖️ 不 |
| 跨多个集合的事务 | ✖️ 否(多文档事务仅限于单个非分片集合) |
无论帐户使用的服务器版本(3.6 到 7.0),这些约束都适用。
跨集合场景的变通方法
如果原生聚合管道功能无法满足跨集合或导出场景的需求,请考虑以下替代方法。
应用程序端聚合
单独查询每个集合,然后合并和聚合应用程序中的结果。 此方法避免了 $lookup 的限制,并使你能够完全控制联接逻辑。
// Query collection A
const orders = await db.collection("orders").find({ status: "completed" }).toArray();
const customerIds = orders.map(o => o.customerId);
// Query collection B with matching IDs
const customers = await db.collection("customers")
.find({ _id: { $in: customerIds } })
.toArray();
// Join in application memory
const customerMap = new Map(customers.map(c => [c._id.toString(), c]));
const enriched = orders.map(o => ({
...o,
customer: customerMap.get(o.customerId.toString())
}));
此模式最适用于中小型结果集。 对于大型导出,请使用下面所述的Azure服务集成之一。
用于分析的 Azure Synapse Link
azure Synapse Link for Azure Cosmos DB会自动将操作数据同步到Azure Synapse Analytics,而不会影响事务工作负荷。 在Azure Synapse中,可以使用 Apache Spark 或无服务器 SQL 池运行跨集合分析查询,这些池支持完全联接语义。
如果需要,请使用 Azure Synapse Link:
- 跨多个集合的复杂分析查询。
- 历史趋势分析或报告不会影响生产 RU 消耗。
- 近乎实时的分析,具有最少的 ETL 开销。
用于导出和转换的Azure 数据工厂
Azure 数据工厂可以从 MongoDB 集合的多个Azure Cosmos DB读取、在数据流中应用转换,并将结果写入目标,例如Azure Blob 存储、Azure SQL 数据库或Azure Synapse Analytics。
在需要时使用Azure 数据工厂:
- 从一个或多个集合中定时批量导出数据。
- ETL 管道中的跨集合连接
- 与不直接连接到Azure Cosmos DB的下游系统集成。