Compartir a través de

使用 Azure 数据工厂或 Synapse Analytics 从/向 MongoDB 复制数据

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

本文概述如何使用 Azure 数据工厂或 Synapse Analytics 管道中的复制活动从/向 MongoDB 数据库复制数据。 它是基于概述复制活动总体的复制活动概述一文。

重要

新的 MongoDB 连接器提供改进的本机 MongoDB 支持。 如果在解决方案中使用了旧的 MongoDB 连接器,且该连接器仅“按原样”支持后向兼容性,请参阅 MongoDB 连接器(旧版)一文。

支持的功能

此 MongoDB 连接器支持以下功能:

支持的功能 IR
复制活动(源/接收器) ① ②

① Azure 集成运行时 ② 自承载集成运行时

如需可以用作源/接收器的数据存储的列表,请参阅支持的数据存储表。

具体而言,此 MongoDB 连接器最高支持版本 4.2。 如果你的工作需要 4.2 以上的版本,请考虑使用带有 MongoDB Atlas 连接器的 MongoDB Atlas,它提供更全面的支持和功能。

先决条件

如果数据存储位于本地网络、Azure 虚拟网络或 Amazon Virtual Private Cloud 内部,则需要配置自承载集成运行时才能连接到该数据存储。

如果数据存储是托管的云数据服务,则可以使用 Azure Integration Runtime。 如果访问范围限制为防火墙规则中允许的 IP,你可以选择将 Azure Integration Runtime IP 添加到允许列表。

此外,还可以使用 Azure 数据工厂中的托管虚拟网络集成运行时功能访问本地网络,而无需安装和配置自承载集成运行时。

要详细了解网络安全机制和数据工厂支持的选项,请参阅数据访问策略

入门

若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:

使用 UI 创建一个到 MongoDB 的链接服务

使用以下步骤在 Azure 门户 UI 中创建一个到 MongoDB 的链接服务。

  1. 浏览到 Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:

  2. 搜索 MongoDB 并选择 MongoDB 连接器。

    选择 MongoDB 连接器。

  3. 配置服务详细信息、测试连接并创建新的链接服务。

    配置到 MongoDB 的链接服务。

连接器配置详细信息

对于特定于 MongoDB 连接器的数据工厂实体,以下部分提供有关用于定义这些实体的属性的详细信息。

链接服务属性

MongoDB 链接的服务支持以下属性:

属性 描述 必需
type type 属性必须设置为:MongoDbV2
connectionString 指定 MongoDB 连接字符串,例如 mongodb://[username:password@]host[:port][/[database][?options]]。 请参阅 MongoDB 连接字符串手册获取详细信息。

还可以将连接字符串置于 Azure Key Vault 中。 有关更多详细信息,请参阅在 Azure Key Vault 中存储凭据
database 要访问的数据库的名称。
connectVia 用于连接到数据存储的集成运行时。 在先决条件部分了解更多信息。 如果未指定,则使用默认 Azure Integration Runtime。

示例:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

数据集属性

有关可用于定义数据集的各部分和属性的完整列表,请参阅数据集和链接服务。 MongoDB 数据集支持以下属性:

属性 描述 必需
type 数据集的 type 属性必须设置为:MongoDbV2Collection
collectionName MongoDB 数据库中集合的名称。

示例:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

复制活动属性

有关可用于定义活动的各部分和属性的完整列表,请参阅管道一文。 本部分提供了 MongoDB 源和接收器支持的属性的列表。

以 MongoDB 作为源

复制活动source部分支持以下属性:

属性 描述 必需
type 复制活动 source 的 type 属性必须设置为:MongoDbV2Source
filter 使用查询运算符指定选择筛选器。 若要返回集合中的所有文档,请省略此参数或传递空文档 ({})。
cursorMethods.project 指定要在文档中返回用于投影的字段。 若要返回匹配文档中的所有字段,请省略此参数。
cursorMethods.sort 指定查询返回匹配文档的顺序。 请参阅 cursor.sort()
cursorMethods.limit 指定服务器返回的文档的最大数量。 请参阅 cursor.limit()
cursorMethods.skip 指定要跳过的文档数量以及 MongoDB 开始返回结果的位置。 请参阅 cursor.skip()
batchSize 指定从 MongoDB 实例的每批响应中返回的文档数量。 大多数情况下,修改批大小不会影响用户或应用程序。 Azure Cosmos DB 限制每个批不能超过 40 MB(这是文档大小的 batchSize 数量的总和),因此如果文档很大,请减小此值。
(默认值为 100

提示

服务支持在严格模式下使用 BSON 文档。 请确保筛选器查询处于严格模式,而不是 Shell 模式。 有关详细说明,请参阅 MongoDB 手册

示例:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB 作为接收器

复制活动 sink 节支持以下属性:

属性 描述 必需
type 复制活动接收器的“type”属性必须设置为“MongoDbV2Sink” 。
writeBehavior 介绍如何将数据写入 MongoDB。 允许的值为 insertupsert

upsert 的行为是,如果已存在具有相同 _id 的文档,则替换该文档;否则将插入该文档。

注意:如果未在原始文档中指定 _id,或未通过列映射指定 _id,则服务会自动为文档生成 _id。 这表示必须先确保文档有 ID,才能让 upsert 按预期工作。

(默认值为 insert
writeBatchSize writeBatchSize 属性控制每个批中可写入的文档大小。 可尝试增大 writeBatchSize 的值以提高性能,并在文档大小较大时减小该值。
(默认值为 10,000
writeBatchTimeout 超时前等待批插入操作完成的时间。允许的值为 timespan。
(默认值为 00:30:00 - 30 分钟)

提示

若要按原样导入 JSON 文档,请参阅导入或导出 JSON 文档部分;若要从表格形数据复制,请参阅架构映射

示例

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

导入和导出 JSON 文档

可以使用此 MongoDB 连接器轻松完成以下操作:

  • 在两个 MongoDB 集合之间按原样复制文档。
  • 将各种源(包括 Azure Cosmos DB、Azure Blob 存储、Azure Data Lake Storage 和其他受支持的基于文件的存储)中的 JSON 文档导入 MongoDB。
  • 将 JSON 文档从 MongoDB 集合导出到各种基于文件的存储。

若要实现这种架构不可知的复制,请跳过数据集中的“结构”(也称为“架构”)节和复制活动中的架构映射 。

架构映射

若要将数据从 MongoDB 复制到表格接收器或进行反向复制,请参阅架构映射

升级 MongoDB 链接服务

下面是帮助你升级链接服务和相关查询的步骤:

  1. 创建新的 MongoDB 链接服务,并通过引用链接服务属性对其进行配置。

  2. 如果在引用旧 MongoDB 链接服务的管道中使用 SQL 查询,请将它们替换为等效的 MongoDB 查询。 有关替换示例,请参阅下表:

    SQL 查询 等效的 MongoDB 查询
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

有关复制活动支持作为源和接收器的数据存储的列表,请参阅支持的数据存储