在 Azure 门户中使用更改跟踪以增量方式将 Azure SQL 数据库中的数据复制到 Azure Blob 存储

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

在数据集成解决方案中,一种广泛使用的方案是在完成初始数据加载后以增量方式加载数据。 可以轻松地将源数据存储中在一段时间内更改的数据切片(例如,LastModifyTimeCreationTime)。 但在某些情况下,没有明确的方式可以将增量数据从上一次处理过的数据中区分出来。 可以使用 Azure SQL 数据库、SQL Server 等数据存储支持的更改跟踪技术来识别增量数据。

本教程介绍如何将 Azure 数据工厂与更改跟踪配合使用,以增量方式将增量数据从 Azure SQL 数据库加载到 Azure Blob 存储中。 有关更改跟踪的详细信息,请参阅 SQL Server 中的更改跟踪

在本教程中执行以下步骤:

  • 准备源数据存储。
  • 创建数据工厂。
  • 创建链接服务。
  • 创建源、接收器和更改跟踪数据集。
  • 创建、运行和监视完整复制管道。
  • 在源表中添加或更新数据。
  • 创建、运行和监视增量复制管道。

高级解决方案

在本教程中,你将创建两个管道用于执行以下操作。

注意

本教程使用 Azure SQL 数据库作为源数据存储。 也可以使用 SQL Server。

  1. 首次加载历史数据:创建一个包含复制活动的管道,将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储):

    1. 在 Azure SQL 数据库的源数据库中启用更改跟踪技术。
    2. 在数据库中获取 SYS_CHANGE_VERSION 的初始值,作为捕获更改数据的基线。
    3. 将完整数据从源数据库加载到 Azure Blob 存储中。

    Diagram that shows full loading of data.

  2. 按计划增量加载增量数据:创建并定期运行包含以下活动的管道:

    1. 创建两个查找活动以从 Azure SQL 数据库中获取旧的和新的 SYS_CHANGE_VERSION 值。

    2. 创建一个复制活动,以将两个 SYS_CHANGE_VERSION 值之间插入、更新或删除的数据(增量数据)从 Azure SQL 数据库复制到 Azure Blob 存储。

      sys.change_tracking_tables 中已更改行(两个 SYS_CHANGE_VERSION 值之间)的主键与源表中的数据联接,以便加载增量数据,然后将增量数据移到目标。

    3. 创建一个存储过程活动,以更新下一个管道运行的 SYS_CHANGE_VERSION 值。

    Diagram that shows incremental loading of data.

先决条件

  • Azure 订阅。 如果没有订阅,请在开始之前创建一个试用帐户
  • Azure SQL 数据库。 使用 Azure SQL 数据库中的数据库作为源数据存储。 如果你没有数据库,请参阅在 Azure SQL 数据库中创建数据库了解创建步骤。
  • Azure 存储帐户。 使用 Blob 存储作为接收器数据存储。 如果你没有 Azure 存储帐户,请参阅创建 Azure 存储帐户了解创建步骤。 创建名为 adftutorial 的容器。

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

在 Azure SQL 数据库中创建数据源表

  1. 打开 SQL Server Management Studio 并连接到 SQL 数据库。

  2. 在服务器资源管理器中右键单击你的数据库,然后选择“新建查询”。

  3. 针对数据库运行以下 SQL 命令,以创建名为 data_source_table 的表作为源数据存储:

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
  4. 运行以下 SQL 查询,对数据库和源表 (data_source_table) 启用更改跟踪。

    注意

    • 请将 <your database name> 替换为 Azure SQL 数据库中包含 data_source_table 的数据库的名称。
    • 在当前的示例中,更改的数据保留两天。 如果每隔三天或三天以上加载更改的数据,则不会包括某些更改的数据。 需要将 CHANGE_RETENTION 的值更改为更大的数字,或者确保加载两天内的已更改数据。 有关详细信息,请参阅对数据库启用更改跟踪
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. 运行以下查询,使用默认值创建名为 ChangeTracking_version 的新表和存储:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    注意

    如果对 SQL 数据库启用更改跟踪后数据并未更改,则更改跟踪版本的值为 0

  6. 运行以下查询,在数据库中创建存储过程。 管道会调用此存储过程,以便更新上一步创建的表中的更改跟踪版本。

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    BEGIN
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    END    
    

