使用 Azure 数据工厂在 Azure Cosmos DB for NoSQL 中复制和转换数据

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用版

这篇文章概述了如何使用 Azure 数据工厂中的“复制活动”功能来从/向 Azure Cosmos DB for NoSQL 复制数据,并使用数据流转换 Azure Cosmos DB for NoSQL 中的数据。 有关详细信息,请阅读 Azure 数据工厂Azure Synapse Analytics 的简介文章。

注意

此连接器仅支持 Azure Cosmos DB for NoSQL。 有关 Azure Cosmos DB for MongoDB,请参阅适用于 Azure Cosmos DB for MongoDB 的连接器。 目前不支持其他 API 类型。

支持的功能

此 Azure Cosmos DB for NoSQL 连接器支持以下功能:

支持的功能 IR 托管专用终结点
复制活动(源/接收器) ① ②
映射数据流源(源/接收器)
Lookup 活动 ① ②

① Azure 集成运行时 ② 自承载集成运行时

对于复制活动,此 Azure Cosmos DB for NoSQL 连接器支持:

  • 使用用于 Azure 资源身份验证的密钥、服务主体或托管标识从/向 Azure Cosmos DB for NoSQL 复制数据。
  • insertupsert 的形式写入 Azure Cosmos DB。
  • 按原样导入和导出 JSON 文档,或在表格数据集中复制或粘贴数据。 示例包括 SQL 数据库和 CSV 文件。 若要在 JSON 文件或另一个 Azure Cosmos DB 集合中按原样复制或粘贴文档,请参阅导入和导出 JSON 文档

数据工厂和 Synapse 管道与 Azure Cosmos DB 批量执行工具库集成,以便在写入 Azure Cosmos DB 时提供最佳性能。

入门

若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:

使用 UI 创建到 Azure Cosmos DB 的链接服务

使用以下步骤在 Azure 门户 UI 中创建一个到 Azure Cosmos DB 的链接服务。

  1. 浏览到 Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:

  2. 搜索 Azure Cosmos DB for NoSQL,然后选择 Azure Cosmos DB for NoSQL 连接器。

    选择 Azure Cosmos DB for NoSQL 连接器。

  3. 配置服务详细信息、测试连接并创建新的链接服务。

    Azure Cosmos DB 的链接服务配置的屏幕截图。

连接器配置详细信息

以下部分提供有关属性的详细信息,这些属性可用于定义特定于 Azure Cosmos DB for NoSQL 的实体。

链接服务属性

Azure Cosmos DB for NoSQL 连接器支持以下身份验证类型。 请参阅相应部分的了解详细信息:

密钥身份验证

属性 描述 必需
type type 属性必须设置为 CosmosDb
connectionString 指定连接 Azure Cosmos DB 数据库所需的信息。
注意:必须如以下示例所示,在连接字符串中指定数据库信息。
还可以将帐户密钥放在 Azure 密钥保管库中,并从连接字符串中拉取 accountKey 配置。 有关更多详细信息,请参阅以下示例和在 Azure 密钥保管库中存储凭据一文。
connectVia 用于连接到数据存储的 Integration Runtime。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定此属性,则使用默认的 Azure Integration Runtime。

示例

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "connectionString": "AccountEndpoint=<EndpointUrl>;AccountKey=<AccessKey>;Database=<Database>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

示例:在 Azure 密钥保管库中存储帐户密钥

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "connectionString": "AccountEndpoint=<EndpointUrl>;Database=<Database>",
            "accountKey": { 
                "type": "AzureKeyVaultSecret", 
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>", 
                    "type": "LinkedServiceReference" 
                }, 
                "secretName": "<secretName>" 
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

服务主体身份验证

注意

目前,不支持在数据流中进行服务主体身份验证。

若要使用服务主体身份验证,请按照以下步骤操作。

  1. 将应用程序注册到 Microsoft 标识平台。 快速入门:通过 Microsoft 标识平台注册应用程序。 记下以下值,这些值用于定义链接服务:

    • 应用程序 ID
    • 应用程序密钥
    • 租户 ID
  2. 向服务主体授予适当权限。 有关 Azure Cosmos DB 中权限的工作原理的示例,请参阅针对文件和目录的访问控制列表。 更具体地说,请先创建角色定义,然后通过服务主体对象 ID 将角色分配给服务主体。

