使用 Azure 数据工厂或 Synapse Analytics 来复制和转换 Azure SQL 托管实例中的数据

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用版

本文概述如何“复制活动”功能从 Azure SQL 托管实例复制数据或将数据复制到其中,并使用“数据流”转换 Azure SQL 托管实例中的数据。 有关详细信息,请阅读 Azure 数据工厂Synapse Analytics 的简介文章。

支持的功能

此 Azure SQL 托管实例连接器支持以下功能:

支持的功能 IR 托管专用终结点
复制活动(源/接收器) ① ② ✓ 公共预览版
映射数据流源(源/接收器) ✓ 公共预览版
Lookup 活动 ① ② ✓ 公共预览版
GetMetadata 活动 ① ② ✓ 公共预览版
脚本活动 ① ② ✓ 公共预览版
存储过程活动 ① ② ✓ 公共预览版

① Azure 集成运行时 ② 自承载集成运行时

对于复制活动,此 Azure SQL 数据库连接器支持以下功能:

  • 使用 SQL 身份验证和 Microsoft Entra 应用程序令牌身份验证通过服务主体或 Azure 资源托管标识来复制数据。
  • 作为源,使用 SQL 查询或存储过程检索数据。 还可选择从 SQL MI 源进行并行复制。有关详细信息,请参阅从 SQL MI 进行并行复制部分。
  • 作为接收器,根据源架构自动创建目标表(如果不存在);在复制过程中,使用自定义逻辑将数据追加到表或调用存储过程。

先决条件

若要访问 SQL 托管实例公用终结点,可以使用托管的 Azure 集成运行时。 确保启用公共终结点,并在网络安全组中允许公共终结点流量,使该服务能够连接到你的数据库。 有关详细信息,请参阅此指南

若要访问 SQL 托管实例专用终结点,请设置一个能够访问数据库的自承载集成运行时。 如果预配托管实例所在的虚拟网络中的自承载集成运行时,请确保集成运行时计算机与托管实例位于不同的子网中。 如果预配的自承载集成运行时与托管实例位于不同的虚拟网络中,则可使用虚拟网络对等互连或虚拟网络间的连接。 有关详细信息,请参阅将应用程序连接到 SQL 托管实例

入门

若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:

使用 UI 创建到 Azure SQL 托管实例的链接服务

使用以下步骤在 Azure 门户 UI 中创建到 SQL 托管实例的链接服务。

  1. 浏览到 Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:

  2. 搜索“SQL”并选择 Azure SQL Server 托管实例连接器。

    Screenshot of the Azure SQL Server Managed Instance connector.

  3. 配置服务详细信息、测试连接并创建新的链接服务。

    Screenshot of linked service configuration for a SQL Managed instance.

连接器配置详细信息

以下部分详述了用来定义特定于 SQL 托管实例连接器的 Azure 数据工厂实体的属性。

链接服务属性

SQL 托管实例链接服务支持这些通用属性:

属性 描述 必需
type type 属性必须设置为 AzureSqlMI
connectionString 此属性指定通过 SQL 身份验证连接到 SQL 托管实例时所需的 connectionString 信息。 有关详细信息,请参阅以下示例。
默认端口为 1433。 如果将 SQL 托管实例与公共终结点配合使用,请显式指定端口 3342。
还可以在 Azure Key Vault 中输入密码。 如果使用 SQL 身份验证,请从连接字符串中提取 password 配置。 有关详细信息,请参阅表格后面的 JSON 示例,以及在 Azure Key Vault 中存储凭据
azureCloudType 对于服务主体身份验证,请指定 Microsoft Entra 应用程序注册到的 Azure 云环境的类型。
允许的值为“AzureChina”。 默认情况下,使用服务的云环境。
alwaysEncryptedSettings 指定所需的 alwaysencryptedsettings 信息来启用 Always Encrypted,以使用托管标识或服务主体保护 SQL Server 中存储的敏感数据。 有关详细信息,请参阅表格后面的 JSON 示例以及使用 Always Encrypted 部分。 如果不指定此属性,将禁用默认的 Always Encrypted 设置。
connectVia 集成运行时用于连接到数据存储。 如果托管实例具有公共终结点并允许服务访问该服务公共终结点,则可以使用自承载集成运行时或 Azure 集成运行时。 如果未指定,则使用默认 Azure Integration Runtime。

对于不同的身份验证类型,请分别参考以下有关特定属性、先决条件和 JSON 示例的部分:

SQL 身份验证

若要使用 SQL 身份验证类型,请指定上一部分所述的通用属性。

示例 1:使用 SQL 身份验证

