Delta copy from a database with a control table
APPLIES TO: Azure Data Factory, Azure Synapse Analytics
This article describes a template that's available to incrementally load new or updated rows from a database table to Azure by using an external control table that stores a high-watermark value.
This template requires that the schema of the source database contains a timestamp column or incrementing key to identify new or updated rows.
Note
If you have a timestamp column in your source database to identify new or updated rows but you don't want to create an external control table to use for delta copy, you can instead use the Azure Data Factory Copy Data tool to get a pipeline. That tool uses a trigger-scheduled time as a variable to read new rows from the source database.
About this solution template
This template first retrieves the old watermark value from the control table and the current watermark value from the source database. It then copies only the rows that changed between the two watermark values. Finally, it stores the new high-watermark value in the external control table for the next delta load.
The template contains four activities:
- Lookup retrieves the old high-watermark value, which is stored in an external control table.
- Another Lookup activity retrieves the current high-watermark value from the source database.
- Copy copies only changes from the source database to the destination store. The query that identifies the changes in the source database is similar to 'SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > "last high-watermark" AND TIMESTAMP_Column <= "current high-watermark"'.
- SqlServerStoredProcedure writes the current high-watermark value to an external control table for delta copy next time.
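The four activities above can be sketched outside Data Factory to show the logic they implement. The following is a minimal illustration, not the template itself: it assumes an in-memory SQLite database standing in for both the source database and the control table, ISO-formatted text timestamps, and a hypothetical helper named delta_copy.

```python
import sqlite3

# Assumption: in-memory SQLite stands in for the source database and the
# control table; timestamps are ISO-formatted text so they sort correctly.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE data_source_table (PersonID INTEGER, Name TEXT, LastModifytime TEXT);
    CREATE TABLE watermarktable (WatermarkValue TEXT);
    INSERT INTO watermarktable VALUES ('2010-01-01 00:00:00');
    INSERT INTO data_source_table VALUES
        (1, 'aaaa', '2017-09-01 00:56:00'),
        (2, 'bbbb', '2017-09-02 05:23:00'),
        (3, 'cccc', '2017-09-03 02:36:00');
""")


def delta_copy(conn):
    """Hypothetical helper: run one delta load and return the changed rows."""
    # 1. Lookup: read the old high-watermark from the control table.
    (old_wm,) = conn.execute("SELECT WatermarkValue FROM watermarktable").fetchone()
    # 2. Lookup: read the current high-watermark from the source table.
    (new_wm,) = conn.execute(
        "SELECT MAX(LastModifytime) FROM data_source_table"
    ).fetchone()
    if new_wm is None:
        # Empty source table: nothing to copy, leave the watermark untouched.
        return []
    # 3. Copy: select only rows in the window (old watermark, new watermark].
    rows = conn.execute(
        "SELECT * FROM data_source_table "
        "WHERE LastModifytime > ? AND LastModifytime <= ?",
        (old_wm, new_wm),
    ).fetchall()
    # 4. Stored procedure equivalent: persist the new high-watermark.
    conn.execute("UPDATE watermarktable SET WatermarkValue = ?", (new_wm,))
    conn.commit()
    return rows


copied = delta_copy(conn)
stored_watermark = conn.execute(
    "SELECT WatermarkValue FROM watermarktable"
).fetchone()[0]
```

The upper bound on the window matters: rows that arrive while the copy is running have a timestamp later than the current high-watermark, so they are left for the next run rather than being skipped.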
The template defines the following parameters:
- Data_Source_Table_Name is the table in the source database that you want to load data from.
- Data_Source_WaterMarkColumn is the name of the column in the source table that's used to identify new or updated rows. The type of this column is typically datetime, INT, or similar.
- Data_Destination_Container is the root path of the place where the data is copied to in your destination store.
- Data_Destination_Directory is the directory path under the root of the place where the data is copied to in your destination store.
- Data_Destination_Table_Name is the place where the data is copied to in your destination store (applicable when "Azure Synapse Analytics" is selected as Data Destination).
- Data_Destination_Folder_Path is the place where the data is copied to in your destination store (applicable when "File System" is selected as Data Destination).
- Control_Table_Table_Name is the external control table that stores the high-watermark value.
- Control_Table_Column_Name is the column in the external control table that stores the high-watermark value.
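To make the role of Data_Source_Table_Name and Data_Source_WaterMarkColumn concrete, here is a rough sketch of how a delta query could be assembled from them. The function names quote_identifier and build_delta_query are hypothetical and are not part of the template; the sketch assumes SQL Server style bracket quoting for identifiers and "?" placeholders for the two watermark values.

```python
def quote_identifier(name: str) -> str:
    """Bracket-quote an identifier, doubling any closing bracket inside it."""
    return "[" + name.replace("]", "]]") + "]"


def build_delta_query(table_name, watermark_column, old_watermark, new_watermark):
    """Return (sql, params) selecting rows in (old_watermark, new_watermark]."""
    table = quote_identifier(table_name)
    column = quote_identifier(watermark_column)
    sql = f"SELECT * FROM {table} WHERE {column} > ? AND {column} <= ?"
    # The watermark values travel as parameters, not as part of the SQL text.
    return sql, (old_watermark, new_watermark)


sql, params = build_delta_query(
    "data_source_table",        # Data_Source_Table_Name
    "LastModifytime",           # Data_Source_WaterMarkColumn
    "2017-09-03 02:36:00",      # old high-watermark from the control table
    "2017-09-09 09:01:00",      # current high-watermark from the source
)
```

Keeping the table and column names as parameters is what lets one pipeline be reused for different source tables, as long as each has a suitable watermark column.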
How to use this solution template
Explore the source table that you want to load, and define the high-watermark column that can be used to identify new or updated rows. The type of this column might be datetime, INT, or similar. This column's value increases as new rows are added. In the following sample source table (data_source_table), we can use the LastModifytime column as the high-watermark column.
PersonID   Name        LastModifytime
1          aaaa        2017-09-01 00:56:00.000
2          bbbb        2017-09-02 05:23:00.000
3          cccc        2017-09-03 02:36:00.000
4          dddd        2017-09-04 03:21:00.000
5          eeee        2017-09-05 08:06:00.000
6          fffffff     2017-09-06 02:23:00.000
7          gggg        2017-09-07 09:01:00.000
8          hhhh        2017-09-08 09:01:00.000
9          iiiiiiiii   2017-09-09 09:01:00.000
Create a control table in SQL Server or Azure SQL Database to store the high-watermark value for delta data loading. In the following example, the name of the control table is watermarktable. In this table, WatermarkValue is the column that stores the high-watermark value, and its type is datetime.
CREATE TABLE watermarktable
(
    WatermarkValue datetime
);

INSERT INTO watermarktable
VALUES ('1/1/2010 12:00:00 AM');
Create a stored procedure in the same SQL Server or Azure SQL Database instance that you used to create the control table. The stored procedure is used to write the new high-watermark value to the external control table for delta data loading next time.
CREATE PROCEDURE update_watermark @LastModifiedtime datetime
AS
BEGIN
    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
END
Go to the Delta copy from Database template. Create a New connection to the source database that you want to copy data from.
Create a New connection to the destination data store that you want to copy the data to.
Create a New connection to the external control table and stored procedure that you created in steps 2 and 3.
Select Use this template.
You see the available pipeline, as shown in the following example:
Select Stored Procedure. For Stored procedure name, choose [dbo].[update_watermark]. Select Import parameter, and then select Add dynamic content.
Write the content @{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue}, and then select Finish.
Select Debug, enter the Parameters, and then select Finish.
Results similar to the following example are displayed:
You can create new rows in your source table. Here is sample SQL to create new rows:
INSERT INTO data_source_table VALUES (10, 'newdata', '9/10/2017 2:23:00 AM');
INSERT INTO data_source_table VALUES (11, 'newdata', '9/11/2017 9:01:00 AM');
To run the pipeline again, select Debug, enter the Parameters, and then select Finish.
You will see that only new rows were copied to the destination.
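The effect of the two pipeline runs can be reproduced in a small simulation. This is only a sketch under stated assumptions: in-memory SQLite replaces the source database, the control table, and the destination store; destination_table and run_pipeline are hypothetical names; and timestamps are stored as ISO-formatted text.

```python
import sqlite3

# Assumption: one in-memory SQLite database plays source, control table,
# and destination. destination_table is a hypothetical stand-in for the sink.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE data_source_table (PersonID INTEGER, Name TEXT, LastModifytime TEXT);
    CREATE TABLE destination_table (PersonID INTEGER, Name TEXT, LastModifytime TEXT);
    CREATE TABLE watermarktable (WatermarkValue TEXT);
    INSERT INTO watermarktable VALUES ('2010-01-01 00:00:00');
""")

sample_rows = [
    (1, "aaaa", "2017-09-01 00:56:00"), (2, "bbbb", "2017-09-02 05:23:00"),
    (3, "cccc", "2017-09-03 02:36:00"), (4, "dddd", "2017-09-04 03:21:00"),
    (5, "eeee", "2017-09-05 08:06:00"), (6, "fffffff", "2017-09-06 02:23:00"),
    (7, "gggg", "2017-09-07 09:01:00"), (8, "hhhh", "2017-09-08 09:01:00"),
    (9, "iiiiiiiii", "2017-09-09 09:01:00"),
]
conn.executemany("INSERT INTO data_source_table VALUES (?, ?, ?)", sample_rows)


def count_destination(conn):
    return conn.execute("SELECT COUNT(*) FROM destination_table").fetchone()[0]


def run_pipeline(conn):
    """Hypothetical helper: one delta run; returns the number of rows copied."""
    before = count_destination(conn)
    old_wm = conn.execute("SELECT WatermarkValue FROM watermarktable").fetchone()[0]
    new_wm = conn.execute(
        "SELECT MAX(LastModifytime) FROM data_source_table"
    ).fetchone()[0]
    if new_wm is None:
        return 0  # empty source: nothing to copy
    # Copy only rows newer than the stored watermark.
    conn.execute(
        "INSERT INTO destination_table "
        "SELECT * FROM data_source_table "
        "WHERE LastModifytime > ? AND LastModifytime <= ?",
        (old_wm, new_wm),
    )
    # Advance the watermark so the next run starts where this one ended.
    conn.execute("UPDATE watermarktable SET WatermarkValue = ?", (new_wm,))
    conn.commit()
    return count_destination(conn) - before


first_run = run_pipeline(conn)   # copies the nine sample rows

# Add the two new rows, then run again.
conn.executemany(
    "INSERT INTO data_source_table VALUES (?, ?, ?)",
    [(10, "newdata", "2017-09-10 02:23:00"), (11, "newdata", "2017-09-11 09:01:00")],
)
second_run = run_pipeline(conn)  # copies only the two new rows
```

In this simulation the first run copies all nine sample rows and the second run copies only rows 10 and 11, because the stored watermark had already advanced past the earlier rows.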
(Optional) If you select Azure Synapse Analytics as the data destination, you must also provide a connection to Azure Blob storage for staging, which is required by Azure Synapse Analytics PolyBase. The template will generate a container path for you. After the pipeline runs, check whether the container has been created in Blob storage.