链接服务支持以下属性:

属性 描述 必需
type type 属性必须设置为 CosmosDb
accountEndpoint 指定 Azure Cosmos DB 实例的帐户终结点 URL。
database 指定数据库的名称。
servicePrincipalId 指定应用程序的客户端 ID。
servicePrincipalCredentialType 要用于服务主体身份验证的凭据类型。 允许的值为“ServicePrincipalKey”和“ServicePrincipalCert” 。
servicePrincipalCredential 服务主体凭据。
使用 ServicePrincipalKey 作为凭据类型时,请指定应用程序的密钥。 将此字段标记为 SecureString 以安全地存储它,或引用存储在 Azure Key Vault 中的机密
当使用 ServicePrincipalCert 作为凭据时,请引用 Azure Key Vault 中的证书并确保证书内容类型为 PKCS #12。
tenant 指定应用程序的租户信息(域名或租户 ID)。 将鼠标悬停在 Azure 门户右上角进行检索。
azureCloudType 对于服务主体身份验证,请指定 Microsoft Entra 应用程序注册到的 Azure 云环境的类型。
允许的值为“AzureChina”。 默认情况下,使用服务的云环境。
connectVia 用于连接到数据存储的集成运行时。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定,则使用默认 Azure Integration Runtime。

示例:使用服务主体密钥身份验证

也可以将服务主体密钥存储在 Azure Key Vault 中。

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "accountEndpoint": "<account endpoint>",
            "database": "<database name>",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalCredentialType": "ServicePrincipalKey",
            "servicePrincipalCredential": {
                "type": "SecureString",
                "value": "<service principal key>"
            },
            "tenant": "<tenant info, e.g. microsoft.partner.onmschina.cn>" 
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

示例:使用服务主体证书身份验证

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "accountEndpoint": "<account endpoint>",
            "database": "<database name>", 
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalCredentialType": "ServicePrincipalCert",
            "servicePrincipalCredential": { 
                "type": "AzureKeyVaultSecret", 
                "store": { 
                    "referenceName": "<AKV reference>", 
                    "type": "LinkedServiceReference" 
                }, 
                "secretName": "<certificate name in AKV>" 
            },
            "tenant": "<tenant info, e.g. microsoft.partner.onmschina.cn>" 
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

系统分配的托管标识身份验证

注意

目前,数据流中不支持系统分配的托管标识身份验证。

可将数据工厂或 Synapse 管道与代表此特定服务实例的 Azure 资源系统分配的托管标识相关联。 可以像使用你自己的服务主体一样,直接使用此托管标识进行 Azure Cosmos DB 身份验证。 此指定资源可通过此方法访问以及向/从 Azure Cosmos DB 实例复制数据。

若要使用系统分配的 Azure 资源托管标识身份验证,请执行以下步骤。

  1. 通过复制与服务一起生成的托管标识对象 ID 的值,检索系统分配的托管标识信息

  2. 向系统分配的托管标识授予适当的权限。 有关 Azure Cosmos DB 中权限的工作原理的示例,请参阅针对文件和目录的访问控制列表。 更具体地说,创建角色定义,并将角色分配给系统分配的托管标识。

链接服务支持以下属性:

属性 描述 必需
type type 属性必须设置为 CosmosDb
accountEndpoint 指定 Azure Cosmos DB 实例的帐户终结点 URL。
database 指定数据库的名称。
connectVia 用于连接到数据存储的集成运行时。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定,则使用默认 Azure Integration Runtime。

示例:

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "accountEndpoint": "<account endpoint>",
            "database": "<database name>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

用户分配的托管标识身份验证

注意

目前,数据流中不支持用户分配的托管标识身份验证。

可将数据工厂或 Synapse 管道与代表此特定服务实例的用户分配的托管标识相关联。 可以像使用你自己的服务主体一样,直接使用此托管标识进行 Azure Cosmos DB 身份验证。 此指定资源可通过此方法访问以及向/从 Azure Cosmos DB 实例复制数据。

若要使用用户分配的 Azure 资源托管标识身份验证,请执行以下步骤。

  1. 创建一个或多个用户分配的托管标识,并向用户分配的托管标识授予适当的权限。 有关 Azure Cosmos DB 中权限的工作原理的示例,请参阅针对文件和目录的访问控制列表。 更具体地说,创建角色定义,并将角色分配给用户分配的托管标识。

  2. 为数据工厂分配一个或多个用户分配的托管标识,并为每个用户分配的托管标识创建凭据