{
    "name": "AzureSqlMILinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

示例 2:将 SQL 身份验证与 Azure Key Vault 中的密码结合使用

{
    "name": "AzureSqlMILinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;",
            "password": { 
                "type": "AzureKeyVaultSecret", 
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>", 
                    "type": "LinkedServiceReference" 
                }, 
                "secretName": "<secretName>" 
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

示例 3:将 SQL 身份验证与 Always Encrypted 配合使用

{
    "name": "AzureSqlMILinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;Integrated Security=False;User ID=<username>;Password=<password>;"
        },
        "alwaysEncryptedSettings": {
            "alwaysEncryptedAkvAuthType": "ServicePrincipal",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<service principal key>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

服务主体身份验证

要使用服务主体身份验证,除了上一节中描述的通用属性外,还要指定以下属性

属性 描述 必选
servicePrincipalId 指定应用程序的客户端 ID。
servicePrincipalKey 指定应用程序的密钥。 将此字段标记为 SecureString 以安全地存储它,或引用存储在 Azure 密钥保管库中的机密
tenant 指定应用程序所在的租户的信息(例如域名或租户 ID)。 将鼠标悬停在 Azure 门户右上角进行检索。

还需要执行以下步骤:

  1. 按照步骤为托管实例预配 Microsoft Entra 管理员

  2. 在 Azure 门户中创建 Microsoft Entra 应用程序。 记下应用程序名称,以及以下定义链接服务的值:

    • 应用程序 ID
    • 应用程序密钥
    • 租户 ID
  3. 为服务主体创建登录名。 在 SQL Server Management Studio (SSMS) 中,使用作为 sysadmin 的 SQL Server 帐户连接到托管实例。 在 master 数据库中,运行以下 T-SQL:

    CREATE LOGIN [your application name] FROM EXTERNAL PROVIDER
    
  4. 为服务主体创建包含的数据库用户。 连接到要从其复制数据或将数据复制到其中的数据库,运行以下 T-SQL:

    CREATE USER [your application name] FROM EXTERNAL PROVIDER
    
  5. 像通常对 SQL 用户和其他用户所做的那样向服务主体授予所需的权限。 运行以下代码。 有关更多选项,请参阅此文档

    ALTER ROLE [role name e.g. db_owner] ADD MEMBER [your application name]
    
  6. 配置 SQL 托管实例链接服务。

示例:使用服务主体身份验证

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;",
            "servicePrincipalId": "<service principal id>",
            "servicePrincipalKey": {
                "type": "SecureString",
                "value": "<service principal key>"
            },
            "tenant": "<tenant info, e.g. microsoft.partner.onmschina.cn>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

系统分配的托管标识身份验证

数据工厂或 Synapse 工作区可以与系统分配的 Azure 资源的托管标识相关联,该托管标识代表了向其他 Azure 服务进行身份验证的服务。 可将此托管标识用于 SQL 托管实例身份验证。 指定的服务可以使用此标识访问数据库数据或从/向数据库复制数据。

若要使用系统分配的托管标识身份验证,请指定上一部分所述的通用属性,然后按照以下步骤操作。

  1. 按照步骤为托管实例预配 Microsoft Entra 管理员

  2. 为系统分配的托管标识创建登录名。 在 SQL Server Management Studio (SSMS) 中,使用作为 sysadmin 的 SQL Server 帐户连接到托管实例。 在 master 数据库中,运行以下 T-SQL:

    CREATE LOGIN [your_factory_or_workspace_ name] FROM EXTERNAL PROVIDER
    
  3. 为系统分配的托管标识创建包含的数据库用户。 连接到要从其复制数据或将数据复制到其中的数据库,运行以下 T-SQL:

    CREATE USER [your_factory_or_workspace_name] FROM EXTERNAL PROVIDER
    
  4. 为系统分配的托管标识授予所需的权限,就像通常为 SQL 用户和其他用户所做的那样。 运行以下代码。 有关更多选项,请参阅此文档

    ALTER ROLE [role name e.g. db_owner] ADD MEMBER [your_factory_or_workspace_name]
    
  5. 配置 SQL 托管实例链接服务。

示例:使用系统分配的托管标识身份验证

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

用户分配的托管标识身份验证

数据工厂或 Synapse 工作区可以与用户分配的托管标识相关联,该托管标识代表了向其他 Azure 服务进行身份验证的服务。 可将此托管标识用于 SQL 托管实例身份验证。 指定的服务可以使用此标识访问数据库数据或从/向数据库复制数据。

要使用用户分配的托管身份身份验证,除了上一节中描述的通用属性外,还要指定以下属性:

属性 描述 必需
凭据 将用户分配的托管标识指定为凭据对象。

还需要执行以下步骤:

  1. 按照步骤为托管实例预配 Microsoft Entra 管理员

  2. 为用户分配的托管标识创建登录名。 在 SQL Server Management Studio (SSMS) 中,使用作为 sysadmin 的 SQL Server 帐户连接到托管实例。 在 master 数据库中,运行以下 T-SQL:

    CREATE LOGIN [your_factory_or_workspace_ name] FROM EXTERNAL PROVIDER
    
  3. 为用户分配的托管标识创建包含的数据库用户。 连接到要从其复制数据或将数据复制到其中的数据库,运行以下 T-SQL:

    CREATE USER [your_factory_or_workspace_name] FROM EXTERNAL PROVIDER
    
  4. 创建一个或多个由用户分配的托管标识,并向用户分配的托管标识授予对 SQL 用户和其他用户正常操作所需的权限。 运行以下代码。 有关更多选项,请参阅此文档

    ALTER ROLE [role name e.g. db_owner] ADD MEMBER [your_factory_or_workspace_name]
    
  5. 为数据工厂分配一个或多个用户分配的托管标识,并为每个用户分配的托管标识创建凭据

  6. 配置 SQL 托管实例链接服务。

示例:使用用户分配的托管标识身份验证

{
    "name": "AzureSqlDbLinkedService",
    "properties": {
        "type": "AzureSqlMI",
        "typeProperties": {
            "connectionString": "Data Source=<hostname,port>;Initial Catalog=<databasename>;",
            "credential": {
                "referenceName": "credential1",
                "type": "CredentialReference"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

数据集属性

如需可用于定义数据集的节和属性的完整列表,请参阅有关数据集的文章。 此部分提供 SQL 托管实例数据集支持的属性列表。

若要从/向 SQL 托管实例复制数据,以下属性需受支持:

属性 描述 必需
type 数据集的 type 属性必须设置为 AzureSqlMITable。
schema 架构的名称。 对于源为“No”,对于接收器为“Yes”
表/视图的名称。 对于源为“No”,对于接收器为“Yes”
tableName 具有架构的表/视图的名称。 此属性支持后向兼容性。 对于新的工作负荷,请使用 schematable 对于源为“No”,对于接收器为“Yes”

示例

{
    "name": "AzureSqlMIDataset",
    "properties":
    {
        "type": "AzureSqlMITable",
        "linkedServiceName": {
            "referenceName": "<SQL Managed Instance linked service name>",
            "type": "LinkedServiceReference"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "typeProperties": {
            "schema": "<schema_name>",
            "table": "<table_name>"
        }
    }
}

复制活动属性

有关可用于定义活动的节和属性的完整列表,请参阅管道一文。 此部分提供 SQL 托管实例源和和接收器支持的属性列表。

以 SQL 托管实例作为源

提示

若要使用数据分区从 SQL MI 高效地加载数据,请参阅从 SQL MI 进行并行复制

若要从 SQL 托管实例复制数据,复制活动的 source 节需要支持以下属性:

属性 描述 必需
type 复制活动源的 type 属性必须设置为 SqlMISource
sqlReaderQuery 此属性使用自定义 SQL 查询来读取数据。 例如 select * from MyTable
sqlReaderStoredProcedureName 此属性是从源表读取数据的存储过程的名称。 最后一个 SQL 语句必须是存储过程中的 SELECT 语句。
storedProcedureParameters 这些参数用于存储过程。
允许的值为名称或值对。 参数的名称和大小写必须与存储过程参数的名称和大小写匹配。
isolationLevel 指定 SQL 源的事务锁定行为。 允许的值为:ReadCommitted、ReadUncommitted、RepeatableRead、Serializable、Snapshot 。 如果未指定,则使用数据库的默认隔离级别。 请参阅此文档了解更多详细信息。
partitionOptions 指定用于从 SQL MI 加载数据的数据分区选项。
允许值包括:None(默认值)、PhysicalPartitionsOfTable 和 DynamicRange 。
启用分区选项(即该选项不为 None)时,用于从 SQL MI 并行加载数据的并行度由复制活动上的 parallelCopies 设置控制。
partitionSettings 指定数据分区的设置组。
当分区选项不是 None 时适用。
partitionSettings
partitionColumnName 以整数类型、日期类型或日期/时间类型(intsmallintbigintdatesmalldatetimedatetimedatetime2datetimeoffset)指定源列的名称,范围分区将使用它进行并行复制。 如果未指定,系统会自动检测表的索引或主键并将其用作分区列。
当分区选项是 DynamicRange 时适用。 如果使用查询来检索源数据,请在 WHERE 子句中挂接 ?AdfDynamicRangePartitionCondition 。 有关示例,请参阅从 SQL 数据库进行并行复制部分。
partitionUpperBound 分区范围拆分的分区列的最大值。 此值用于决定分区步幅,不用于筛选表中的行。 将对表或查询结果中的所有行进行分区和复制。 如果未指定,复制活动会自动检测该值。
当分区选项是 DynamicRange 时适用。 有关示例,请参阅从 SQL 数据库进行并行复制部分。
partitionLowerBound 分区范围拆分的分区列的最小值。 此值用于决定分区步幅,不用于筛选表中的行。 将对表或查询结果中的所有行进行分区和复制。 如果未指定,复制活动会自动检测该值。
当分区选项是 DynamicRange 时适用。 有关示例,请参阅从 SQL 数据库进行并行复制部分。

请注意以下几点:

  • 如果为 SqlMISource 指定 sqlReaderQuery,则复制活动会针对 SQL 托管实例源运行此查询以获取数据。 也可通过指定 sqlReaderStoredProcedureName 和 storedProcedureParameters 来指定存储过程,前提是存储过程使用参数 。
  • 在源中使用存储过程检索数据时,请注意,如果存储过程设计为当传入不同的参数值时返回不同的架构,则从 UI 导入架构或使用自动表创建将数据复制到 SQL 数据库时,可能会遇到故障或出现意外的结果。

示例:使用 SQL 查询

"activities":[
    {
        "name": "CopyFromAzureSqlMI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Managed Instance input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlMISource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

示例:使用存储过程

"activities":[
    {
        "name": "CopyFromAzureSqlMI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<SQL Managed Instance input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SqlMISource",
                "sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
                "storedProcedureParameters": {
                    "stringData": { "value": "str3" },
                    "identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

存储过程定义

CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
    @stringData varchar(20),
    @identifier int
)
AS
SET NOCOUNT ON;
BEGIN
    select *
    from dbo.UnitTestSrcTable
    where dbo.UnitTestSrcTable.stringData != stringData
    and dbo.UnitTestSrcTable.identifier != identifier
END
GO

以 SQL 托管实例作为接收器

提示

若要详细了解支持的写入行为、配置和最佳做法,请参阅将数据加载到 SQL 托管实例中的最佳做法

若要将数据复制到 SQL 托管实例,复制活动的 sink 节需要支持以下属性:

属性 描述 必需
type 复制活动接收器的 type 属性必须设置为 SqlMISink
preCopyScript 此属性指定将数据写入到 SQL 托管实例之前要由复制活动运行的 SQL 查询。 每次运行复制仅调用该查询一次。 可以使用此属性清除预加载的数据。
tableOption 指定是否根据源架构自动创建接收器表(如果不存在)。 接收器指定存储过程时不支持自动创建表。 允许的值为:none(默认值)、autoCreate
sqlWriterStoredProcedureName 定义如何将源数据应用于目标表的存储过程的名称。
此存储过程由每个批处理调用。 若要执行仅运行一次且与源数据无关的操作(例如删除或截断),请使用 preCopyScript 属性。
请参阅调用 SQL 接收器的存储过程中的示例。
storedProcedureTableTypeParameterName 存储过程中指定的表类型的参数名称。
sqlWriterTableType 要在存储过程中使用的表类型名称。 通过复制活动,使移动数据在具备此表类型的临时表中可用。 然后,存储过程代码可合并复制数据和现有数据。
storedProcedureParameters 存储过程的参数。
允许的值为名称和值对。 参数的名称和大小写必须与存储过程参数的名称和大小写匹配。
writeBatchSize 每批要插入到 SQL 表中的行数。
允许的值为表示行数的整数。 默认情况下,该服务根据行大小动态确定适当的批大小。
writeBatchTimeout 此属性指定超时前等待批插入操作完成的时间。
允许的值是指时间跨度。 例如,“00:30:00”表示 30 分钟。
 maxConcurrentConnections 活动运行期间与数据存储建立的并发连接的上限。 仅在要限制并发连接时指定一个值。  无
WriteBehavior 指定复制活动的写入行为,以将数据加载到 Azure SQL MI。
允许的值为 insert 和 upsert。 默认情况下,服务使用 insert 来加载数据。
upsertSettings 指定写入行为的设置组。
当 WriteBehavior 选项为 Upsert 时应用。
upsertSettings
useTempDB 指定是将全局临时表还是物理表用作 upsert 的临时表。
默认情况下,该服务使用全局临时表作为临时表。 值为 true
interimSchemaName 如果使用了物理表,则指定用于创建临时表的临时架构。 注意:用户需要具有创建和删除表的权限。 默认情况下,临时表将与接收器表共享相同的架构。
当 useTempDB 选项为 False 时应用。
密钥 指定唯一行标识的列名称。 可使用单个键,也可使用一系列键。 如果未指定,将使用主键。

示例 1:追加数据

"activities":[
    {
        "name": "CopyToAzureSqlMI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Managed Instance output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlMISink",
                "tableOption": "autoCreate",
                "writeBatchSize": 100000
            }
        }
    }
]

示例 2:在复制过程中调用存储过程

请参阅调用 SQL MI 接收器的存储过程,了解更多详细信息。

"activities":[
    {
        "name": "CopyToAzureSqlMI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Managed Instance output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlMISink",
                "sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
                "storedProcedureTableTypeParameterName": "MyTable",
                "sqlWriterTableType": "MyTableType",
                "storedProcedureParameters": {
                    "identifier": { "value": "1", "type": "Int" },
                    "stringData": { "value": "str1" }
                }
            }
        }
    }
]

示例 3:更新插入数据

"activities":[
    {
        "name": "CopyToAzureSqlMI",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<SQL Managed Instance output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SqlMISink",
                "tableOption": "autoCreate",
                "writeBehavior": "upsert",
                "upsertSettings": {
                    "useTempDB": true,
                    "keys": [
                        "<column name>"
                    ]
                },            
            }
        }
    }
]

从 SQL MI 进行并行复制

复制活动中的 Azure SQL 托管实例连接器提供内置的数据分区,用于并行复制数据。 可以在复制活动的“源”表中找到数据分区选项。

Screenshot of partition options

启用分区复制时,复制活动将对 SQL MI 源运行并行查询,以按分区加载数据。 可通过复制活动中的 parallelCopies 设置控制并行度。 例如,如果将 parallelCopies 设置为 4,则该服务会根据指定的分区选项和设置并行生成并运行 4 个查询,每个查询从 SQL MI 检索一部分数据。

建议同时启用并行复制和数据分区,尤其是从 SQL MI 加载大量数据时。 下面是适用于不同方案的建议配置。 将数据复制到基于文件的数据存储中时,建议将数据作为多个文件写入文件夹(仅指定文件夹名称),在这种情况下,性能优于写入单个文件。

方案 建议的设置
从包含物理分区的大型表进行完整加载。 分区选项:表的物理分区。

在执行期间,该服务将自动检测物理分区并按分区复制数据。

若要检查表是否有物理分区,可参考此查询
从不包含物理分区但包含用于数据分区的整数或日期时间列的大型表进行完整加载。 分区选项:动态范围分区。
分区列(可选):指定用于对数据进行分区的列。 如果未指定,将使用索引或主键列。
分区上限和分区下限(可选) :指定是否要确定分区步幅。 这不适用于筛选表中的行,表中的所有行都将进行分区和复制。 如果未指定,复制活动会自动检测这些值。

例如,如果分区列“ID”的值范围为 1 至 100,并且将此值的下限设置为 20、上限设置为 80,并行复制设置为 4,服务将按 4 个分区(分区的 ID 范围分别为 <=20、[21, 50]、[51, 80] 和 >=81)检索数据。
使用自定义查询从不包含物理分区但包含用于数据分区的整数或日期/日期时间列的表加载大量数据。 分区选项:动态范围分区。
查询SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
分区列:指定用于对数据进行分区的列。
分区上限和分区下限(可选) :指定是否要确定分区步幅。 这不适用于筛选表中的行,查询结果中的所有行都将进行分区和复制。 如果未指定,复制活动会自动检测该值。

在执行期间,该服务会将 ?AdfRangePartitionColumnName 替换为每个分区的实际列名称和值范围,并发送到 SQL MI。
例如,如果分区列“ID”的值范围为 1 至 100,并且将此值的下限设置为 20、上限设置为 80,并行复制设置为 4,服务将按 4 个分区(分区的 ID 范围分别为 <=20、[21, 50]、[51, 80] 和 >=81)检索数据。

下面是针对不同场景的更多示例查询:
1.查询整个表:
SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition
2.使用列选择和附加的 where 子句筛选器从表中查询:
SELECT <column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
3.使用子查询进行查询:
SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>
4.在子查询中使用分区查询:
SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition) AS T

使用分区选项加载数据的最佳做法:

  1. 选择独特的列作为分区列(如主键或唯一键),以避免数据倾斜。
  2. 如果表具有内置分区,请使用名为“表的物理分区”分区选项来提升性能。
  3. 如果使用 Azure Integration Runtime 复制数据,则可设置较大的“数据集成单元 (DIU)”(>4) 以利用更多计算资源。 检查此处适用的方案。
  4. 复制并行度”可控制分区数量,将此数字设置得太大有时会损害性能,建议将此数字设置按以下公式计算的值:(DIU 或自承载 IR 节点数)*(2 到 4)。

示例:从包含物理分区的大型表进行完整加载

"source": {
    "type": "SqlMISource",
    "partitionOption": "PhysicalPartitionsOfTable"
}

示例:使用动态范围分区进行查询

"source": {
    "type": "SqlMISource",
    "query": "SELECT * FROM <TableName> WHERE ?AdfDynamicRangePartitionCondition AND <your_additional_where_clause>",
    "partitionOption": "DynamicRange",
    "partitionSettings": {
        "partitionColumnName": "<partition_column_name>",
        "partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
        "partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
    }
}

检查物理分区的示例查询

SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id 
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id 
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id 
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id 
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id 
WHERE s.name='[your schema]' AND t.name = '[your table name]'

如果表具有物理分区,你可看到“HasPartition”为“是”,如下所示。

Sql query result

将数据加载到 SQL 托管实例中的最佳做法

将数据复制到 SQL 托管实例中时,可能需要不同的写入行为:

有关配置步骤和最佳做法,请参阅相应的部分。

追加数据

追加数据是此 SQL 托管实例接收器连接器的默认行为。 该服务执行批量插入,以有效地在表中写入数据。 可以相应地在复制活动中配置源和接收器。

更新插入数据

复制活动现在支持以本机方式将数据加载到数据库临时表中,并在键存在时更新接收器表中的数据,不存在时插入新数据。 若要详细了解复制活动中的更新插入设置,请参阅 以 SQL 托管实例作为接收器

覆盖整个表

可以在复制活动接收器中配置 preCopyScript 属性。 在这种情况下,对于运行的每个复制活动,该服务首先运行脚本。 然后,运行复制来插入数据。 例如,若要使用最新数据覆盖整个表,请指定一个脚本,以先删除所有记录,然后从源批量加载新数据。

使用自定义逻辑写入数据

使用自定义逻辑写入数据的步骤与更新插入数据部分中的描述类似。 如果在将源数据最终插入目标表之前需要应用额外的处理,则可先将数据加载到临时表,然后再调用存储过程活动,或者在复制活动接收器中调用存储过程来应用数据。

调用 SQL 接收器的存储过程

将数据复制到 SQL 托管实例中时,还可以通过对每批的源表使用更多参数来配置和调用用户指定的存储过程。 存储过程功能利用表值参数

当内置复制机制无法使用时,还可使用存储过程。 例如,在将源数据最终插入目标表之前应用额外的处理。 额外处理的示例包括合并列、查找其他值以及将数据插入多个表。

以下示例演示如何使用存储过程,在 SQL Server 数据库中的表内执行 upsert。 假设输入数据和接收器 Marketing 表各有三列:ProfileIDStateCategory。 基于 ProfileID 列执行更新插入,并仅将其应用于名为“ProductA”的特定类别。

  1. 在数据库中,使用与 sqlWriterTableType 相同的名称定义表类型。 表类型的架构与输入数据返回的架构相同。

    CREATE TYPE [dbo].[MarketingType] AS TABLE(
        [ProfileID] [varchar](256) NOT NULL,
        [State] [varchar](256) NOT NULL,
        [Category] [varchar](256) NOT NULL
    )
    
  2. 在数据库中,使用与 sqlWriterStoredProcedureName 相同的名称定义存储过程。 它可处理来自指定源的输入数据,并将其合并到输出表中。 存储过程中的表类型的参数名称与数据集中定义的 tableName 相同。

    CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256)
    AS
    BEGIN
    MERGE [dbo].[Marketing] AS target
    USING @Marketing AS source
    ON (target.ProfileID = source.ProfileID and target.Category = @category)
    WHEN MATCHED THEN
        UPDATE SET State = source.State
    WHEN NOT MATCHED THEN
        INSERT (ProfileID, State, Category)
        VALUES (source.ProfileID, source.State, source.Category);
    END
    
  3. 在管道中,在复制活动中定义 SQL MI 接收器部分,如下所示:

    "sink": {
        "type": "SqlMISink",
        "sqlWriterStoredProcedureName": "spOverwriteMarketing",
        "storedProcedureTableTypeParameterName": "Marketing",
        "sqlWriterTableType": "MarketingType",
        "storedProcedureParameters": {
            "category": {
                "value": "ProductA"
            }
        }
    }
    

映射数据流属性

在映射数据流中转换数据时,可以在 Azure SQL 托管实例中读取表以及将数据写入表。 有关详细信息,请参阅映射数据流中的源转换接收器转换

源转换

下表列出了 Azure SQL 托管实例源支持的属性。 你可以在“源选项”选项卡中编辑这些属性。

名称 说明 必需 允许的值 数据流脚本属性
如果你选择“表”作为输入,则数据流会从数据集中指定的表提取所有数据。 - -
查询 如果你选择“查询”作为输入,请指定一个用来从源提取数据的 SQL 查询,这将替代在数据集中指定的任何表。 使用查询是一个好方法,它可以减少用于测试或查找的行数。

不支持 Order By 子句,但你可以设置完整的 SELECT FROM 语句。 还可以使用用户定义的表函数。 select * from udfGetData() 是 SQL 中的一个 UDF,它返回你可以在数据流中使用的表。
查询示例:Select * from MyTable where customerId > 1000 and customerId < 2000
字符串 query
批大小 指定批大小,以将大型数据分成多个读取操作。 Integer batchSize
隔离级别 选择下列隔离级别之一:
- 读取已提交的内容
- 读取未提交的内容(默认)
- 可重复的读取
- 可序列化
- 无(忽略隔离级别)
READ_COMMITTED
READ_UNCOMMITTED
REPEATABLE_READ
SERIALIZABLE
NONE
isolationLevel
启用增量提取 使用此选项告知 ADF 仅处理自上次执行管道以来已更改的行。 - -
增量列 使用增量提取功能时,必须选择要用作源表中水印的日期/时间或数字列。 - -
启用本机变更数据捕获(预览) 使用此选项告知 ADF 仅处理 SQL 变更数据捕获技术捕获的增量数据(自上次执行管道以来)。 使用此选项时,包括行插入、更新和删除在内的增量数据将自动加载,而无需任何增量列。 在 ADF 中使用此选项前,需要在 Azure SQL MI 上启用变更数据捕获。 有关 ADF 中此选项的详细信息,请参阅本机变更数据捕获 - -
从头开始读取 使用增量提取设置此选项将指示 ADF 在首次执行具有增量提取的管道时读取所有行。 - -

提示

在映射数据流查询模式下不支持 SQL 中的公用表表达式 (CTE),因为使用此模式的先决条件是在 SQL 查询 FROM 子句中可以使用查询,但 CTE 无法这样做。 要使用 CTE,你需要使用以下查询创建存储过程:

CREATE PROC CTESP @query nvarchar(max)
AS
BEGIN
EXECUTE sp_executesql @query;
END

然后在映射数据流的源转换中使用存储过程模式,然后设置类似 @query 的示例 with CTE as (select 'test' as a) select * from CTE。 然后你便可以按照预期使用 CTE。

Azure SQL 托管实例源脚本示例

使用 Azure SQL 托管实例作为源类型时,关联的数据流脚本为:

source(allowSchemaDrift: true,
    validateSchema: false,
    isolationLevel: 'READ_UNCOMMITTED',
    query: 'select * from MYTABLE',
    format: 'query') ~> SQLMISource

接收器转换

下表列出了 Azure SQL 托管实例接收器支持的属性。 可以在“接收器选项”选项卡中编辑这些属性。

名称 说明 必需 允许的值 数据流脚本属性
Update 方法 指定数据库目标上允许哪些操作。 默认设置为仅允许插入。
若要更新、更新插入或删除行,需要进行“更改行”转换才能标记这些操作的行。
truefalse deletable
insertable
updateable
upsertable
键列 对于更新、更新插入和删除操作,必须设置键列来确定要更改的行。
后续的更新、更新插入和删除将使用你选取为密钥的列名称。 因此,你必须选取存在于接收器映射中的列。
Array 密钥
跳过写入键列 如果你不希望将值写入到键列,请选择“跳过写入键列”。 truefalse skipKeyWrites
表操作 确定在写入之前是否从目标表重新创建或删除所有行。
- :不会对表进行任何操作。
- 重新创建:将删除表并重新创建表。 如果以动态方式创建表,则是必需的。
- 截断:将删除目标表中的所有行。
truefalse recreate
truncate
批大小 指定每批中写入的行数。 较大的批大小可提高压缩比并改进内存优化,但在缓存数据时可能会导致内存不足异常。 Integer batchSize
预处理和后处理 SQL 脚本 指定在将数据写入接收器数据库之前(预处理)和之后(后处理)会执行的多行 SQL 脚本。 字符串 preSQLs
postSQLs

提示

  1. 建议将包含多个命令的单个批处理脚本拆分为多个批处理。
  2. 只有返回简单更新计数的数据定义语言 (Data Definition Language, DDL) 和数据操作语言 (Data Manipulation Language, DML) 语句可作为批处理的一部分运行。 在执行批量操作中了解详情

Azure SQL 托管实例接收器脚本示例

使用 Azure SQL 托管实例作为接收器类型时,关联的数据流脚本为:

IncomingStream sink(allowSchemaDrift: true,
    validateSchema: false,
    deletable:false,
    insertable:true,
    updateable:true,
    upsertable:true,
    keys:['keyColumn'],
    format: 'table',
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> SQLMISink

Lookup 活动属性

若要了解有关属性的详细信息,请查看 Lookup 活动

GetMetadata 活动属性

若要了解有关属性的详细信息,请查看 GetMetadata 活动

SQL 托管实例的数据类型映射

使用复制活动向/从 SQL 托管实例复制数据时,会使用以下从 SQL 托管实例数据类型到服务内部使用的临时数据类型映射。 若要了解复制活动如何从源架构和数据类型映射到接收器,请参阅架构和数据类型映射

SQL 托管实例数据类型 临时服务数据类型
bigint Int64
binary Byte[]
bit Boolean
char String, Char[]
date DateTime
datetime DateTime
datetime2 DateTime
Datetimeoffset DateTimeOffset
Decimal Decimal
FILESTREAM attribute (varbinary(max)) Byte[]
Float Double
image Byte[]
int Int32
money Decimal
nchar String, Char[]
ntext String, Char[]
numeric Decimal
nvarchar String, Char[]
real Single
rowversion Byte[]
smalldatetime DateTime
smallint Int16
smallmoney 小数
sql_variant Object
text String, Char[]
time TimeSpan
timestamp Byte[]
tinyint Int16
uniqueidentifier Guid
varbinary Byte[]
varchar String, Char[]
xml String

注意

对于映射到十进制临时类型的数据类型,目前复制活动支持的最大精度为 28。 如果数据需要的精度大于 28,请考虑在 SQL 查询中将其转换为字符串。

使用 Always Encrypted

使用 Always Encrypted 从/向 SQL 托管实例复制数据时,请执行以下步骤:

  1. 列主密钥 (CMK) 存储在 Azure 密钥保管库中。 详细了解如何使用 Azure 密钥保管库配置 Always Encrypted

  2. 确保授予对存储了列主密钥 (CMK) 的密钥保管库的访问权限。 有关所需的权限,请参阅此文

  3. 创建链接服务,以使用托管标识或服务主体连接到 SQL 数据库并启用“Always Encrypted”功能。

注意

SQL 托管实例 Always Encrypted 支持以下场景:

  1. 源或接收器数据存储使用托管标识或服务主体作为密钥提供程序身份验证类型。
  2. 源和接收器数据存储都使用托管标识作为密钥提供程序身份验证类型。
  3. 源和接收器数据存储都使用同一个服务主体作为密钥提供程序身份验证类型。

注意

目前,SQL 托管实例 Always Encrypted 仅支持映射数据流中的源转换。

本机变更数据捕获

Azure 数据工厂可针对 SQL Server、Azure SQL DB 和 Azure SQL MI 支持本机变更数据捕获功能。 可以通过 ADF 映射数据流自动检测和提取 SQL 存储中的行插入、更新和删除等变更数据。 由于映射数据流没有代码体验,用户可以通过将数据库追加为目标存储来轻松实现 SQL 存储中的数据复制方案。 更重要的是,用户还可以在两者之间撰写任何数据转换逻辑,以便从 SQL 存储实现增量 ETL 方案。

请确保管道和活动名称保持不变,以便 ADF 可以记录检查点,从而自动从上次运行中获取更改的数据。 如果更改管道名称或活动名称,检查点将重置,进而导致从头开始或是在下一次运行中开始获取现在开始的更改数据。 如果要更改管道名称或活动名称,但仍保留检查点,以自动从上次运行中获取变更的数据,请使用自己的数据流活动中的检查点密钥来实现此目的。

调试管道时,此功能都以相同的方式工作。 请注意,在调试运行期间刷新浏览器时,检查点将重置。 若对调试运行中的管道结果感到满意,可继续发布并触发管道。 首次触发已发布管道时,管道将自动从头开始重启,或者立即开始获取更改数据。

在监视部分,你始终有机会重新运行管道。 执行此操作时,始终可从所选管道运行的上一个检查点捕获已更改的数据。

示例 1:

将引用到已启用 SQL CDC 的数据集的源转换与映射数据流中引用到数据库的接收器转换直接链接起来时,SQL 源上发生的变更将自动应用到目标数据库,这样很轻松就可以获得数据库之间的数据复制方案。 可以使用接收器转换中的更新方法选择是否在目标数据库上允许插入、允许更新或允许删除。 映射数据流中的示例脚本如下所示。

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:true,
	keys:['id'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

示例 2:

如果想启用 ETL 方案,而不是通过 SQL CDC 在数据库之间复制数据,则可以使用映射数据流中的表达式,包括 isInsert(1)、isUpdate(1) 和 isDelete(1) 来区分具有不同操作类型的行。 下面是映射数据流的示例脚本之一,它派生一个列以及相关值:1 表示插入的行,2 表示更新的行,3 表示删除的行,用于下游转换以处理增量数据。

source(output(
		id as integer,
		name as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	enableNativeCdc: true,
	netChanges: true,
	skipInitialLoad: false,
	isolationLevel: 'READ_UNCOMMITTED',
	format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
	validateSchema: false,
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> sink1

已知限制:

有关复制活动支持作为源和接收器的数据存储的列表,请参阅受支持的数据存储