创建数据工厂

  1. 打开 Microsoft Edge 或 Google Chrome Web 浏览器。 目前只有这些浏览器支持数据工厂用户界面 (UI)。

  2. Azure 门户的左侧菜单中选择“创建资源”。

  3. 选择“数据 + 分析”>“数据工厂”。

    Screenshot that shows selection of a data factory in creating a resource.

  4. 在“新建数据工厂”页上,输入“ADFTutorialDataFactory”作为名称。

    数据工厂的名称必须全局唯一。 如果有错误消息指出选择的名称不可用,请更改名称(例如,更改为 yournameADFTutorialDataFactory)并再次尝试创建数据工厂。 有关详细信息,请参阅 Azure 数据工厂命名规则

  5. 选择要在其中创建数据工厂的 Azure 订阅。

  6. 对于“资源组”,请执行以下步骤之一:

    • 选择“使用现有资源组”,然后从下拉列表选择现有的资源组。
    • 选择“新建”,然后输入资源组的名称。

    若要了解有关资源组的详细信息,请参阅 使用资源组管理 Azure 资源

  7. 对于“区域”,请选择数据工厂的区域。

    下拉列表仅显示支持的位置。 数据工厂使用的数据存储(例如 Azure 存储和 Azure SQL 数据库)和计算(例如 Azure HDInsight)可以位于其他区域。

  8. 选择“查看 + 创建”。

  9. 选择“创建”。

    在仪表板上,“部署数据工厂”磁贴会显示状态。

    Screenshot of the tile that shows the status of deploying a data factory.

  10. 创建完成后,会显示“数据工厂”页。 选择“启动工作室”磁贴,在单独的选项卡上打开 Azure 数据工厂 UI。

创建链接服务

可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分,你将创建 Azure 存储帐户和 Azure SQL 数据库中数据库的链接服务。

创建 Azure 存储链接服务

若要将存储帐户链接到数据工厂,请执行以下操作:

  1. 在数据工厂 UI 中,在“管理”选项卡上的“连接”下,选择“链接服务”。 然后选择“+ 新建”或“创建链接服务”按钮。 Screenshot that shows selections for creating a linked service.
  2. 在“新建链接服务”窗口中选择“Azure Blob 存储”,然后选择“继续”。
  3. 输入以下信息:
    1. 至于“名称”,请输入 AzureStorageLinkedService
    2. 对于“通过集成运行时连接”,请选择集成运行时。
    3. 对于“身份验证类型”,请选择一种身份验证方法。
    4. 对于“存储帐户名称”,请选择你的 Azure 存储帐户。
  4. 选择“创建”。

创建 Azure SQL 数据库链接服务

若要将数据库链接到数据工厂,请执行以下操作:

  1. 在数据工厂 UI 中,在“管理”选项卡上的“连接”下,选择“链接服务”。 然后选择“+ 新建”。

  2. 在“新建链接服务”窗口中选择“Azure SQL 数据库”,然后选择“继续”。

  3. 输入以下信息:

    1. 对于“名称”,请输入“AzureSqlDatabaseLinkedService”。
    2. 对于“服务器名称”,请选择你的服务器。
    3. 对于“数据库名称”,请选择你的数据库。
    4. 对于“身份验证类型”,请选择一种身份验证方法。 本教程使用 SQL 身份验证进行演示。
    5. 对于“用户名”,请输入用户的名称。
    6. 对于“密码”,请输入用户的密码。 或者,提供“Azure 密钥保管库 - AKV 链接服务”、“机密名称”和“机密版本”的信息。
  4. 选择“测试连接”以测试连接。

  5. 选择“创建”以创建链接服务。

    Screenshot that shows settings for an Azure SQL Database linked service.

创建数据集

在本部分,你将创建用于表示数据源和数据目标的数据集,以及用于存储 SYS_CHANGE_VERSION 值的位置。

创建用于表示源数据的数据集

  1. 在数据工厂 UI 中的“创作”选项卡上,选择加号 (+)。 然后选择“数据集”,或选择数据集操作对应的省略号。

    Screenshot that shows selections for starting the creation of a dataset.

  2. 选择“Azure SQL 数据库”,然后选择“继续” 。

  3. 在“设置属性”窗口中执行以下步骤:

    1. 对于“名称”,请输入“SourceDataset”。
    2. 对于“链接服务”,请选择“AzureSqlDatabaseLinkedService”。
    3. 对于“表名称”,请选择“dbo.data_source_table”。
    4. 对于“导入架构”,请选择“从连接/存储”选项。
    5. 选择“确定”

    Screenshot that shows property settings for a source dataset.

创建一个数据集用于表示复制到接收器数据存储的数据

