Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
适用于:
Azure 数据工厂
Azure Synapse Analytics
本文概述了如何在Azure 数据工厂或Azure Synapse管道中使用复制活动从/向Azure SQL 数据库复制数据,并使用数据流转换Azure SQL 数据库中的数据。 若要了解详细信息,请阅读 Azure 数据工厂 或 Azure Synapse Analytics的介绍性文章。
支持的功能
以下功能支持此Azure SQL 数据库连接器:
| 支持的功能 | IR | 托管私有终结点 |
|---|---|---|
| 复制活动 (源/接收器) | (1) (2) | ✓ |
| 映射数据流(源/汇) | ① | ✓ |
| 查询活动 | (1) (2) | ✓ |
| GetMetadata 活动 | (1) (2) | ✓ |
| 脚本活动 | (1) (2) | ✓ |
| 存储过程活动 | (1) (2) | ✓ |
(1) Azure集成运行时 (2) 自承载集成运行时
对于复制活动,此Azure SQL 数据库连接器支持以下功能:
- 使用 SQL 身份验证复制数据,并借助 Azure 资源的服务主体或托管标识进行 Microsoft Entra 应用程序令牌身份验证。
- 作为源,使用 SQL 查询或存储过程检索数据。 还可以选择从 Azure SQL 数据库源进行并行复制,有关详细信息,请参阅 SQL 数据库的 并行复制部分。
- 作为接收器,根据源架构自动创建目标表(如果不存在);在复制过程中,将数据追加到表或使用自定义逻辑调用存储过程。
如果使用 Azure SQL 数据库 无服务器层,请注意,暂停服务器时,活动运行会失败,而不是等待自动恢复准备就绪。 您可以添加活动重试功能或链接其他活动,以确保服务器在实际执行时保持活跃状态。
重要
如果使用 Azure 集成运行时复制数据,请配置 server 级防火墙规则以便Azure服务可以访问服务器。 如果使用自承载集成运行时复制数据,请将防火墙配置为允许合适的 IP 范围。 此范围包括用于连接到Azure SQL 数据库的计算机的 IP。
开始
若要使用管道执行复制活动,可以使用以下工具或 SDK 之一:
使用 UI 创建Azure SQL 数据库链接服务
使用以下步骤在Azure门户 UI 中创建Azure SQL 数据库链接服务。
浏览到Azure 数据工厂或 Synapse 工作区中的“管理”选项卡并选择“链接服务”,然后单击“新建”:
搜索 SQL 并选择 Azure SQL 数据库 连接器。
配置服务详细信息、测试连接并创建新的链接服务。
连接器配置详细信息
以下部分提供有关用于定义特定于Azure SQL 数据库连接器的Azure 数据工厂或 Synapse 管道实体的属性的详细信息。
连接的服务属性
Azure SQL 数据库连接器 Recommended 版本支持 TLS 1.3。 请参阅此 section,从 Legacy 1 升级Azure SQL 数据库连接器版本。 有关属性详细信息,请参阅相应部分。
提示
如果遇到错误代码为“UserErrorFailedToConnectToSqlServer”的错误,以及“数据库的会话限制为 XXX 且已达到”的消息,请向连接字符串添加 Pooling=false,然后重试。
Pooling=false还建议用于 自托管集成运行时类型的链接服务配置。 在创建链接服务时,可以在“其他连接属性”部分中将连接池和其他连接参数作为新的参数名称和值添加。
建议的版本
应用 Recommended 版本时,Azure SQL 数据库链接服务支持这些泛型属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 类型 | type 属性必须设置为 AzureSqlDatabase 。 | 是 |
| 服务器 | 要连接到的 SQL Server 实例的名称或网络地址。 | 是 |
| 数据库 | 数据库的名称。 | 是 |
| 认证类型 | 用于身份验证的类型。 允许的值为 SQL(默认值)、ServicePrincipal、SystemAssignedManagedIdentity、UserAssignedManagedIdentity。 转到有关特定属性和先决条件的相关身份验证部分。 | 是 |
| 始终加密设置 | 指定所需的 alwaysencryptedsettings 信息来启用 Always Encrypted,以使用托管标识或服务主体保护 SQL Server 中存储的敏感数据。 有关详细信息,请参阅表格后面的 JSON 示例以及使用 Always Encrypted 部分。 如果不指定此属性,将禁用默认的 Always Encrypted 设置。 | 否 |
| 加密 | 指示客户端和服务器之间发送的所有数据是否需要 TLS 加密。 选项:必需(对于 true,默认值)/可选(对于 false)/严格。 | 否 |
| trustServerCertificate | 指示是否在绕过验证信任的证书链时加密通道。 | 否 |
| 证书中的主机名称 | 验证连接的服务器证书时要使用的主机名。 如果未指定,则服务器名称用于证书验证。 | 否 |
| connectVia | 此集成运行时用于连接到数据存储。 如果数据存储位于专用网络中,则可以使用Azure集成运行时或自承载集成运行时。 如果未指定,则使用默认Azure集成运行时。 | 否 |
有关其他连接属性,请查看下表:
| 属性 | 描述 | 必需 |
|---|---|---|
| 应用意图 | 连接到服务器时的应用程序工作负载类型。 允许的值为 ReadOnly 和 ReadWrite。 |
否 |
| connectTimeout | 在终止尝试并生成错误之前等待与服务器建立连接的时间(以秒为单位)。 | 否 |
| 连接重试次数 (connectRetryCount) | 识别空闲连接失败后尝试的重新连接次数。 该值应为介于 0 到 255 之间的整数。 | 否 |
| connectRetryInterval | 识别空闲连接失败后,每次重新连接尝试之间的时间(以秒为单位)。 该值应为介于 1 到 60 之间的整数。 | 否 |
| loadBalanceTimeout | 在连接被断开之前,连接在连接池中存在的最短时间(以秒为单位)。 | 否 |
| commandTimeout | 在终止尝试执行命令并生成错误之前的默认等待时间(以秒为单位)。 | 否 |
| integratedSecurity | 允许的值为 true 或 false。 指定 false 时,指示是否在连接中指定了 userName 和密码。 指定 true 时,指示当前Windows帐户凭据是否用于身份验证。 |
否 |
| failoverPartner | 主服务器关闭时要连接到的伙伴服务器的名称或地址。 | 否 |
| 最大池大小 | 特定连接的连接池中允许的最大连接数。 | 否 |
| minPoolSize | 特定连接的连接池中允许的最小连接数。 | 否 |
| multipleActiveResultSets | 允许的值为 true 或 false。 指定 true 时,应用程序可维护多重活动结果集 (MARS)。 指定 false 时,应用程序必须先处理或取消从某一批处理生成的所有结果集,然后才能对该连接执行任何其他批处理。 |
否 |
| multiSubnetFailover | 允许的值为 true 或 false。 如果你的应用程序要连接到不同子网上的 AlwaysOn 可用性组 (AG),那么将此属性设置为 true 会加快检测和连接到当前活动服务器。 |
否 |
| 数据包大小 | 用于与服务器实例通信的网络数据包的大小(字节数)。 | 否 |
| 池化 | 允许的值为 true 或 false。 指定 true 时,连接将共用。 指定 false 时,每次请求连接时都会显式打开连接。 |
否 |
SQL 身份验证
若要使用 SQL 身份验证,除了前面部分所述的通用属性,还指定以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| userName | 用于连接到服务器的用户名。 | 是 |
| 密码 | 对应于用户名的密码。 将此字段标记为 SecureString 以安全存储它。 或者,可以引用存储在 Azure 密钥保管库 中的机密。 | 是 |
示例:使用 SQL 身份验证
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"authenticationType": "SQL",
"userName": "<user name>",
"password": {
"type": "SecureString",
"value": "<password>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"authenticationType": "SQL",
"userName": "<user name>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
示例:使用 Always Encrypted
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"authenticationType": "SQL",
"userName": "<user name>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"alwaysEncryptedSettings": {
"alwaysEncryptedAkvAuthType": "ServicePrincipal",
"servicePrincipalId": "<service principal id>",
"servicePrincipalKey": {
"type": "SecureString",
"value": "<service principal key>"
}
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
服务主体身份验证
要使用服务主体身份验证,除了上一部分中描述的通用属性外,还需要指定以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| servicePrincipalId | 指定应用程序的客户端 ID。 | 是 |
| servicePrincipalCredential | 服务主体凭据。 指定应用程序的密钥。 将此字段标记为 SecureString 以安全地存储它,或引用存储在 Azure 密钥保管库 中的机密。 | 是 |
| 租户 | 指定应用程序所在的租户的信息(例如域名或租户 ID)。 通过将鼠标悬停在Azure门户右上角来检索它。 | 是 |
| azureCloudType | 对于服务主体身份验证,请指定注册Microsoft Entra应用程序的Azure云环境的类型。 默认情况下,使用数据工厂或 Synapse 管道的云环境。 |
否 |
还需要执行以下步骤:
从Azure门户创建Microsoft Entra应用程序。 记下应用程序名称,以及以下定义链接服务的值:
- 应用程序 ID
- 应用程序密钥
- 租户 ID
在Azure门户中为服务器预配Microsoft Entra管理员(如果尚未这样做)。 Microsoft Entra管理员必须是Microsoft Entra用户或Microsoft Entra组,但不能是服务主体。 完成此步骤后,在下一步中,可以使用Microsoft Entra标识为服务主体创建包含的数据库用户。
为服务主体创建包含的数据库用户。 使用类似SQL Server Management Studio的工具,通过具有至少“ALTER ANY USER”权限的Microsoft Entra身份,连接到要复制数据的目标或源数据库。 运行以下 T-SQL:
CREATE USER [your application name] FROM EXTERNAL PROVIDER;像通常对 SQL 用户或其他用户所做的那样向服务主体授予所需的权限。 运行以下代码。 有关更多选项,请参阅此文档。
ALTER ROLE [role name] ADD MEMBER [your application name];在 Azure 数据工厂 或 Synapse 工作区中配置Azure SQL 数据库链接服务。
使用服务主体身份验证的链接服务示例
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"hostNameInCertificate": "<host name>",
"authenticationType": "ServicePrincipal",
"servicePrincipalId": "<service principal id>",
"servicePrincipalCredential": {
"type": "SecureString",
"value": "<application key>"
},
"tenant": "<tenant info, e.g. microsoft.partner.onmschina.cn>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
系统分配的托管身份验证
数据工厂或 Synapse 工作区可与 Azure 资源的 系统分配的托管标识相关联,该标识在向Azure中的其他资源进行身份验证时表示服务。 可以使用此托管标识进行Azure SQL 数据库身份验证。 指定的工厂或 Synapse 工作区可以使用此标识访问数据库数据或从/向数据库复制数据。
若要使用系统分配的托管标识身份验证,请指定上一部分所述的通用属性,然后按照以下步骤操作。
在Azure门户中为服务器预配Microsoft Entra管理员(如果尚未这样做)。 Microsoft Entra管理员可以是Microsoft Entra用户或Microsoft Entra组。 如果授予具有托管标识的组管理员角色,则可跳过步骤 3 和 4。 管理员拥有对数据库的完全访问权限。
为托管标识创建包含的数据库用户。 使用类似SQL Server Management Studio的工具,通过具有至少“ALTER ANY USER”权限的Microsoft Entra身份,连接到要复制数据的目标或源数据库。 运行以下 T-SQL:
CREATE USER [your_resource_name] FROM EXTERNAL PROVIDER;授予托管标识所需的权限,就像通常为 SQL 用户和其他用户授予权限时所做的一样。 运行以下代码。 有关更多选项,请参阅此文档。
ALTER ROLE [role name] ADD MEMBER [your_resource_name];配置Azure SQL 数据库链接服务。
示例
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"authenticationType": "SystemAssignedManagedIdentity"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
用户分配的托管标识身份验证
数据工厂或 Synapse 工作区可以与用户分配的托管标识相关联,该标识在向Azure中的其他资源进行身份验证时表示服务。 可以使用此托管标识进行Azure SQL 数据库身份验证。 指定的工厂或 Synapse 工作区可以使用此标识访问数据库数据或从/向数据库复制数据。
要使用用户分配的托管标识身份验证,除了上一部分中描述的通用属性外,还要指定以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 凭据 | 将用户分配的托管标识指定为凭据对象。 | 是 |
还需要执行以下步骤:
在Azure门户中为服务器预配Microsoft Entra管理员(如果尚未这样做)。 Microsoft Entra管理员可以是Microsoft Entra用户或Microsoft Entra组。 如果为具有用户分配的托管标识的组授予管理员角色,请跳过步骤 3。 管理员拥有对数据库的完全访问权限。
为用户分配的托管标识创建独立的数据库用户。 使用类似SQL Server Management Studio的工具,通过具有至少“ALTER ANY USER”权限的Microsoft Entra身份,连接到要复制数据的目标或源数据库。 运行以下 T-SQL:
CREATE USER [your_resource_name] FROM EXTERNAL PROVIDER;创建一个或多个用户分配的托管标识,并为用户分配的托管标识授予所需的权限,就像通常为 SQL 用户和其他用户所做的那样。 运行以下代码。 有关更多选项,请参阅此文档。
ALTER ROLE [role name] ADD MEMBER [your_resource_name];为数据工厂分配一个或多个用户分配的托管标识,并为每个用户分配的托管标识创建凭据。
配置Azure SQL 数据库链接服务。
示例
{
"name": "AzureSqlDbLinkedService",
"properties": {
"type": "AzureSqlDatabase",
"typeProperties": {
"server": "<name or network address of the SQL server instance>",
"database": "<database name>",
"encrypt": "<encrypt>",
"trustServerCertificate": false,
"authenticationType": "UserAssignedManagedIdentity",
"credential": {
"referenceName": "credential1",
"type": "CredentialReference"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
旧版本
应用 Legacy 版本时,Azure SQL 数据库链接服务支持这些泛型属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 类型 | type 属性必须设置为 AzureSqlDatabase 。 | 是 |
| connectionString | 指定用于连接到 Azure SQL 数据库 实例的 connectionString 属性所需的信息。 还可以将密码或服务主体密钥放入Azure 密钥保管库。 如果是 SQL 身份验证,请从连接字符串中拉取 password 配置。 有关详细信息,请参阅 |
是 |
| 始终加密设置 | 指定所需的 alwaysencryptedsettings 信息来启用 Always Encrypted,以使用托管标识或服务主体保护 SQL Server 中存储的敏感数据。 有关详细信息,请参阅使用 Always Encrypted 部分。 如果不指定此属性,将禁用默认的 Always Encrypted 设置。 | 否 |
| connectVia | 此集成运行时用于连接到数据存储。 如果数据存储位于专用网络中,则可以使用Azure集成运行时或自承载集成运行时。 如果未指定,则使用默认Azure集成运行时。 | 否 |
有关各种身份验证类型,请参阅以下部分,分别了解特定属性和先决条件:
旧版的 SQL 身份验证
要使用 SQL 身份验证,请指定前面部分所述的通用属性。
旧版本的服务主体身份验证
要使用服务主体身份验证,除了上一部分中描述的通用属性外,还需要指定以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| servicePrincipalId | 指定应用程序的客户端 ID。 | 是 |
| servicePrincipalKey | 指定应用程序的密钥。 将此字段标记为 SecureString 以安全地存储该字段或引用存储在 Azure 密钥保管库 中的机密。 | 是 |
| 租户 | 指定应用程序所在的租户的信息(例如域名或租户 ID)。 通过将鼠标悬停在Azure门户右上角来检索它。 | 是 |
| azureCloudType | 对于服务主体身份验证,请指定注册Microsoft Entra应用程序的Azure云环境的类型。 默认情况下,使用数据工厂或 Synapse 管道的云环境。 |
否 |
还需要按照服务主体身份验证中的步骤授予相应的权限。
旧版本的系统分配的托管标识身份验证
若要使用系统分配的托管标识身份验证,请按照系统分配的托管标识身份验证中的推荐版本步骤进行操作。
遗留版本的用户指定的托管标识身份验证
若要使用用户分配的托管标识身份验证,请按照用户分配的托管标识身份验证,按照推荐的版本执行相同的步骤。
数据集属性
有关可用于定义数据集的各个节和属性的完整列表,请参阅数据集。
Azure SQL 数据库数据集支持以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 类型 | 数据集的 type 属性必须设置为 AzureSqlTable 。 | 是 |
| 架构 | 架构的名称。 | 对于源为“No”,对于接收器为“Yes” |
| 表 | 表/视图的名称。 | 对于源为“No”,对于接收器为“Yes” |
| tableName | 具有架构的表/视图的名称。 此属性支持后向兼容性。 对于新的工作负荷,请使用 schema 和 table。 |
对于源为“No”,对于接收器为“Yes” |
数据集属性示例
{
"name": "AzureSQLDbDataset",
"properties":
{
"type": "AzureSqlTable",
"linkedServiceName": {
"referenceName": "<Azure SQL Database linked service name>",
"type": "LinkedServiceReference"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"typeProperties": {
"schema": "<schema_name>",
"table": "<table_name>"
}
}
}
复制活动 属性
有关可用于定义活动的各个部分和属性的完整列表,请参阅管道。 本部分提供Azure SQL 数据库源和接收器支持的属性列表。
Azure SQL 数据库 作为源
提示
若要使用数据分区高效地从 Azure SQL 数据库加载数据,请参阅 Parallel copy from SQL database 了解详细信息。
若要从 Azure SQL 数据库 复制数据,复制活动 source 节支持以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 类型 | 复制活动源的 type 属性必须设置为 AzureSqlSource。 为了向后兼容,仍然支持“SqlSource”类型。 | 是 |
| sqlReaderQuery | 此属性使用自定义 SQL 查询来读取数据。 例如 select * from MyTable。 |
否 |
| sqlReaderStoredProcedureName | 从源表读取数据的存储过程的名称。 最后一个 SQL 语句必须是存储过程中的 SELECT 语句。 | 否 |
| 存储过程参数 | 存储过程的参数。 允许的值为名称或值对。 参数的名称和大小写必须与存储过程参数的名称和大小写匹配。 |
否 |
| 隔离级别 | 指定 SQL 源的事务锁定行为。 允许的值为:ReadCommitted、ReadUncommitted、RepeatableRead、Serializable、Snapshot 。 如果未指定,则使用数据库的默认隔离级别。 请参阅此文档了解更多详细信息。 | 否 |
| 分区选项 | 指定用于从Azure SQL 数据库加载数据的数据分区选项。 允许值包括:None(默认值)、PhysicalPartitionsOfTable 和 DynamicRange 。 启用分区选项(即,不是 None的情况时),从Azure SQL 数据库并发加载数据的并行度由复制活动中的parallelCopies设置所控制。 |
否 |
| 分区设置 | 指定数据分区的设置组。 当分区选项不是 None 时适用。 |
否 |
在 partitionSettings 下: |
||
| partitionColumnName | 以整数类型、日期类型或日期/时间类型(、int、smallint、bigint、date、smalldatetime、datetime 或 datetime2)指定源列的名称,范围分区将使用它进行并行复制。 如果未指定,系统会自动检测表的索引或主键并将其用作分区列。当分区选项是 DynamicRange 时适用。 如果使用查询来检索源数据,请在 WHERE 子句中挂接 ?DfDynamicRangePartitionCondition 。 有关示例,请参阅从 SQL 数据库进行并行复制部分。 |
否 |
| partitionUpperBound | 分区范围拆分的分区列的最大值。 此值用于决定分区步幅,不用于筛选表中的行。 将对表或查询结果中的所有行进行分区和复制。 如果未指定,复制活动会自动检测该值。 当分区选项是 DynamicRange 时适用。 有关示例,请参阅从 SQL 数据库进行并行复制部分。 |
否 |
| partitionLowerBound | 分区范围拆分的分区列的最小值。 此值用于决定分区步幅,不用于筛选表中的行。 将对表或查询结果中的所有行进行分区和复制。 如果未指定,复制活动会自动检测该值。 当分区选项是 DynamicRange 时适用。 有关示例,请参阅从 SQL 数据库进行并行复制部分。 |
否 |
请注意以下几点:
- 如果为 sqlReaderQuery 指定了 Azure Sql Source,复制活动将针对 Azure SQL 数据库 源运行此查询以获取数据。 也可通过指定 sqlReaderStoredProcedureName 和 storedProcedureParameters 来指定存储过程,前提是存储过程使用参数 。
- 在源中使用存储过程检索数据时,请注意,如果存储过程旨在当传入不同的参数值时返回不同的架构,则从 UI 导入架构时,或通过自动创建表的功能将数据复制到 SQL 数据库时,可能会遇到故障或出现意外的结果。
SQL 查询示例
"activities":[
{
"name": "CopyFromAzureSQLDatabase",
"type": "Copy",
"inputs": [
{
"referenceName": "<Azure SQL Database input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderQuery": "SELECT * FROM MyTable"
},
"sink": {
"type": "<sink type>"
}
}
}
]
存储过程示例
"activities":[
{
"name": "CopyFromAzureSQLDatabase",
"type": "Copy",
"inputs": [
{
"referenceName": "<Azure SQL Database input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "AzureSqlSource",
"sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
"storedProcedureParameters": {
"stringData": { "value": "str3" },
"identifier": { "value": "$$Text.Format('{0:yyyy}', <datetime parameter>)", "type": "Int"}
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
存储过程定义
CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
@stringData varchar(20),
@identifier int
)
AS
SET NOCOUNT ON;
BEGIN
select *
from dbo.UnitTestSrcTable
where dbo.UnitTestSrcTable.stringData != stringData
and dbo.UnitTestSrcTable.identifier != identifier
END
GO
作为接收器Azure SQL 数据库
提示
从 将数据加载到 Azure SQL 数据库 的最佳实践中,深入了解支持的写入行为、配置和最佳做法。
若要将数据复制到 Azure SQL 数据库,复制活动 sink 节支持以下属性:
| 属性 | 描述 | 必需 |
|---|---|---|
| 类型 | 复制活动接收器的 type 属性必须设置为 AzureSqlSink。 为了向后兼容,仍然支持“SqlSink”类型。 | 是 |
| preCopyScript | 指定在将数据写入Azure SQL 数据库之前要运行的复制活动的 SQL 查询。 在每次复制运行中,仅调用一次。 使用此属性清理预加载的数据。 | 否 |
| 表格选项 | 指定是否根据源架构自动创建接收器表(如果不存在)。 接收器指定存储过程时不支持自动创建表。 允许的值为: none(默认值)、autoCreate。 |
否 |
| `sqlWriterStoredProcedureName`(用于SQL编写器的存储过程名称) | 定义如何将源数据应用于目标表的存储过程的名称。 此存储过程在每个批处理时调用。 若要执行仅运行一次且与源数据无关的操作(例如删除或截断),请使用 preCopyScript 属性。请参阅从 SQL 接收器调用存储过程中的示例。 |
否 |
| storedProcedureTableTypeParameterName | 存储过程中指定的表类型的参数名称。 | 否 |
| sqlWriterTableType | 要在存储过程中使用的表类型名称。 通过复制活动,使移动数据在具备此表类型的临时表中可用。 然后,存储过程代码可合并复制数据和现有数据。 | 否 |
| 存储过程参数 | 存储过程的参数。 允许的值为名称和值对。 参数的名称和大小写必须与存储过程参数的名称和大小写匹配。 |
否 |
| writeBatchSize | 每批次插入到 SQL 表中的行数。 允许的值为 integer(行数)。 默认情况下,该服务根据行大小动态确定适当的批大小。 |
否 |
| writeBatchTimeout | 插入、更新插入和存储过程操作在超时之前完成的等待时间。 允许的值是指时间跨度。 例如,“00:30:00”表示 30 分钟。 如果未指定值,则超时默认为“00:30:00”。 |
否 |
| 禁用指标收集 | 该服务收集指标,例如 Azure SQL 数据库 DTU,用于优化复制性能和提供建议,这会引入额外访问主数据库。 如果你担心此行为,请指定 true 将其关闭。 |
否(默认值为 false) |
| maxConcurrentConnections | 活动运行期间与数据存储建立的并发连接的上限。 仅在要限制并发连接时指定一个值。 | 否 |
| WriteBehavior | 指定复制活动的写入行为以将数据加载到Azure SQL 数据库。 允许的值为 Insert 和 Upsert。 默认情况下,服务使用 insert 来加载数据。 |
否 |
| upsertSettings | 指定用于写入行为的设置组。 当 WriteBehavior 选项为 Upsert 时应用。 |
否 |
在 upsertSettings 下: |
||
| useTempDB | 指定是将全局临时表还是物理表用作 upsert 的临时表。 默认情况下,该服务使用全局临时表作为临时表。 值为 true。 |
否 |
| interimSchemaName | 如果使用了物理表,则指定用于创建临时表的临时架构。 注意:用户需要具有创建和删除表的权限。 默认情况下,临时表将与接收器表共享相同的架构。 当 useTempDB 选项为 False 时应用。 |
否 |
| 密钥 | 请指定用于唯一行标识的列名称。 可使用单个键,也可使用一系列键。 如果未指定,将使用主键。 | 否 |
示例 1:追加数据
"activities":[
{
"name": "CopyToAzureSQLDatabase",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Azure SQL Database output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "AzureSqlSink",
"tableOption": "autoCreate",
"writeBatchSize": 100000
}
}
}
]
示例 2:在复制过程中调用存储过程
请参阅调用 SQL 接收器的存储过程,了解更多详细信息。
"activities":[
{
"name": "CopyToAzureSQLDatabase",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Azure SQL Database output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "AzureSqlSink",
"sqlWriterStoredProcedureName": "CopyTestStoredProcedureWithParameters",
"storedProcedureTableTypeParameterName": "MyTable",
"sqlWriterTableType": "MyTableType",
"storedProcedureParameters": {
"identifier": { "value": "1", "type": "Int" },
"stringData": { "value": "str1" }
}
}
}
}
]
示例 3:增改数据
"activities":[
{
"name": "CopyToAzureSQLDatabase",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Azure SQL Database output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "AzureSqlSink",
"tableOption": "autoCreate",
"writeBehavior": "upsert",
"upsertSettings": {
"useTempDB": true,
"keys": [
"<column name>"
]
},
}
}
}
]
从 SQL 数据库进行并行复制
复制活动中的Azure SQL 数据库连接器提供内置的数据分区来并行复制数据。 可以在复制活动的“源”表中找到数据分区选项。
启用分区复制时,复制活动针对Azure SQL 数据库源运行并行查询,以便按分区加载数据。 可通过复制活动中的 parallelCopies 设置来控制并行度。 例如,如果将 parallelCopies 设置为 4,则服务会根据指定的分区选项和设置同时生成并运行四个查询,并且每个查询从Azure SQL 数据库检索一部分数据。
建议使用数据分区启用并行复制,尤其是在从Azure SQL 数据库加载大量数据时。 下面是适用于不同方案的建议配置。 将数据复制到基于文件的数据存储中时,建议将数据作为多个文件写入文件夹(仅指定文件夹名称),在这种情况下,性能优于写入单个文件。
| 场景 | 建议的设置 |
|---|---|
| 从具有物理分区的大型表中进行完整数据加载。 |
分区选项:表的物理分区。 在执行期间,该服务将自动检测物理分区并按分区复制数据。 若要检查表是否有物理分区,可参考此查询。 |
| 从不包含物理分区的大型表中加载完整数据,其中有一个用于数据分区的整数或日期时间列。 |
分区选项:动态范围分区。 分区列(可选):指定用于对数据进行分区的列。 如果未指定,将使用索引或主键列。 分区上限和分区下限(可选) :指定是否要确定分区步幅。 这不适用于筛选表中的行,表中的所有行都将进行分区和复制。 如果未指定,复制活动会自动检测这些值。 例如,如果分区列“ID”的值范围为 1 至 100,并且将此值的下限设置为 20、上限设置为 80,并行复制设置为 4,服务将按 4 个分区(分区的 ID 范围分别为 <=20、[21, 50]、[51, 80] 和 >=81)检索数据。 |
| 通过使用自定义查询,在没有物理分区的情况下,通过整数或日期/日期时间列进行数据分区,加载大量数据。 |
分区选项:动态范围分区。 查询: SELECT * FROM <TableName> WHERE ?DfDynamicRangePartitionCondition AND <your_additional_where_clause>。分区列:指定用于对数据进行分区的列。 分区上界和分区下界(可选):请指定您是否要确定分区步幅。 这不适用于筛选表中的行,查询结果中的所有行都将进行分区和复制。 如果未指定,复制活动会自动检测该值。 例如,如果分区列“ID”的值范围为 1 至 100,并且将此值的下限设置为 20、上限设置为 80,并行复制设置为 4,服务将按 4 个分区(分区的 ID 范围分别为 <=20、[21, 50]、[51, 80] 和 >=81)检索数据。 下面是针对不同场景的更多示例查询: 1.查询整个表: SELECT * FROM <TableName> WHERE ?DfDynamicRangePartitionCondition2.使用列选择和附加的 where 子句筛选器从表中查询: SELECT <column_list> FROM <TableName> WHERE ?DfDynamicRangePartitionCondition AND <your_additional_where_clause>3.使用子查询进行查询: SELECT <column_list> FROM (<your_sub_query>) AS T WHERE ?DfDynamicRangePartitionCondition AND <your_additional_where_clause>4.在子查询中使用分区查询: SELECT <column_list> FROM (SELECT <your_sub_query_column_list> FROM <TableName> WHERE ?DfDynamicRangePartitionCondition) AS T |
使用分区选项加载数据的最佳做法:
- 选择独特的列作为分区列(如主键或唯一键),以避免数据倾斜。
- 如果表具有内置分区,请使用名为“表的物理分区”分区选项来提升性能。
- 如果使用Azure Integration Runtime复制数据,则可以设置更大的“Data Integration Units (DIU)”(>4)以利用更多的计算资源。 检查其中适用的情景。
- “复制并行度”可控制分区数量,将此数字设置得太大有时会损害性能,建议将此数字设置按以下公式计算的值:(DIU 或自承载 IR 节点数)*(2 到 4)。
示例:从包含物理分区的大型表进行完整加载
"source": {
"type": "AzureSqlSource",
"partitionOption": "PhysicalPartitionsOfTable"
}
示例:使用动态范围分区进行查询
"source": {
"type": "AzureSqlSource",
"query": "SELECT * FROM <TableName> WHERE ?DfDynamicRangePartitionCondition AND <your_additional_where_clause>",
"partitionOption": "DynamicRange",
"partitionSettings": {
"partitionColumnName": "<partition_column_name>",
"partitionUpperBound": "<upper_value_of_partition_column (optional) to decide the partition stride, not as data filter>",
"partitionLowerBound": "<lower_value_of_partition_column (optional) to decide the partition stride, not as data filter>"
}
}
检查物理分区的示例查询
SELECT DISTINCT s.name AS SchemaName, t.name AS TableName, pf.name AS PartitionFunctionName, c.name AS ColumnName, iif(pf.name is null, 'no', 'yes') AS HasPartition
FROM sys.tables AS t
LEFT JOIN sys.objects AS o ON t.object_id = o.object_id
LEFT JOIN sys.schemas AS s ON o.schema_id = s.schema_id
LEFT JOIN sys.indexes AS i ON t.object_id = i.object_id
LEFT JOIN sys.index_columns AS ic ON ic.partition_ordinal > 0 AND ic.index_id = i.index_id AND ic.object_id = t.object_id
LEFT JOIN sys.columns AS c ON c.object_id = ic.object_id AND c.column_id = ic.column_id
LEFT JOIN sys.partition_schemes ps ON i.data_space_id = ps.data_space_id
LEFT JOIN sys.partition_functions pf ON pf.function_id = ps.function_id
WHERE s.name='[your schema]' AND t.name = '[your table name]'
如果表具有物理分区,你可看到“HasPartition”为“是”,如下所示。
将数据加载到Azure SQL 数据库的最佳做法
将数据复制到Azure SQL 数据库时,可能需要不同的写入行为:
- 追加:我的源数据只包含新记录。
- 插入更新:我的源数据既有插入又有更新。
- 覆盖:我需要每次都重新加载整个维度表。
- 使用自定义逻辑进行写入:在将数据最终插入目标表之前,我需要额外的处理。
有关如何在该服务中进行配置和最佳做法,请参阅相应的部分。
追加数据
追加数据是此Azure SQL 数据库接收器连接器的默认行为。 该服务执行批量插入,以有效地在表中写入数据。 可以相应地在复制活动中配置源和接收器。
插入或更新数据
复制活动现在支持本机加载数据到数据库临时表中,如果键存在则更新目标表中的数据,否则插入新数据。 若要了解有关复制活动中 upsert 设置的详细信息,请参阅 Azure SQL 数据库 作为接收器。
覆盖整个表
可以在复制活动接收器中配置 preCopyScript 属性。 在这种情况下,对于运行的每个复制活动,该服务首先运行脚本。 然后,执行复制程序以插入数据。 例如,若要使用最新数据覆盖整个表,请指定一个脚本,以先删除所有记录,然后从源批量加载新数据。
使用自定义逻辑写入数据
使用自定义逻辑写入数据的步骤与更新插入数据部分中的描述类似。 在需要在将源数据最终插入目标表之前应用额外处理时,可以加载到暂存表,然后调用存储过程活动,或在复制活动的目的地调用存储过程来应用数据,或者使用映射数据流。
调用 SQL 结果接收端的存储过程
将数据复制到Azure SQL 数据库时,还可以在源表的每个批次上配置和调用具有附加参数的用户指定的存储过程。 存储过程功能利用表值参数。
当内置复制机制无法使用时,还可使用存储过程。 例如,在将源数据最终插入目标表之前应用额外的处理。 额外处理的示例包括合并列、查找其他值以及将数据插入多个表。
以下示例演示如何使用存储过程,在 Azure SQL 数据库的表中执行插入或更新操作。 假设输入数据和接收器 Marketing 表各有三列:ProfileID、State 和 Category。 在基于 ProfileID 列的基础上执行更新插入,并仅将其应用于名为“ProductA”的特定类别。
在数据库中,使用与 sqlWriterTableType 相同的名称定义表类型。 表类型的架构与输入数据返回的架构相同。
CREATE TYPE [dbo].[MarketingType] AS TABLE( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )在数据库中,使用与 sqlWriterStoredProcedureName 相同的名称定义存储过程。 它可处理来自指定源的输入数据,并将其合并到输出表中。 存储过程中的表类型的参数名称与数据集中定义的 tableName 相同。
CREATE PROCEDURE spOverwriteMarketing @Marketing [dbo].[MarketingType] READONLY, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END在 Azure 数据工厂 或 Synapse 管道中,按如下所示在复制活动中定义 SQL sink 部分:
"sink": { "type": "AzureSqlSink", "sqlWriterStoredProcedureName": "spOverwriteMarketing", "storedProcedureTableTypeParameterName": "Marketing", "sqlWriterTableType": "MarketingType", "storedProcedureParameters": { "category": { "value": "ProductA" } } }
使用存储过程将数据写入 Azure SQL 数据库 时,汇聚器将源数据拆分为小型批处理,然后执行插入操作,因此存储过程中的额外查询可以执行多次。 如果查询需要在将数据写入 Azure SQL 数据库之前运行复制活动,建议不要将其添加到存储过程中,而是将其添加到 预复制脚本 框中。
映射数据流属性
在映射数据流中转换数据时,可以从Azure SQL 数据库读取和写入表。 有关详细信息,请参阅映射数据流中的源转换和汇聚转换。
源转换
特定于Azure SQL 数据库的设置在源转换的 Source Options 选项卡中可用。
输入: 选择将源指向某个表(等效于 Select * from <table-name>),还是输入自定义 SQL 查询。
查询:如果在“输入”字段中选择“查询”,请为源输入 SQL 查询。 此设置会替代在数据集中选择的任何表。 此处不支持 Order By 子句,但你可以设置完整的 SELECT FROM 语句。 还可以使用用户定义的表函数。 select * from udfGetData() 是 SQL 中可返回表的 UDF。 此查询将生成可以在数据流中使用的源表。 使用查询也是一种减少用于测试或查找的行数的好方法。
提示
在映射数据流查询模式下不支持 SQL 中的公用表表达式 (CTE),因为使用此模式的先决条件是在 SQL 查询 FROM 子句中可以使用查询,但 CTE 无法这样做。 要使用 CTE,你需要使用以下查询创建存储过程:
CREATE PROC CTESP @query nvarchar(max)
AS
BEGIN
EXECUTE sp_executesql @query;
END
然后在映射数据流的源转换中使用存储过程模式,并将其设置为示例with CTE as (select 'test' as a) select * from CTE。 然后你便可以按照预期使用 CTE。
存储过程:如果希望从源数据库执行的存储过程生成投影和源数据,请选择此选项。 可以键入架构、过程名称和参数,或者单击“刷新”以要求该服务发现架构和过程名称。 然后,可以单击“导入”以使用格式 @paraName 导入所有过程参数。
- SQL 示例:
Select * from MyTable where customerId > 1000 and customerId < 2000 - 参数化 SQL 示例:
"select * from {$tablename} where orderyear > {$year}"
批大小:输入批大小,以将大型数据分成多个读取操作。
隔离级别: 映射数据流中 SQL 源的默认设置为“读取未提交的内容”。 你可以将此处的隔离级别更改为以下值之一:
- 读取已提交的内容
- 读取未提交的内容
- 可重复的读取
- 可序列化
- 无(忽略隔离级别)
启用增量提取:使用此选项告知 ADF 仅处理自上次执行管道以来已更改的行。 若要启用支持架构漂移的增量提取,请选择基于增量/水印列的表,而不是启用了本机变更数据捕获功能的表。
增量列:在使用增量提取功能时,必须选择用于作为源表水印的日期/时间列或数字列。
启用本地变更数据捕获(预览版):使用此选项告知 ADF 仅处理 SQL 变更数据捕获技术捕获的增量数据(自上次执行管道以来)。 使用此选项时,包括行插入、更新和删除在内的增量数据将自动加载,而无需任何增量列。 在 ADF 中使用此选项之前,需要在 Azure SQL DB 上启用更改数据捕获。 有关 ADF 中此选项的详细信息,请参阅本机变更数据捕获。
从头开始读取:使用增量提取设置此选项将指示 ADF 在首次执行具有增量提取的管道时读取所有行。
接收器转换
特定于Azure SQL 数据库的设置在接收器转换的 Settings 选项卡中可用。
更新方法: 确定数据库目标上允许哪些操作。 默认设置为仅允许插入。 若要更新、更新插入或删除行,需要进行 alter-row 转换才能标记这些操作的行。 对于更新、更新插入和删除操作,必须设置一个或多个键列,以确定要更改的行。
该服务在后续的更新、更新插入和删除中会使用你在此处将其选取为密钥的列名称。 因此,你必须选取存在于汇聚映射中的列。 如果你不希望将值写入此键列,请单击“跳过写入键列”。
可以参数化此处用于更新目标Azure SQL 数据库表的键列。 如果你有多个用于组合键的列,单击“自定义表达式”,你将能够使用数据流表达式语言添加动态内容,其中可以包含一个字符串数组,该数组包含一个组合键的列名。
表操作: 确定在写入之前是否从目标表重新创建或删除所有行。
- 无操作:不会对表进行任何操作。
- 重新创建:将删除表并重新创建。 创建新表时,如果是动态创建,这是必需的。
- 截断:将删除目标表中的所有行。
批大小:控制每个 Bucket 中写入的行数。 较大的批大小可提高压缩比并改进内存优化,但在缓存数据时可能会导致内存不足异常。
使用 TempDB:默认情况下,作为加载过程的一部分,该服务将使用全局临时表来存储数据。 你也可以取消选中“使用 TempDB”选项,改为要求该服务将临时保存表存储在用于此接收器的数据库中的用户数据库中。
预处理和后处理 SQL 脚本:输入将在数据写入接收器数据库之前(预处理)和之后(后处理)执行的多行 SQL 脚本
提示
- 建议将包含多个命令的单个批处理脚本拆分为多个批处理。
- 只有返回简单更新计数的数据定义语言 (Data Definition Language, DDL) 和数据操作语言 (Data Manipulation Language, DML) 语句可作为批处理的一部分运行。 在执行批量操作中了解详情
行处理时出错
写入 Azure SQL DB 时,由于目标设置的约束,某些数据行可能会失败。 一些常见错误包括:
- 字符串或二进制数据在表中会被截断
- 无法在列中插入 NULL 值
- INSERT 语句与 CHECK 约束冲突
默认情况下,遇到第一个错误时,数据流运行会失败。 你可以选择Continue on error,这样即使某些行存在错误,也能完成数据流。 该服务提供了不同的选项来处理这些错误行。
事务提交:选择是在单个事务中写入数据,还是分批写入数据。 单个事务将提供较差的性能,但在事务完成之前,其他人将看不到任何写入的数据。
输出被拒绝的数据: 如果启用,则可以将错误行输出到您选择的Azure Blob 存储或Azure Data Lake Storage Gen2帐户中的csv文件中。 这会写入包含三个附加列的错误行:SQL 操作(例如插入或更新)、数据流错误代码,以及有关行的错误消息。
出错时报告成功:如果已启用,则即使发现了错误行,也会将数据流标记为成功。
Azure SQL 数据库的数据类型映射
当从或向 Azure SQL 数据库复制数据时,将使用以下映射将 Azure SQL 数据库数据类型转换为 Azure 数据工厂的临时数据类型。 Synapse 管道功能使用相同的映射,该功能直接实现Azure 数据工厂。 若要了解复制活动如何将源架构和数据类型映射到接收器,请参阅架构和数据类型映射。
| Azure SQL 数据库数据类型 | 数据工厂临时数据类型 |
|---|---|
| bigint | Int64 |
| 二进制 | Byte[] |
| 比特 | 布尔 |
| 字符型 | 字符串、Char[] |
| 日期 | 日期时间 |
| 日期时间 | 日期时间 |
| datetime2 | 日期时间 |
| 日期时间偏移量 (Datetimeoffset) | DateTimeOffset |
| 十进制 | 十进制 |
| FILESTREAM 属性 (varbinary(max)) | Byte[] |
| Float | 双精度 |
| 图像 | Byte[] |
| int(整数) | Int32 |
| 钱 | 十进制 |
| nchar | 字符串、Char[] |
| ntext | 字符串、Char[] |
| 数字 | 十进制 |
| nvarchar | 字符串、Char[] |
| 真实 | Single |
| 行版本号 (rowversion) | Byte[] |
| smalldatetime | 日期时间 |
| smallint | Int16 |
| smallmoney | 十进制 |
| sql_variant | 对象 |
| 文本消息 | 字符串、Char[] |
| 时间 | TimeSpan |
| 时间戳 | Byte[] |
| tinyint | 字节 |
| 唯一标识符 (uniqueidentifier) | Guid |
| varbinary | Byte[] |
| varchar | 字符串、Char[] |
| xml | 字符串 |
注意
对于映射到十进制中间类型的数据类型,目前复制活动支持的精度最高为 28。 如果有精度大于 28 的数据,请考虑在 SQL 查询中将其转换为字符串。
Lookup 活动属性
若要了解有关属性的详细信息,请查看 Lookup 活动。
GetMetadata 活动属性
若要了解有关属性的详细信息,请查看 GetMetadata 活动
使用 Always Encrypted
使用 Always Encrypted 从/向Azure SQL 数据库复制数据时,请执行以下步骤:
将 Column 主密钥(CMK)存储在 Azure 密钥保管库 中。 详细了解如何使用 Azure 密钥保管库 配置 Always Encrypted
确保获取对用于存储列主密钥 (CMK) 的密钥保管库的访问权限。 有关所需的权限,请参阅此文。
创建链接服务,以使用托管标识或服务主体连接到 SQL 数据库并启用“Always Encrypted”功能。
注意
Azure SQL 数据库 Always Encrypted支持以下方案:
- 源或汇数据存储使用托管身份或服务主体作为密钥认证类型。
- 源和接收器数据存储都使用托管标识作为密钥提供程序身份验证类型。
- 源和接收器数据存储都使用同一个服务主体作为密钥提供程序身份验证类型。
注意
目前,Azure SQL 数据库 Always Encrypted 不支持在映射数据流中的汇聚转换中使用。
原生变更数据捕获
Azure 数据工厂 可支持 SQL Server、Azure SQL DB 和 Azure SQL MI 的原生更改数据捕获功能。 可以通过 ADF 映射数据流自动检测和提取 SQL 存储中的行插入、更新和删除等变更数据。 利用无代码映射数据流的体验,用户可以通过将数据库追加为目标存储,轻松实现 SQL 存储中的数据复制场景。 更重要的是,用户还可以在两者之间撰写任何数据转换逻辑,以便从 SQL 存储实现增量 ETL 方案。
请确保管道和活动名称保持不变,以便 ADF 可以记录检查点,从而自动从上次运行中获取更改的数据。 如果更改管道名称或活动名称,检查点将会被重置,从而使您必须从头开始流程,或者在下一次运行时只应用更改后的数据。 如果要更改管道名称或活动名称,但仍保留检查点,以自动从上次运行中获取变更的数据,请使用自己的数据流活动中的检查点密钥来实现此目的。
调试管道时,此功能以相同的方式工作。 请注意,在调试运行期间刷新浏览器时,检查点将重置。 若对调试运行中的管道结果感到满意,可继续发布并触发管道。 首次触发已发布管道时,管道将自动从头开始重启,或者立即开始获取更改数据。
在监控部分,你总是有机会重新运行管道。 执行此操作时,始终可从所选管道运行的上一个检查点捕获已更改的数据。
示例 1:
将引用到已启用 SQL CDC 的数据集的源转换与映射数据流中引用到数据库的接收器转换直接链接起来时,SQL 源上发生的变更将自动应用到目标数据库,这样很轻松就可以获得数据库之间的数据复制方案。 可以使用接收器转换中的更新方法来选择是否允许在目标数据库上进行插入、更新或删除操作。 映射数据流中的示例脚本如下所示。
source(output(
id as integer,
name as string
),
allowSchemaDrift: true,
validateSchema: false,
enableNativeCdc: true,
netChanges: true,
skipInitialLoad: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table') ~> source1
source1 sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:true,
keys:['id'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true,
errorHandlingOption: 'stopOnFirstError') ~> sink1
示例 2:
如果想启用 ETL 方案,而不是通过 SQL CDC 在数据库之间复制数据,则可以使用映射数据流中的表达式,包括 isInsert(1)、isUpdate(1) 和 isDelete(1) 来区分具有不同操作类型的行。 下面是用于映射数据流的一个示例脚本,它创建了一列并赋予如下值:1 表示插入的行,2 表示更新的行,3 表示删除的行,这用于下游转换处理增量数据。
source(output(
id as integer,
name as string
),
allowSchemaDrift: true,
validateSchema: false,
enableNativeCdc: true,
netChanges: true,
skipInitialLoad: false,
isolationLevel: 'READ_UNCOMMITTED',
format: 'table') ~> source1
source1 derive(operationType = iif(isInsert(1), 1, iif(isUpdate(1), 2, 3))) ~> derivedColumn1
derivedColumn1 sink(allowSchemaDrift: true,
validateSchema: false,
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1
已知限制:
- 只有来自 SQL CDC 的净变化将通过cdc.fn_cdc_get_net_changes_由 ADF 加载。
升级Azure SQL 数据库版本
若要升级Azure SQL 数据库版本,请在Edit 链接服务页中, 在Version下选择推荐的版本,然后通过参考推荐版本的链接服务属性来配置该链接服务。
建议版本和旧版本之间的差异
下表显示了使用建议版本和旧版Azure SQL 数据库之间的差异。
| 建议的版本 | 旧版本 |
|---|---|
支持 TLS 1.3 通过 encrypt 作为 strict。 |
不支持 TLS 1.3。 |
相关内容
有关复制活动支持作为源和接收器的数据存储的列表,请参阅支持的数据存储和格式。