使用 PowerShell 根据更改跟踪信息,以增量方式将 Azure SQL 数据库中的数据加载到 Azure Blob 存储

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

在本教程中,请创建一个带管道的 Azure 数据工厂,该管道根据 Azure SQL 数据库的源数据库中的“更改跟踪”信息将增量数据加载到 Azure Blob 存储。

在本教程中执行以下步骤:

  • 准备源数据存储
  • 创建数据工厂。
  • 创建链接服务。
  • 创建源、接收器和更改跟踪数据集。
  • 创建、运行和监视完整复制管道
  • 在源表中添加或更新数据
  • 创建、运行和监视增量复制管道

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

概述

在数据集成解决方案中,一种广泛使用的方案是在完成初始数据加载后以增量方式加载数据。 在某些情况下,可以通过某种方式(例如,使用 LastModifyTime、CreationTime 等属性)将源数据存储中某个时段的更改数据轻松地进行切分。 在某些情况下,没有明确的方式可以将增量数据从上一次处理过的数据中区分出来。 可以使用 Azure SQL 数据库、SQL Server 等数据存储支持的更改跟踪技术来确定增量数据。 本教程介绍如何将 Azure 数据工厂与 SQL 更改跟踪技术配合使用,通过增量方式将增量数据从 Azure SQL 数据库加载到 Azure Blob 存储中。 有关 SQL 更改跟踪技术的更具体的信息,请参阅 SQL Server 中的更改跟踪

端到端工作流

下面是典型的端到端工作流步骤,用于通过更改跟踪技术以增量方式加载数据。

注意

Azure SQL 数据库和 SQL Server 都支持更改跟踪技术。 本教程使用 Azure SQL 数据库作为源数据存储。 此外,还可以使用 SQL Server 实例。

  1. 首次加载历史数据(运行一次):
    1. 在 Azure SQL 数据库的源数据库中启用更改跟踪技术。
    2. 在数据库中获取 SYS_CHANGE_VERSION 的初始值,作为捕获更改数据的基线。
    3. 将完整数据从源数据库加载到 Azure Blob 存储中。
  2. 以增量方式按计划加载增量数据(在首次加载数据后定期运行):
    1. 获取旧的和新的 SYS_CHANGE_VERSION 值。
    2. sys.change_tracking_tables 中已更改行(两个 SYS_CHANGE_VERSION 值之间)的主键与源表中的数据联接,以便加载增量数据,然后将增量数据移到目标位置。
    3. 更新 SYS_CHANGE_VERSION,以便下次进行增量加载。

高级解决方案

在本教程中,请创建两个管道来执行下述两项操作:

  1. 首次加载: 创建一个包含复制活动的管道,将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储)。

    Full loading of data

  2. 增量加载: 创建一个包含以下活动的管道并定期运行。

    1. 创建两项查找活动,从 Azure SQL 数据库获取旧的和新的 SYS_CHANGE_VERSION,然后将其传递至复制活动。
    2. 创建一项复制活动,将两个 SYS_CHANGE_VERSION 值之间的插入/更新/删除数据从 Azure SQL 数据库复制到 Azure Blob 存储。
    3. 创建一项存储过程活动,更新 SYS_CHANGE_VERSION 的值,以便进行下一次的管道运行。

    Increment load flow diagram

如果没有 Azure 订阅,可在开始前创建一个试用帐户

先决条件

  • Azure PowerShell。 按如何安装和配置 Azure PowerShell 中的说明安装最新的 Azure PowerShell 模块。
  • Azure SQL 数据库。 将数据库用作数据存储。 如果没有 Azure SQL 数据库,请参阅创建 Azure SQL 数据库一文获取创建步骤。
  • Azure 存储帐户。 将 Blob 存储用作接收器数据存储。 如果没有 Azure 存储帐户,请参阅创建存储帐户一文获取创建步骤。 创建名为 adftutorial 的容器。

在数据库中创建数据源表

  1. 启动 SQL Server Management Studio,连接到 SQL 数据库。

  2. 在“服务器资源管理器”中,右键单击你的数据库,然后选择“新建查询”。

  3. 针对数据库运行以下 SQL 命令,创建名为 data_source_table 的表作为数据源存储。

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. 通过运行以下 SQL 查询,在数据库和源表 (data_source_table) 上启用更改跟踪机制:

    注意

    • 将 <your database name> 替换为你的数据库的名称,其中包含 data_source_table。
    • 在当前的示例中,更改的数据保留两天。 如果每隔三天或三天以上加载更改的数据,则不会包括某些更改的数据。 需将 CHANGE_RETENTION 的值更改为更大的值。 或者,确保在两天内加载一次更改的数据。 有关详细信息,请参阅对数据库启用更改跟踪
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
    
  5. 通过运行以下查询,创建一个新表并存储带默认值的 ChangeTracking_version:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT,
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    注意

    如果对 SQL 数据库启用更改跟踪后数据并未更改,则更改跟踪版本的值为 0。

  6. 运行以下查询,在数据库中创建存储过程。 管道会调用此存储过程,以便更新上一步创建的表中的更改跟踪版本。

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
    