在以下过程中,你将创建一个数据集用于表示从源数据存储复制的数据。 在执行先决条件中的步骤期间,你已在 Azure Blob 存储中创建了 adftutorial 容器。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 在本教程中,输出文件名是从表达式 @CONCAT('Incremental-', pipeline().RunId, '.txt') 动态生成的。

  1. 在数据工厂 UI 中的“创作”选项卡上,选择 +。 然后选择“数据集”,或选择数据集操作对应的省略号。

    Screenshot that shows selections for starting the creation of a dataset.

  2. 依次选择“Azure Blob 存储”、“继续”。

  3. 选择“DelimitedText”作为数据类型的格式,然后选择“继续”。

  4. 在“设置属性”窗口中执行以下步骤:

    1. 对于“名称”,请输入“SinkDataset”。
    2. 对于“链接服务”,请选择“AzureBlobStorageLinkedService”。
    3. 对于“文件路径”,请输入“adftutorial/incchgtracking”。
    4. 选择“确定”。
  5. 数据集显示在树视图中后,转到“连接”选项卡,然后选择“文件名”文本框。 出现“添加动态内容”选项时,请将其选中。

    Screenshot that shows the option for setting a dynamic file path for a sink dataset.

  6. 此时将显示“管道表达式生成器”窗口。 在文本框中粘贴 @concat('Incremental-',pipeline().RunId,'.csv')

  7. 选择“确定”。

创建用于表示更改跟踪数据的数据集

在以下过程中,你将创建一个数据集用于存储更改跟踪版本。 在执行先决条件中的步骤期间,你已创建 table_store_ChangeTracking_version 表。

  1. 在数据工厂 UI 中,在“创作”选项卡上选择 +,然后选择“数据集”。
  2. 选择“Azure SQL 数据库”,然后选择“继续” 。
  3. 在“设置属性”窗口中执行以下步骤
    1. 对于“名称”,请输入“ChangeTrackingDataset”。
    2. 对于“链接服务”,请选择“AzureSqlDatabaseLinkedService”。
    3. 对于“表名称”,请选择“dbo.table_store_ChangeTracking_version”。
    4. 对于“导入架构”,请选择“从连接/存储”选项。
    5. 选择“确定”。

创建用于完整复制的管道

在以下过程中,你将创建一个包含复制活动的管道,该活动可将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储):

  1. 在数据工厂 UI 中,在“创作”选项卡上选择 +,然后选择“管道”>“管道”。

    Screenshot that shows selections for starting to create a pipeline for a data factory.

  2. 此时会显示用于配置管道的新选项卡。 该管道也会显示在树视图中。 在“属性”窗口中,将管道的名称更改为 FullCopyPipeline

  3. 在“活动”工具箱中,展开“移动和转换”。 执行以下任一步骤:

    • 将复制活动拖放到管道设计器图面中。
    • 在“活动”下的搜索栏上搜索复制数据活动,然后将名称设置为“FullCopyActivity”。
  4. 切换到“源”选项卡。对于“源数据集”,请选择“SourceDataset”。

  5. 切换到“接收器”选项卡。对于“接收器数据集”,请选择“SinkDataset”。

  6. 若要验证管道定义,请选择工具栏中的“验证”。 确认没有任何验证错误。 关闭管道验证输出。

  7. 若要发布实体(链接服务、数据集和管道),请选择“全部发布”。 等待“已成功发布”消息出现。

    Screenshot of the message that says publishing succeeded.

  8. 若要查看通知,请选择“显示通知”按钮。

运行完整的复制管道

  1. 在数据工厂 UI 中,在管道的工具栏上选择“添加触发器”,然后选择“立即触发”。

    Screenshot that shows the option for triggering a full copy now.

  2. 在“管道运行”窗口中选择“确定”。

    Screenshot that shows a pipeline run confirmation with a parameter check.

监视完整的复制管道

  1. 在数据工厂 UI 中选择“监视”选项卡。管道运行及其状态会显示在列表中。 若要刷新列表,请选择“刷新”。 将鼠标悬停在管道运行上以显示“重新运行”或“使用情况”选项。

    Screenshot that shows a pipeline run and status.

  2. 若要查看与管道运行关联的活动运行,请在“管道名称”列中选择管道名称。 该管道中只有一个活动,因此列表中只显示了一个条目。 若要切换回到管道运行视图,请选择顶部的“所有管道运行”链接。

查看结果

adftutorial 容器的 incchgtracking 文件夹包含名为 incremental-<GUID>.csv 的文件。

Screenshot of an output file from a full copy.

该文件应包含数据库中的数据:


PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22

5,eeee,PersonID,Name,Age
1,"aaaa",21
2,"bbbb",24
3,"cccc",20
4,"dddd",26
5,"eeee",22

向源表中添加更多数据

对数据库运行以下查询以添加和更新行:

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

创建用于增量复制的管道

