使用控制表从数据库执行增量复制
适用于: Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
本文介绍一个模板。借助该模板,可以使用一个存储高水印值的外部控制表,以增量方式将数据库表中的新行或已更新的行加载到 Azure。
此模板要求源数据库的架构包含时间戳列或递增键,以标识新行或更新后的行。
注意
如果你使用源数据库中的时间戳列标识新行或已更新的行,但不想要创建一个外部控制表用于增量复制,可以改用 Azure 数据工厂复制数据工具来获取管道。 该工具使用触发器计划的时间作为变量来从源数据库读取新行。
关于此解决方案模板
此模板先检索旧水印值,并将其与当前水印值进行比较。 然后,此模板根据两个水印值之间的比较仅复制源数据库中的更改。 最后,此模板会将新的高水印值存储到外部控制表中,以便下次加载增量数据。
此模板包含四个活动:
- 一个 Lookup 检索存储在外部控制表中的旧高水印值。
- 另一个 Lookup 活动从源数据库中检索当前的高水印值。
- Copy 仅将源数据库中的更改复制到目标存储。 用于在源数据库中识别更改的查询类似于 'SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > “last high-watermark” 和 TIMESTAMP_Column <= “current high-watermark”'。
- SqlServerStoredProcedure 活动将当前高水印值写入外部控制表,以便下次进行增量复制。
模板定义以下参数:
- Data_Source_Table_Name 是源数据库中的要从中加载数据的表。
- Data_Source_WaterMarkColumn 是源表中的用于识别新行或已更新行的列名。 此列的类型通常为 datetime、INT 等。
- Data_Destination_Container 是在目标存储中将数据复制到的位置的根路径。
- Data_Destination_Directory 是在目标存储中将数据复制到的位置的根下的目录路径。
- Data_Destination_Table_Name 是目标存储中要将数据复制到的位置(在选择“Azure Synapse Analytics”作为“数据目标”时适用)。
- Data_Destination_Folder_Path 是目标存储中要将数据复制到的位置(在选择“文件系统”作为“数据目标”时适用)。
- Control_Table_Table_Name 是用于存储高水印值的外部控制表。
- Control_Table_Table_Name 是外部控制表中要将高水印值存储到的列。
如何使用此解决方案模板
浏览要加载的源表,并定义可用于识别新行或已更新行的高水印列。 此列的类型可能为 datetime、INT 等。 添加新行时,此列的值会增大。 在以下示例源表 (data_source_table) 中,我们可以使用 LastModifytime 列作为高水印列。
PersonID Name LastModifytime 1 aaaa 2017-09-01 00:56:00.000 2 bbbb 2017-09-02 05:23:00.000 3 cccc 2017-09-03 02:36:00.000 4 dddd 2017-09-04 03:21:00.000 5 eeee 2017-09-05 08:06:00.000 6 fffffff 2017-09-06 02:23:00.000 7 gggg 2017-09-07 09:01:00.000 8 hhhh 2017-09-08 09:01:00.000 9 iiiiiiiii 2017-09-09 09:01:00.000
在 SQL Server 或 Azure SQL 数据库中创建一个控制表,用于存储增量数据加载的高水印值。 在以下示例中,控制表的名称为 watermarktable。 在此表中,WatermarkValue 是用于存储高水印值的列,其类型为 datetime。
create table watermarktable ( WatermarkValue datetime, ); INSERT INTO watermarktable VALUES ('1/1/2010 12:00:00 AM')
在用于创建控制表的同一 SQL Server 或 Azure SQL 数据库实例中创建一个存储过程。 该存储过程用于将新的高水印值写入外部控制表,以便下次进行增量数据加载。
CREATE PROCEDURE update_watermark @LastModifiedtime datetime AS BEGIN UPDATE watermarktable SET [WatermarkValue] = @LastModifiedtime END
转到“从数据库执行增量复制”模板。 与要从中复制数据的源数据库建立新的连接。
与要将数据复制到的目标数据存储建立新的连接。
与在步骤 2 和 3 中创建的外部控制表与存储过程建立新的连接。
选择“使用此模板” 。
你将看到可用的管道,如以下示例所示:
选择“存储过程”。 对于“存储过程名称”,请选择“[dbo].[update_watermark]”。 依次选择“导入参数”、“添加动态内容”。
写入内容 @{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue},然后选择“完成”。
选择“调试”,输入参数,然后选择“完成”。
随后会显示类似于以下示例的结果:
可以在源表中创建新行。 下面是用于创建新行的示例 SQL 语言:
INSERT INTO data_source_table VALUES (10, 'newdata','9/10/2017 2:23:00 AM') INSERT INTO data_source_table VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
若要再次运行管道,请选择“调试”,输入参数,然后选择“完成”。
你会看到,只有新行已复制到目标。
(可选:)如果选择“Azure Synapse Analytics”作为数据目标,则还需按 Azure Synapse Analytics Polybase 的要求,提供与用于暂存的 Azure Blob 存储的连接。 模板会为你生成容器路径。 管道运行后,检查是否已在 Blob 存储中创建容器。