Azure PowerShell

如何安装和配置 Azure PowerShell 中的说明安装最新的 Azure PowerShell 模块。

创建数据工厂

  1. 为资源组名称定义一个变量,稍后会在 PowerShell 命令中使用该变量。 将以下命令文本复制到 PowerShell,在双引号中指定 Azure 资源组的名称,然后运行命令。 例如:"adfrg"

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName 变量分配另一个值,然后再次运行命令

  2. 定义一个用于数据工厂位置的变量:

    $location = "China East 2"
    
  3. 若要创建 Azure 资源组,请运行以下命令:

    New-AzResourceGroup $resourceGroupName $location
    

    如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName 变量分配另一个值,然后再次运行命令。

  4. 定义一个用于数据工厂名称的变量。

    重要

    更新数据工厂名称,使之全局唯一。

    $dataFactoryName = "IncCopyChgTrackingDF";
    
  5. 要创建数据工厂,请运行以下 Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

请注意以下几点:

  • Azure 数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试。

    The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
    
  • 若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者所有者角色,或者是 Azure 订阅的管理员

  • 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库,等等)和计算资源(HDInsight 等)可以位于其他区域中。

创建链接服务

可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分中,创建 Azure 存储帐户和 Azure SQL 数据库中数据库的链接服务。

创建 Azure 存储链接服务。

在此步骤中,请将 Azure 存储帐户链接到数据工厂。

  1. C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件:(如果此文件夹尚未存在,请创建。) 将 <accountName><accountKey> 分别替换为 Azure 存储帐户的名称和密钥,然后保存文件。

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
            }
        }
    }
    
  2. Azure PowerShell 中切换到 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹。

  3. 运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureStorageLinkedService。 在以下示例中,传递 ResourceGroupNameDataFactoryName 参数的值。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

创建 Azure SQL 数据库链接服务

在此步骤中,请将数据库链接到数据工厂。

  1. 在 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中创建包含以下内容的名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件:将 <server>、<database name>、<user id> 和 <password> 替换为你的服务器名称、数据库名称、用户 ID 和密码,然后保存文件。

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.chinacloudapi.cn,1433;Initial Catalog=<database name>; Persist Security Info=False; User ID=<user name>; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. Azure PowerShell 中,运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureSQLDatabaseLinkedService

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

创建数据集

在此步骤中,请创建表示数据源和数据目标的数据集, 并创建用于存储 SYS_CHANGE_VERSION 的位置。

创建源数据集

在此步骤中,请创建一个代表源数据的数据集。

  1. 在同一文件夹中,创建包含以下内容的名为 SourceDataset.json 的 JSON 文件:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    
  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:SourceDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : SourceDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

创建接收器数据集

在此步骤中,请创建一个数据集,代表从源数据存储复制的数据。

  1. 在同一文件夹中,创建包含以下内容的名为 SinkDataset.json 的 JSON 文件:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incchgtracking",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    在 Azure Blob 存储中创建 adftutorial 容器,这是先决条件的部分要求。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 在本教程中,输出文件名是使用以下表达式动态生成的:@CONCAT('Incremental-', pipeline().RunId, '.txt')。

  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:SinkDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : SinkDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
    

创建更改跟踪数据集

