聚合管道示例和限制

重要

你是否正在寻找一种数据库解决方案,以应对需要高扩展性、99.999% 可用性服务级别协议(SLA)、即时自动扩展和跨多个区域的自动故障转移的场景? 请考虑使用 Azure Cosmos DB for NoSQL

聚合管道允许你在用于 MongoDB 的 Azure Cosmos DB 中对集合执行高级数据分析和转换。 管道是一系列阶段,每个阶段都会筛选、重塑或计算流经该管道的文档上的值。 本文介绍常见的单集合管道模式,解释使用 $lookup 进行跨集合聚合的限制,并在原生聚合无法满足需求时提出变通方案。

先决条件

  • Azure Cosmos DB for MongoDB 帐户。 如果没有帐户, 请创建一个帐户
  • 连接到帐户的 MongoDB 兼容的客户端或 shell。

基本语法

db.<collection>.aggregate([
    { <stage1> },
    { <stage2> },
    // ...
    { <stageN> }
])

每个阶段接收上一阶段的文档输出,并将自己的输出传递到下一阶段。

单集合管道示例

本节中的示例使用一个 sales 集合,其中的文档采用以下格式:

{
  "_id": "ord-001",
  "date": "2024-03-15",
  "category": "electronics",
  "item": "laptop",
  "price": 1200,
  "quantity": 2,
  "tags": ["sale", "featured"]
}

使用$match筛选文档

使用 $match 仅传递满足某个条件的文档。 尽可能将 $match 放置在管道的早期阶段,以便它能够使用索引并减少后续阶段需要处理的文档数量。

db.sales.aggregate([
    { $match: { category: "electronics", price: { $gt: 500 } } }
])

使用$group分组和计算总计

用于$group按字段对文档进行分组,并应用累加器表达式,例如$sum$avg$min$max

db.sales.aggregate([
    {
        $group: {
            _id: "$category",
            totalRevenue: { $sum: { $multiply: ["$price", "$quantity"] } },
            orderCount: { $sum: 1 },
            avgPrice: { $avg: "$price" }
        }
    }
])

使用 $project 调整输出结构

用于 $project 在输出文档中包括、排除或计算字段。 将字段设为 1 以包含它,设为 0 以排除它,或提供一个表达式来计算新值。

db.sales.aggregate([
    {
        $project: {
            item: 1,
            price: 1,
            revenue: { $multiply: ["$price", "$quantity"] },
            discounted: { $cond: [{ $gt: ["$price", 1000] }, true, false] }
        }
    }
])

使用$sort对结果进行排序

用于 $sort 按一个或多个字段对文档进行排序。 用于 1 升序和 -1 降序。

db.sales.aggregate([
    { $sort: { price: -1, date: 1 } }
])

使用$limit和$skip限制结果

用于 $limit 限制输出文档的数量。 使用 $skip 可跳过指定数量的文档,与 $sort 结合使用时,这对于分页非常有用。

db.sales.aggregate([
    { $sort: { date: -1 } },
    { $skip: 20 },
    { $limit: 10 }
])

使用 $unwind 展开数组

用于 $unwind 析构数组字段,以便每个元素生成单独的输出文档。

db.sales.aggregate([
    { $unwind: "$tags" },
    { $group: { _id: "$tags", count: { $sum: 1 } } },
    { $sort: { count: -1 } }
])

使用$addFields添加计算字段

使用 $addFields 可向文档添加新字段,而无需重新指定所有现有字段;而 $project 则需要这样做。

db.sales.aggregate([
    {
        $addFields: {
            totalValue: { $multiply: ["$price", "$quantity"] },
            year: { $substr: ["$date", 0, 4] }
        }
    }
])

多阶段管道示例

以下管道组合了几个阶段,按 2024 年第一季度的总收入查找前五个产品类别:

db.sales.aggregate([
    // Stage 1: filter to Q1 2024
    { $match: { date: { $gte: "2024-01-01", $lt: "2024-04-01" } } },

    // Stage 2: compute per-document revenue
    { $addFields: { revenue: { $multiply: ["$price", "$quantity"] } } },

    // Stage 3: group by category and sum revenue
    { $group: { _id: "$category", totalRevenue: { $sum: "$revenue" } } },

    // Stage 4: sort by revenue descending
    { $sort: { totalRevenue: -1 } },

    // Stage 5: return top 5 only
    { $limit: 5 },

    // Stage 6: rename _id for readability
    { $project: { _id: 0, category: "$_id", totalRevenue: 1 } }
])