链接服务支持以下属性:

属性 描述 必需
type type 属性必须设置为 CosmosDb
accountEndpoint 指定 Azure Cosmos DB 实例的帐户终结点 URL。
database 指定数据库的名称。
凭据 将用户分配的托管标识指定为凭据对象。
connectVia 用于连接到数据存储的集成运行时。 可使用 Azure Integration Runtime 或自承载集成运行时(如果数据存储位于专用网络)。 如果未指定,则使用默认 Azure Integration Runtime。

示例:

{
    "name": "CosmosDbSQLAPILinkedService",
    "properties": {
        "type": "CosmosDb",
        "typeProperties": {
            "accountEndpoint": "<account endpoint>",
            "database": "<database name>",
            "credential": {
                "referenceName": "credential1",
                "type": "CredentialReference"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

数据集属性

有关可用于定义数据集的各部分和属性的完整列表,请参阅数据集和链接服务

Azure Cosmos DB for NoSQL 数据集支持以下属性:

Property 描述 必需
type 数据集的 type 属性必须设置为 CosmosDbSqlApiCollection
collectionName Azure Cosmos DB 文档集合的名称。

如果你使用“DocumentDbCollection”类型的数据集,则复制和查找活动仍按原样支持它以提供后向兼容性,但数据流不支持它。 建议你今后使用新模型。

示例

{
    "name": "CosmosDbSQLAPIDataset",
    "properties": {
        "type": "CosmosDbSqlApiCollection",
        "linkedServiceName":{
            "referenceName": "<Azure Cosmos DB linked service name>",
            "type": "LinkedServiceReference"
        },
        "schema": [],
        "typeProperties": {
            "collectionName": "<collection name>"
        }
    }
}

复制活动属性

本部分列出 Azure Cosmos DB for NoSQL 源和接收器支持的属性。 有关可用于定义活动的各个部分和属性的完整列表,请参阅管道

Azure Cosmos DB for NoSQL 作为源

若要从 Azure Cosmos DB for NoSQL 复制数据,请将“复制活动”中的“源”类型设置为 DocumentDbCollectionSource。

复制活动 source 节支持以下属性:

属性 描述 必需
type 复制活动源的 type 属性必须设置为 CosmosDbSqlApiSource
query 指定要读取数据的 Azure Cosmos DB 查询。

示例:
SELECT c.BusinessEntityID, c.Name.First AS FirstName, c.Name.Middle AS MiddleName, c.Name.Last AS LastName, c.Suffix, c.EmailPromotion FROM c WHERE c.ModifiedDate > \"2009-01-01T00:00:00\"


如果未指定,则执行此 SQL 语句:select <columns defined in structure> from mycollection
preferredRegions 从 Azure Cosmos DB 检索数据时要连接到的区域的首选列表。
pageSize 查询结果的每页文档数。 默认值为“-1”,表示使用服务端动态页大小,最大为 1000。
detectDatetime 是否从文档中的字符串值中检测日期时间。 允许的值是:true(默认)、false。

如果使用“DocumentDbCollectionSource”类型的源,则仍按原样提供支持以实现后向兼容性。 建议今后使用新模型,新模型提供了更丰富的功能来从 Azure Cosmos DB 复制数据。

示例

"activities":[
    {
        "name": "CopyFromCosmosDBSQLAPI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Cosmos DB for NoSQL input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "CosmosDbSqlApiSource",
                "query": "SELECT c.BusinessEntityID, c.Name.First AS FirstName, c.Name.Middle AS MiddleName, c.Name.Last AS LastName, c.Suffix, c.EmailPromotion FROM c WHERE c.ModifiedDate > \"2009-01-01T00:00:00\"",
                "preferredRegions": [
                    "China East 2"
                ]
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

从 Azure Cosmos DB 复制数据时,除非想要按原样导出 JSON 文档,否则,最佳做法是在复制活动中指定映射。 此服务遵循你在活动上指定的映射 - 如果某个行的某个列中未包含值,则会为列值提供 null 值。 如果未指定映射,则服务将使用数据中的第一行来推断架构。 如果第一行不包含完整架构,则活动操作的结果中将丢失部分列。

Azure Cosmos DB for NoSQL 作为接收器

若要将数据复制到 Azure Cosmos DB for NoSQL,请将“复制活动”中的“接收器”类型设置为 DocumentDbCollectionSink。

复制活动 sink 节支持以下属性:

属性 描述 必需
type 复制活动接收器的 type 属性必须设置为 CosmosDbSqlApiSink
writeBehavior 描述如何将数据写入 Azure Cosmos DB。 允许的值为 insertupsert

upsert 的行为是,如果已存在具有相同 ID 的文档,则替换该文档;否则将插入该文档。

注意:如果未在原始文档中指定 ID,或未通过列映射指定 ID,则服务会自动为文档生成 ID。 这表示必须先确保文档有 ID,才能让 upsert 按预期工作。

(默认值为 insert
writeBatchSize 服务使用 Azure Cosmos DB 批量执行根据库将数据写入 Azure Cosmos DB。 writeBatchSize 属性控制服务提供给库的文档的大小。 可尝试增加 writeBatchSize 的值以提高性能,并在文档大小较大时降低该值 - 请参阅下面的提示
(默认值为 10,000
disableMetricsCollection 服务收集 Azure Cosmos DB RU 等指标,用于复制性能优化和建议。 如果你担心此行为,请指定 true 将其关闭。 否(默认值为 false
 maxConcurrentConnections 活动运行期间与数据存储建立的并发连接的上限。 仅在要限制并发连接时指定一个值。  无

提示

若要按原样导入 JSON 文档,请参阅导入或导出 JSON 文档部分;若要从表格形数据复制,请参阅从关系数据库迁移到 Azure Cosmos DB

提示

Azure Cosmos DB 将单个请求的大小限制为 2MB。 公式为请求大小 = 单个文档大小 * 写入批大小。 若出现“请求太大。”错误,请减少复制接收器配置中的 writeBatchSize

如果使用“DocumentDbCollectionSink”类型的源,则仍按原样提供支持以实现后向兼容性。 建议今后使用新模型,新模型提供了更丰富的功能来从 Azure Cosmos DB 复制数据。

示例

"activities":[
    {
        "name": "CopyToCosmosDBSQLAPI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "CosmosDbSqlApiSink",
                "writeBehavior": "upsert"
            }
        }
    }
]

架构映射

要将数据从 Azure Cosmos DB 复制到表格接收器或进行反向复制,请参阅架构映射

映射数据流属性

在映射数据流中转换数据时,可以在 Azure Cosmos DB 的集合中读取和写入数据。 有关详细信息,请参阅映射数据流中的源转换接收器转换

注意

映射数据流中不支持 Azure Cosmos DB 无服务器。

源转换

特定于 Azure Cosmos DB 的设置可在源转换的“源选项”选项卡中找到。

包括系统列:如果为 true,则会将 id_ts 和其他系统列包括在 Azure Cosmos DB 的数据流元数据中。 更新集合时,必须包括此项,以便获取现有行 ID。

页面大小:查询结果的每页文档数。 默认值为“-1”,表示使用服务动态页,最大为 1000。

吞吐量:为读取操作期间每次执行此数据流时要应用于 Azure Cosmos DB 集合的 RU 数设置一个可选值。 最小值为 400。

首选区域:选择此进程的首选读取区域。

更改源:如果为 true,则会按照上次运行中发生更改的顺序自动从 Azure Cosmos DB 更改源获取数据。更改源对容器的更改的持久性记录。 如果将其设置为 true,则不要同时将“推断偏移的列类型”和“允许架构偏移”设置为 true 。 有关更多详细信息,请参阅 Azure Cosmos DB 更改源

从头开始:如果为 true,则会获取首次运行中完整快照数据的初始加载,然后捕获后续运行中已更改的数据。 如果为 false,则在第一次运行时将跳过初始加载,然后在下一次运行时捕获更改的数据。 设置与 Azure Cosmos DB 引用中的相同设置名称对齐。 有关更多详细信息,请参阅 Azure Cosmos DB 更改源

接收器转换

特定于 Azure Cosmos DB 的设置可在接收器转换的“设置”选项卡中找到。

更新方法: 确定数据库目标上允许哪些操作。 默认设置为仅允许插入。 若要更新、更新插入或删除行,需要进行 alter-row 转换才能标记这些操作的行。 对于更新、更新插入和删除操作,必须设置一个或多个键列,以确定要更改的行。

集合操作:确定在写入之前是否重新创建目标集合。

  • 无:不会对集合执行任何操作。
  • 重新创建:将删除集合并重新创建集合

批大小:一个整数,表示每个批中有多少对象被写入 Azure Cosmos DB 集合。 通常,从默认批大小开始就足够了。 若要进一步调整此值,请注意:

  • Azure Cosmos DB 将单个请求的大小限制为 2MB。 公式为“请求大小 = 单个文档大小 * 写入批大小”。 若出现“请求太大”错误,请减少批大小值。
  • 批大小越大,服务可以实现的吞吐量就越好,同时请确保分配足够的 RU 来授权工作负载。

分区键: 输入表示集合的分区键的字符串。 示例: /movies/title

吞吐量:为每次执行此数据流时要应用于 Azure Cosmos DB 集合的 RU 数设置一个可选值。 最小值为 400。

写入吞吐量预算: 一个整数,表示要分配给该集合的总吞吐量中要为此数据流写入操作分配的 RU。

注意

若要限制 RU 使用情况,请将 Cosmos DB 吞吐量(自动缩放) 设置为“手动”。

查找活动属性

若要了解有关属性的详细信息,请查看 Lookup 活动

导入和导出 JSON 文档

使用此 Azure Cosmos DB for NoSQL 连接器,可以轻松完成以下操作:

  • 在两个 Azure Cosmos DB 集合之间按原样复制文档。
  • 从各种源(包括 Azure Blob 存储和服务支持的其他基于文件的存储)将 JSON 文档导入 Azure Cosmos DB。
  • 将 JSON 文档从 Azure Cosmos DB 集合导出到各种基于文件的存储。

若要实现“架构不可知”复制,请执行以下操作:

  • 使用“复制数据”工具时,选择“按原样导出到 JSON 文件或 Azure Cosmos DB 集合”选项。
  • 使用活动创作时,请为源或接收器选择 JSON 格式以及相应的文件存储。

从关系数据库迁移到 Azure Cosmos DB

从关系数据库(例如 SQL Server)迁移到 Azure Cosmos DB 时,复制活动可以轻松地从源映射表格数据,以在 Azure Cosmos DB 中平展 JSON 文档。 某些情况下,你可能希望根据 Azure Cosmos DB 中的数据建模重新设计数据模型,以便针对 NoSQL 用例对其进行优化,例如,通过将所有相关子项嵌入到一个 JSON 文档中来使数据非规范化。 对于这种情况,请参阅此文,其中包含如何使用复制活动实现此目的的演练。

Azure Cosmos DB 更改源

Azure 数据工厂可在映射数据流源转换中启用 Azure Cosmos DB 更改源,从而从中获取数据。 通过此连接器选项,可以在将转换后的数据加载到所选目标数据集之前,读取更改源并应用转换。 不需要使用 Azure Functions 读取更改源,然后编写自定义转换。 可以使用此选项将数据从一个容器移动到另一个容器,根据目的准备更改源驱动的材料视图,或根据更改源自动进行容器备份或恢复,并使用 Azure 数据工厂的可视拖放功能来启用更多此类用例。

请确保管道和活动名称保持不变,以便 ADF 可以记录检查点,从而自动从上次运行中获取更改的数据。 如果更改管道名称或活动名称,检查点将重置,进而导致从头开始或是在下一次运行中开始获取现在开始的更改数据。

调试管道时,此功能都以相同的方式工作。 请注意,在调试运行期间刷新浏览器时,检查点将重置。 若对调试运行中的管道结果感到满意,可继续发布并触发管道。 首次触发已发布管道时,管道将自动从头开始重启,或者立即开始获取更改数据。

在监视部分,你始终有机会重新运行管道。 执行此操作时,始终可从所选管道运行的上一个检查点捕获已更改的数据。

此外,Azure Cosmos DB 分析存储现在支持 Azure Cosmos DB API for NoSQL 和 Azure Cosmos DB API for Mongo DB 的变更数据捕获 (CDC)(公共预览版)。 通过 Azure Cosmos DB 分析存储,你可以高效地使用分析存储中变更(插入、更新和删除)数据的连续和增量馈送。

有关复制活动支持作为源和接收器的数据存储的列表,请参阅支持的数据存储