在此步骤中,请创建用于存储更改跟踪版本的数据集。

  1. 在同一文件夹中,创建包含以下内容的名为 ChangeTrackingDataset.json 的 JSON 文件:

    {
        "name": " ChangeTrackingDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "table_store_ChangeTracking_version"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    创建 table_store_ChangeTracking_version 表,这是先决条件的部分要求。

  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集:ChangeTrackingDataset

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : ChangeTrackingDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

创建用于完整复制的管道

在这一步,请创建一个包含复制活动的管道,将完整数据从源数据存储(Azure SQL 数据库)复制到目标数据存储(Azure Blob 存储)。

  1. 创建一个 JSON 文件:在同一文件夹中,创建包含以下内容的 FullCopyPipeline.json:

    {
        "name": "FullCopyPipeline",
        "properties": {
            "activities": [{
                "name": "FullCopyActivity",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
    
                "inputs": [{
                    "referenceName": "SourceDataset",
                    "type": "DatasetReference"
                }],
                "outputs": [{
                    "referenceName": "SinkDataset",
                    "type": "DatasetReference"
                }]
            }]
        }
    }
    
  2. 运行 Set-AzDataFactoryV2Pipeline cmdlet 来创建管道:FullCopyPipeline。

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
    

    下面是示例输出:

     PipelineName      : FullCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {FullCopyActivity}
     Parameters        :
    

运行完整的复制管道

运行管道:使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 FullCopyPipeline

Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName        

监视完整的复制管道

  1. 登录到 Azure 门户

  2. 单击“所有服务”,使用关键字 data factories 进行搜索,然后选择“数据工厂”。

    Data factories menu

  3. 在数据工厂列表中搜索你的数据工厂,然后选择它来启动“数据工厂”页。

    Search for your data factory

  4. 在“数据工厂”页中,单击“监视和管理”磁贴。

    Monitor & Manage tile

  5. 数据集成应用程序在单独的选项卡中启动。可以看到所有管道运行及其状态。 请注意,在以下示例中,管道运行的状态为“成功”。 单击“参数”列中的链接即可查看传递至管道的参数。 如果有错误,在“错误”列可以看到链接。 单击“操作”列中的链接。

    Screenshot shows pipeline runs for a data factory.

  6. 单击“操作”列中的链接时,可以看到以下页面,其中显示管道的所有 活动运行

    Screenshot shows activity runs for a data factory with the Pipelines link called out.

  7. 若要切换回“管道运行”视图,请单击“管道”,如图所示。

查看结果

可以在 adftutorial 容器的 incchgtracking 文件夹中看到名为 incremental-<GUID>.txt 的文件。

Output file from full copy

该文件应包含数据库中的数据:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

向源表中添加更多数据

对数据库运行以下查询来添加和更新行。

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new','50');


UPDATE data_source_table
SET [Age] = '10', [name]='update' where [PersonID] = 1

创建用于增量复制的管道

在此步骤中,请创建一个包含以下活动的管道并定期运行。 查找活动从 Azure SQL 数据库获取旧的和新的 SYS_CHANGE_VERSION,然后将其传递至复制活动。 复制活动将两个 SYS_CHANGE_VERSION 值之间的插入/更新/删除数据从 Azure SQL 数据库复制到 Azure Blob 存储。 存储过程活动更新 SYS_CHANGE_VERSION 的值,以便进行下一次的管道运行。

  1. 创建一个 JSON 文件:在同一文件夹中,创建包含以下内容的 IncrementalCopyPipeline.json:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupLastChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from table_store_ChangeTracking_version"
                        },
                        "dataset": {
                            "referenceName": "ChangeTrackingDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupCurrentChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion"
                        },
                        "dataset": {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupLastChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupCurrentChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "StoredProceduretoUpdateChangeTrackingActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
                        "storedProcedureName": "Update_ChangeTracking_Version",
                        "storedProcedureParameters": {
                            "CurrentTrackingVersion": {
                                "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}",
                                "type": "INT64"
                            },
                            "TableName": {
                                "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}",
                                "type": "String"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. 运行 Set-AzDataFactoryV2Pipeline cmdlet 来创建管道:FullCopyPipeline。

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    下面是示例输出:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity}
     Parameters        :
    

运行增量复制管道

运行管道:使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 IncrementalCopyPipeline

Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName     

监视增量复制管道

  1. 数据集成应用程序中,刷新“管道运行”视图。 确认在列表中看到 IncrementalCopyPipeline。 单击“操作”列中的链接。

    Screenshot shows pipeline runs for a data factory including your pipeline.

  2. 单击“操作”列中的链接时,可以看到以下页面,其中显示管道的所有 活动运行

    Screenshot shows pipeline runs for a data factory with several marked as succeeded.

  3. 若要切换回“管道运行”视图,请单击“管道”,如图所示。

查看结果

可以在 adftutorial 容器的 incchgtracking 文件夹中看到第二个文件。

Output file from incremental copy

该文件应该只包含数据库中的增量数据。 带 U 的记录是数据库中的更新行,带 I 的记录是添加的行。

1,update,10,2,U
6,new,50,1,I

前三个列是 data_source_table 中的更改数据。 最后两个列是更改跟踪系统表中的元数据。 第四列是每个更改行的 SYS_CHANGE_VERSION。 第五列是操作:U = update(更新),I = insert(插入)。 如需详细了解更改跟踪信息,请参阅 CHANGETABLE

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

继续查看以下教程,了解如何仅基于 LastModifiedDate 来复制新的和更改的文件: