Copy data to or from Azure Cosmos DB's API for MongoDB by using Azure Data Factory

APPLIES TO: Azure Data Factory, Azure Synapse Analytics (Preview)

This article outlines how to use Copy Activity in Azure Data Factory to copy data from and to Azure Cosmos DB's API for MongoDB. It builds on the article Copy Activity in Azure Data Factory, which presents a general overview of Copy Activity.

Note

This connector supports copying data only to and from Azure Cosmos DB's API for MongoDB. For the SQL API, refer to the Cosmos DB SQL API connector. Other API types are not currently supported.

Supported capabilities

You can copy data from Azure Cosmos DB's API for MongoDB to any supported sink data store, or copy data from any supported source data store to Azure Cosmos DB's API for MongoDB. For a list of data stores that Copy Activity supports as sources and sinks, see Supported data stores and formats.

You can use the Azure Cosmos DB's API for MongoDB connector to:

  • Copy data from and to Azure Cosmos DB's API for MongoDB.
  • Write to Azure Cosmos DB as insert or upsert.
  • Import and export JSON documents as-is, or copy data from or to a tabular dataset. Examples include a SQL database and a CSV file. To copy documents as-is to or from JSON files, or to or from another Azure Cosmos DB collection, see Import and export JSON documents.

Get started

To perform the Copy activity with a pipeline, you can use one of the Azure Data Factory tools or SDKs.

The following sections provide details about properties you can use to define Data Factory entities that are specific to Azure Cosmos DB's API for MongoDB.

Linked service properties

The following properties are supported for the Azure Cosmos DB's API for MongoDB linked service:

Property | Description | Required
type | The type property must be set to CosmosDbMongoDbApi. | Yes
connectionString | Specify the connection string for your Azure Cosmos DB's API for MongoDB. You can find it in the Azure portal -> your Cosmos DB blade -> primary or secondary connection string, with the pattern mongodb://<cosmosdb-name>:<password>@<cosmosdb-name>.documents.azure.cn:10255/?ssl=true&replicaSet=globaldb. You can also put the password in Azure Key Vault and pull the password configuration out of the connection string. For more details, refer to Store credentials in Azure Key Vault. | Yes
database | Name of the database that you want to access. | Yes
connectVia | The Integration Runtime to use to connect to the data store. You can use the Azure Integration Runtime or a self-hosted integration runtime (if your data store is located in a private network). If this property isn't specified, the default Azure Integration Runtime is used. | No

Example

{
    "name": "CosmosDbMongoDBAPILinkedService",
    "properties": {
        "type": "CosmosDbMongoDbApi",
        "typeProperties": {
            "connectionString": "mongodb://<cosmosdb-name>:<password>@<cosmosdb-name>.documents.azure.cn:10255/?ssl=true&replicaSet=globaldb",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Dataset properties

For a full list of sections and properties that are available for defining datasets, see Datasets and linked services. The following properties are supported for the Azure Cosmos DB's API for MongoDB dataset:

Property | Description | Required
type | The type property of the dataset must be set to CosmosDbMongoDbApiCollection. | Yes
collectionName | The name of the Azure Cosmos DB collection. | Yes

Example

{
    "name": "CosmosDbMongoDBAPIDataset",
    "properties": {
        "type": "CosmosDbMongoDbApiCollection",
        "typeProperties": {
            "collectionName": "<collection name>"
        },
        "schema": [],
        "linkedServiceName":{
            "referenceName": "<Azure Cosmos DB's API for MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Copy Activity properties

This section provides a list of properties that the Azure Cosmos DB's API for MongoDB source and sink support.

For a full list of sections and properties that are available for defining activities, see Pipelines.

Azure Cosmos DB's API for MongoDB as source

The following properties are supported in the Copy Activity source section:

Property | Description | Required
type | The type property of the copy activity source must be set to CosmosDbMongoDbApiSource. | Yes
filter | Specifies the selection filter using query operators. To return all documents in a collection, omit this parameter or pass an empty document ({}). | No
cursorMethods.project | Specifies the fields to return in the documents for projection. To return all fields in the matching documents, omit this parameter. | No
cursorMethods.sort | Specifies the order in which the query returns matching documents. Refer to cursor.sort(). | No
cursorMethods.limit | Specifies the maximum number of documents the server returns. Refer to cursor.limit(). | No
cursorMethods.skip | Specifies the number of documents to skip and from where MongoDB begins to return results. Refer to cursor.skip(). | No
batchSize | Specifies the number of documents to return in each batch of the response from the MongoDB instance. In most cases, modifying the batch size will not affect the user or the application. Cosmos DB limits each batch to 40 MB, which is the combined size of the batchSize documents in it, so decrease this value if your documents are large. | No (the default is 100)

Tip

ADF supports consuming BSON documents in Strict mode. Make sure your filter query is in Strict mode instead of Shell mode. For more details, see the MongoDB manual.
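As a rough illustration of the difference, MongoDB Extended JSON writes a date in Strict mode as { "$date": "<ISO-8601 string>" } and an object ID as { "$oid": "<hex string>" }, where Shell mode would use helpers such as ISODate("...") and ObjectId("..."). The fragment below is a minimal sketch of a source section with a Strict-mode filter. The field names datetimeData and _id, the date range, the ID value, and the batchSize of 50 are illustrative assumptions, not values prescribed by the connector.

```json
{
    "source": {
        "type": "CosmosDbMongoDbApiSource",
        "filter": "{ \"datetimeData\": { \"$gte\": { \"$date\": \"2018-12-11T00:00:00.000Z\" }, \"$lt\": { \"$date\": \"2018-12-12T00:00:00.000Z\" } }, \"_id\": { \"$oid\": \"5acd7c3d0000000000000000\" } }",
        "batchSize": 50
    }
}
```

Because the filter is passed as a JSON string, the inner double quotes have to be escaped. Lowering batchSize below its default of 100 is one way to stay under the 40 MB per-batch limit when documents are large.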

Example

"activities":[
    {
        "name": "CopyFromCosmosDBMongoDBAPI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Azure Cosmos DB's API for MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "CosmosDbMongoDbApiSource",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Azure Cosmos DB's API for MongoDB as sink

The following properties are supported in the Copy Activity sink section:

Property | Description | Required
type | The type property of the Copy Activity sink must be set to CosmosDbMongoDbApiSink. | Yes
writeBehavior | Describes how to write data to Azure Cosmos DB. Allowed values: insert and upsert. With upsert, a document is replaced if a document with the same _id already exists; otherwise, the document is inserted. Note: Data Factory automatically generates an _id for a document if an _id isn't specified either in the original document or by column mapping. This means that, for upsert to work as expected, you must ensure that your document has an ID. | No (the default is insert)
writeBatchSize | Controls the size of each batch of documents to write. You can try increasing the value of writeBatchSize to improve performance, and decreasing it if your documents are large. | No (the default is 10,000)
writeBatchTimeout | The wait time for the batch insert operation to finish before it times out. The allowed value is a timespan. | No (the default is 00:30:00, that is, 30 minutes)

Tip

To import JSON documents as-is, refer to the Import and export JSON documents section; to copy from tabular-shaped data, refer to Schema mapping.

Example

"activities":[
    {
        "name": "CopyToCosmosDBMongoDBAPI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "CosmosDbMongoDbApiSink",
                "writeBehavior": "upsert"
            }
        }
    }
]
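
The optional sink properties can be set alongside writeBehavior. The fragment below is a sketch of a sink section that lowers writeBatchSize for large documents and states the timeout explicitly. The value 5000 is an illustrative assumption, not a recommended setting, and 00:30:00 simply restates the documented default.

```json
{
    "sink": {
        "type": "CosmosDbMongoDbApiSink",
        "writeBehavior": "upsert",
        "writeBatchSize": 5000,
        "writeBatchTimeout": "00:30:00"
    }
}
```

With upsert, each incoming document needs a stable _id from the source or from column mapping. Otherwise an _id is generated automatically and existing documents are not replaced.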

Import and export JSON documents

You can use this Azure Cosmos DB connector to easily:

  • Copy documents between two Azure Cosmos DB collections as-is.
  • Import JSON documents from various sources to Azure Cosmos DB, including from MongoDB, Azure Blob storage, and other file-based stores that Azure Data Factory supports.
  • Export JSON documents from an Azure Cosmos DB collection to various file-based stores.

To achieve schema-agnostic copy:

  • When you use the Copy Data tool, select the Export as-is to JSON files or Cosmos DB collection option.
  • When you use activity authoring, choose JSON format with the corresponding file store for source or sink.
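
For the first scenario, copying documents between two collections as-is, a copy activity might look like the sketch below. It reuses only the source and sink types described earlier and assumes that no schema mapping is defined, so documents pass through unchanged. The activity name and the dataset reference names are placeholders.

```json
{
    "name": "CopyBetweenCosmosDBMongoDBAPICollections",
    "type": "Copy",
    "inputs": [
        {
            "referenceName": "<source collection dataset name>",
            "type": "DatasetReference"
        }
    ],
    "outputs": [
        {
            "referenceName": "<destination collection dataset name>",
            "type": "DatasetReference"
        }
    ],
    "typeProperties": {
        "source": {
            "type": "CosmosDbMongoDbApiSource"
        },
        "sink": {
            "type": "CosmosDbMongoDbApiSink",
            "writeBehavior": "insert"
        }
    }
}
```

Both datasets would be of type CosmosDbMongoDbApiCollection, each pointing at its own collection.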

Schema mapping

To copy data from Azure Cosmos DB's API for MongoDB to a tabular sink, or the reverse, refer to schema mapping.

Specifically, when writing to Cosmos DB, you need to make sure that Cosmos DB is populated with the right object ID from your source data. For example, if you have an "id" column in a SQL database table and want to use its value as the document ID in MongoDB for insert/upsert, set the schema mapping according to the MongoDB strict mode definition (_id.$oid), as follows:

(Screenshot: mapping the ID in the MongoDB sink)
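
Expressed in activity JSON, the same mapping could be sketched roughly as follows. This assumes the copy activity's translator property with a TabularTranslator, and the sink path expression is an assumption about how the nested _id.$oid field is addressed, so verify it against the schema mapping article before relying on it. The source column name id comes from the example above.

```json
{
    "translator": {
        "type": "TabularTranslator",
        "mappings": [
            {
                "source": { "name": "id" },
                "sink": { "path": "$['_id']['$oid']" }
            }
        ]
    }
}
```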

After the copy activity runs, the following BSON ObjectId is generated in the sink:

{
    "_id": ObjectId("592e07800000000000000000")
}

Next steps

For a list of data stores that Copy Activity supports as sources and sinks in Azure Data Factory, see supported data stores.