使用 $lookup 的跨集合聚合

$lookup 阶段对当前集合(本地)与同一数据库中的另一个集合(外部)执行左外连接。 以下示例使用匹配字段联接 orderscustomers 集合。

db.orders.aggregate([
    {
        $lookup: {
            from: "customers",
            localField: "customerId",
            foreignField: "_id",
            as: "customerInfo"
        }
    },
    { $unwind: "$customerInfo" },
    {
        $project: {
            orderId: "$_id",
            customerName: "$customerInfo.name",
            total: 1
        }
    }
])

Azure Cosmos DB for MongoDB 中 $lookup 的限制

mongoDB 的 Azure Cosmos DB 部分支持 $lookup。 以下限制适用于所有受支持的服务器版本:

  • 不支持不相关的子查询。 MongoDB 3.6 中引入的、使用 letpipeline 字段的扩展 $lookup 语法不受支持。 使用这些字段返回错误“let不受支持”。请改用基本from / localField / foreignField / as窗体。

    // This syntax is NOT supported:
    {
        $lookup: {
            from: "inventory",
            let: { ordItem: "$item" },
            pipeline: [ { $match: { $expr: { $eq: ["$sku", "$$ordItem"] } } } ],
            as: "inventoryDocs"
        }
    }
    
  • $graphLookup API 版本 4.2、6.0 和 7.0 不支持。 如果工作负荷需要递归查询或图形遍历查询,请参阅 跨集合方案的解决方法

  • 这两个集合必须驻留在同一数据库中。 不支持跨数据库 $lookup

  • 性能注意事项。 由于 Azure Cosmos DB for MongoDB 是一个分布式系统,因此,跨大型集合或位于不同逻辑分区中的集合的 $lookup 联接可能会消耗大量请求单位 (RU)。 使用 容量规划器 评估 RU 消耗量,并考虑不规范化数据以减少联接频率。

跨集合聚合限制

针对 MongoDB 的Azure Cosmos DB针对高吞吐量单集合访问模式进行优化。 如果需要跨多个集合聚合数据,请记住以下约束:

能力 Support
基本 $lookup (localField / foreignField) ✅ 是的
$lookup 使用 letpipeline(非相关子查询) ✖️ 不
$graphLookup (图遍历) ✖️ 否(版本 4.2–7.0)
$lookup 跨数据库 ✖️ 不
跨多个集合的事务 ✖️ 否(多文档事务仅限于单个非分片集合)

无论帐户使用的服务器版本(3.6 到 7.0),这些约束都适用。

跨集合场景的变通方法

如果原生聚合管道功能无法满足跨集合或导出场景的需求,请考虑以下替代方法。

应用程序端聚合

单独查询每个集合,然后合并和聚合应用程序中的结果。 此方法避免了 $lookup 的限制,并使你能够完全控制联接逻辑。

// Query collection A
const orders = await db.collection("orders").find({ status: "completed" }).toArray();
const customerIds = orders.map(o => o.customerId);

// Query collection B with matching IDs
const customers = await db.collection("customers")
    .find({ _id: { $in: customerIds } })
    .toArray();

// Join in application memory
const customerMap = new Map(customers.map(c => [c._id.toString(), c]));
const enriched = orders.map(o => ({
    ...o,
    customer: customerMap.get(o.customerId.toString())
}));

此模式最适用于中小型结果集。 对于大型导出,请使用下面所述的Azure服务集成之一。

azure Synapse Link for Azure Cosmos DB会自动将操作数据同步到Azure Synapse Analytics,而不会影响事务工作负荷。 在Azure Synapse中,可以使用 Apache Spark 或无服务器 SQL 池运行跨集合分析查询,这些池支持完全联接语义。

如果需要,请使用 Azure Synapse Link:

  • 跨多个集合的复杂分析查询。
  • 历史趋势分析或报告不会影响生产 RU 消耗。
  • 近乎实时的分析,具有最少的 ETL 开销。

用于导出和转换的Azure 数据工厂

Azure 数据工厂可以从 MongoDB 集合的多个Azure Cosmos DB读取、在数据流中应用转换,并将结果写入目标,例如Azure Blob 存储、Azure SQL 数据库或Azure Synapse Analytics。

在需要时使用Azure 数据工厂:

  • 从一个或多个集合中定时批量导出数据。
  • ETL 管道中的跨集合连接
  • 与不直接连接到Azure Cosmos DB的下游系统集成。