在以下过程中,你将创建并定期运行一个包含活动的管道。 运行该管道时:

  • 查找活动将获取 Azure SQL 数据库中旧和新的 SYS_CHANGE_VERSION 值,并将其传递给复制活动。
  • 复制活动将两个 SYS_CHANGE_VERSION 值之间插入、更新或删除的数据从 Azure SQL 数据库复制到 Azure Blob 存储。
  • 存储过程活动将更新下一个管道运行的 SYS_CHANGE_VERSION 值。
  1. 在数据工厂 UI 中切换到“创作”选项卡。选择 +,然后选择“管道”>“管道”。

    Screenshot that shows how to create a pipeline in a data factory.

  2. 此时会显示用于配置管道的新选项卡。 该管道也会显示在树视图中。 在“属性”窗口中,将管道的名称更改为 IncrementalCopyPipeline

  3. 在“活动”工具箱中展开“常规”。 将查找活动拖放到管道设计器图面,或者在“搜索活动”框中进行搜索。 将活动的名称设置为 LookupLastChangeTrackingVersionActivity。 此活动获取在上次复制操作中使用的、存储在 table_store_ChangeTracking_version 表中的更改跟踪版本。

  4. 切换到“属性”窗口中的“设置”选项卡。 对于“源数据集”,请选择“ChangeTrackingDataset”。

  5. 将“活动”工具箱中的查找活动拖放到管道设计器图面。 将活动的名称设置为 LookupCurrentChangeTrackingVersionActivity。 此活动获取当前的更改跟踪版本。

  6. 在“属性”窗口中切换到“设置”选项卡,然后执行以下步骤:

    1. 对于“源数据集”,请选择“SourceDataset”。

    2. 对于“使用查询”,请选择“查询”。

    3. 对于“查询”,请输入以下 SQL 查询:

      SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion
      

    Screenshot that shows a query added to the Settings tab in the Properties window.

  7. 在“活动”工具箱中,展开“移动和转换”。 将复制数据活动拖放到管道设计器图面中。 将活动的名称设置为 IncrementalCopyActivity。 此活动将上一跟踪版本与当前更改跟踪版本之间的数据复制到目标数据存储。

  8. 在“属性”窗口中切换到“源”选项卡,然后执行以下步骤:

    1. 对于“源数据集”,请选择“SourceDataset”。

    2. 对于“使用查询”,请选择“查询”。

    3. 对于“查询”,请输入以下 SQL 查询:

      SELECT data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) AS CT ON data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
      

    Screenshot that shows a query added to the Source tab in the Properties window.

  9. 切换到“接收器”选项卡。对于“接收器数据集”,请选择“SinkDataset”。

  10. 将两个查找活动逐一连接到复制活动。 将附加到查找活动的绿色按钮拖放到复制活动。

  11. 将存储过程活动从“活动”工具箱拖放到管道设计器图面。 将活动的名称设置为 StoredProceduretoUpdateChangeTrackingActivity。 此活动更新 table_store_ChangeTracking_version 表中的更改跟踪版本。

  12. 切换到“设置”选项卡,然后执行以下步骤:

    1. 对于“链接服务”,请选择“AzureSqlDatabaseLinkedService”。
    2. 至于“存储过程名称”,请选择 Update_ChangeTracking_Version
    3. 选择“导入”。
    4. 在“存储过程参数”部分,指定以下参数值:
    名称 类型
    CurrentTrackingVersion Int64 @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}
    TableName 字符串 @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}

    Screenshot that shows setting parameters for the stored procedure activity.

  13. 将复制活动连接到存储过程活动。 将附加到复制活动的绿色按钮拖放到存储过程活动。

  14. 在工具栏中选择“验证”。 确认没有任何验证错误。 关闭“管道验证报告”窗口。

  15. 选择“全部发布”按钮,将实体(链接服务、数据集和管道)发布到数据工厂服务。 等待“发布成功”消息出现。

    Screenshot that shows the button for publishing all entities for a data factory.

运行增量复制管道

  1. 在管道的工具栏中选择“添加触发器”,然后选择“立即触发”。

    Screenshot that shows the option for triggering an incremental copy now.

  2. 在“管道运行”窗口中选择“确定”。

监视增量复制管道

  1. 选择“监视”选项卡。管道运行及其状态会显示在列表中。 若要刷新列表,请选择“刷新”。

    Screenshot that shows pipeline runs for a data factory.

  2. 若要查看与管道运行关联的活动运行,请选择“管道名称”列中的“IncrementalCopyPipeline”链接。 活动运行会显示在列表中。

    Screenshot that shows activity runs for a data factory.

查看结果

第二个文件显示在 adftutorial 容器的 incchgtracking 文件夹中。

Screenshot that shows the output file from an incremental copy.

该文件应该只包含数据库中的增量数据。 包含 U 的记录是数据库中已更新的行,包含 I 的记录是添加的行。

PersonID,Name,Age,SYS_CHANGE_VERSION,SYS_CHANGE_OPERATION
1,update,10,2,U
6,new,50,1,I

前三列是 data_source_table 中已更改的数据。 最后两列是更改跟踪系统的表中的元数据。 第四列是每个已更改行的 SYS_CHANGE_VERSION 值。 第五列是操作:U = 更新,I = 插入。 如需详细了解更改跟踪信息,请参阅 CHANGETABLE

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

请继续阅读以下教程,了解如何根据 LastModifiedDate 仅复制新的和已更改的文件: