Incrementally load data from Azure SQL Database to Azure Blob storage by using change tracking information and PowerShell

Applies to: Azure Data Factory, Azure Synapse Analytics

In this tutorial, you create an Azure data factory with a pipeline that loads delta data, based on change tracking information in a source database in Azure SQL Database, to Azure Blob storage.

In this tutorial, you perform the following steps:

  • Prepare the source data store.
  • Create a data factory.
  • Create linked services.
  • Create source, sink, and change tracking datasets.
  • Create, run, and monitor the full copy pipeline.
  • Add or update data in the source table.
  • Create, run, and monitor the incremental copy pipeline.

Note

This article has been updated to use the Azure Az PowerShell module. The Az PowerShell module is the recommended PowerShell module for interacting with Azure. To get started with the Az PowerShell module, see Install Azure PowerShell. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.

Overview

In a data integration solution, incrementally loading data after an initial full load is a widely used scenario. In some cases, the data changed within a period in your source data store can be easily sliced (for example, by using attributes such as LastModifyTime or CreationTime). In other cases, there is no explicit way to identify the delta data since the last time you processed the data. The change tracking technology supported by data stores such as Azure SQL Database and SQL Server can be used to identify the delta data. This tutorial describes how to use Azure Data Factory with SQL change tracking to incrementally load delta data from Azure SQL Database into Azure Blob storage. For more information about SQL change tracking, see Change tracking in SQL Server.

End-to-end workflow

Here are the typical end-to-end workflow steps to incrementally load data by using change tracking.

Note

Both Azure SQL Database and SQL Server support change tracking. This tutorial uses Azure SQL Database as the source data store, but you can also use a SQL Server instance.

  1. Initial loading of historical data (run once):
    1. Enable change tracking in the source database in Azure SQL Database.
    2. Get the initial value of SYS_CHANGE_VERSION in the database as the baseline for capturing changed data.
    3. Load the full data from the source database into Azure Blob storage.
  2. Incremental loading of delta data on a schedule (run periodically after the initial loading of data):
    1. Get the old and new SYS_CHANGE_VERSION values.
    2. Load the delta data by joining the primary keys of the changed rows (between the two SYS_CHANGE_VERSION values) from sys.change_tracking_tables with the data in the source table, and then move the delta data to the destination. A PowerShell sketch of this logic follows this list.
    3. Update SYS_CHANGE_VERSION for the next delta load.
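
The following PowerShell sketch illustrates steps 2.1 through 2.3 directly against the database. It assumes the SqlServer module (Install-Module SqlServer, for Invoke-Sqlcmd) and the table and stored procedure you create later in this tutorial; the pipelines you build below implement the same logic with Lookup, Copy, and Stored Procedure activities.

# A minimal sketch of the incremental-load logic, assuming the SqlServer module
# and the objects (table_store_ChangeTracking_version, Update_ChangeTracking_Version)
# created later in this tutorial.
$conn = @{ ServerInstance = "<server>.database.chinacloudapi.cn"; Database = "<database name>"; Username = "<user id>"; Password = "<password>" }

# Old version: the baseline stored by the previous run.
$old = (Invoke-Sqlcmd @conn -Query "SELECT SYS_CHANGE_VERSION FROM table_store_ChangeTracking_version WHERE TableName = 'data_source_table'").SYS_CHANGE_VERSION

# New version: the current change tracking version.
$new = (Invoke-Sqlcmd @conn -Query "SELECT CHANGE_TRACKING_CURRENT_VERSION() AS v").v

# Delta rows: join the CHANGETABLE output with the source table on the primary key.
$delta = Invoke-Sqlcmd @conn -Query @"
SELECT s.PersonID, s.Name, s.Age, CT.SYS_CHANGE_VERSION, CT.SYS_CHANGE_OPERATION
FROM data_source_table AS s
RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, $old) AS CT
    ON s.PersonID = CT.PersonID
WHERE CT.SYS_CHANGE_VERSION <= $new
"@

# Persist the new baseline for the next run.
Invoke-Sqlcmd @conn -Query "EXEC Update_ChangeTracking_Version @CurrentTrackingVersion = $new, @TableName = 'data_source_table'"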

High-level solution

In this tutorial, you create two pipelines that perform the following two operations:

  1. Initial load: You create a pipeline with a copy activity that copies the entire data from the source data store (Azure SQL Database) to the destination data store (Azure Blob storage).

    Full loading of data

  2. Incremental load: You create a pipeline with the following activities, and run it periodically.

    1. Create two Lookup activities to get the old and new SYS_CHANGE_VERSION values from Azure SQL Database and pass them to the copy activity.
    2. Create one Copy activity to copy the inserted/updated/deleted data between the two SYS_CHANGE_VERSION values from Azure SQL Database to Azure Blob storage.
    3. Create one Stored Procedure activity to update the value of SYS_CHANGE_VERSION for the next pipeline run.

    Incremental loading flow diagram

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

  • Azure PowerShell. Install the latest Azure PowerShell modules by following the instructions in How to install and configure Azure PowerShell.
  • Azure SQL Database. You use a database in Azure SQL Database as the source data store. If you don't have one, see Create a database in Azure SQL Database for steps to create it.
  • Azure Storage account. You use Blob storage as the sink data store. If you don't have an Azure storage account, see Create a storage account for steps to create one. Create a container named adftutorial.

Create a data source table in your database

  1. Launch SQL Server Management Studio, and connect to your database in SQL Database.

  2. In Server Explorer, right-click your database and choose New Query.

  3. Run the following SQL command against your database to create a table named data_source_table as the data source store:

    create table data_source_table
    (
        PersonID int NOT NULL,
        Name varchar(255),
        Age int,
        PRIMARY KEY (PersonID)
    );
    
    INSERT INTO data_source_table
        (PersonID, Name, Age)
    VALUES
        (1, 'aaaa', 21),
        (2, 'bbbb', 24),
        (3, 'cccc', 20),
        (4, 'dddd', 26),
        (5, 'eeee', 22);
    
    
  4. Enable the change tracking mechanism on your database and the source table (data_source_table) by running the following SQL query. (An optional verification sketch follows the query.)

    Note

    • Replace <your database name> with the name of the database that contains data_source_table.
    • In the current example, the changed data is kept for two days. If you load the changed data only every three days or more, some changed data is missed. You need to either increase the value of CHANGE_RETENTION or ensure that your period for loading the changed data is within two days. For more information, see Enable change tracking for a database.
    ALTER DATABASE <your database name>
    SET CHANGE_TRACKING = ON  
    (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  
    
    ALTER TABLE data_source_table
    ENABLE CHANGE_TRACKING  
    WITH (TRACK_COLUMNS_UPDATED = ON)
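
    As an optional check, you can confirm that change tracking is enabled by querying the sys.change_tracking_tables catalog view. A minimal sketch, assuming the SqlServer PowerShell module (Install-Module SqlServer):

    # Optional check, assuming the SqlServer module: list the tables that have
    # change tracking enabled in this database.
    Invoke-Sqlcmd -ServerInstance "<server>.database.chinacloudapi.cn" -Database "<your database name>" `
        -Username "<user id>" -Password "<password>" `
        -Query "SELECT OBJECT_NAME(object_id) AS TableName, is_track_columns_updated_on FROM sys.change_tracking_tables"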
    
  5. Create a new table and store the ChangeTracking_version with a default value by running the following query:

    create table table_store_ChangeTracking_version
    (
        TableName varchar(255),
        SYS_CHANGE_VERSION BIGINT
    );
    
    DECLARE @ChangeTracking_version BIGINT
    SET @ChangeTracking_version = CHANGE_TRACKING_CURRENT_VERSION();  
    
    INSERT INTO table_store_ChangeTracking_version
    VALUES ('data_source_table', @ChangeTracking_version)
    

    Note

    If the data is not changed after you enable change tracking for SQL Database, the value of the change tracking version is 0.

  6. Run the following query to create a stored procedure in your database. The pipeline invokes this stored procedure to update the change tracking version in the table you created in the previous step. (An optional smoke test follows the procedure definition.)

    CREATE PROCEDURE Update_ChangeTracking_Version @CurrentTrackingVersion BIGINT, @TableName varchar(50)
    AS
    
    BEGIN
    
    UPDATE table_store_ChangeTracking_version
    SET [SYS_CHANGE_VERSION] = @CurrentTrackingVersion
    WHERE [TableName] = @TableName
    
    END    
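
    Optionally, you can smoke-test the procedure before wiring it into the pipeline. A minimal sketch, assuming the SqlServer module; it rewrites the stored baseline with the current change tracking version, which is harmless at this point:

    # Optional smoke test, assuming the SqlServer module: call the procedure with
    # the current change tracking version (effectively a no-op right now).
    Invoke-Sqlcmd -ServerInstance "<server>.database.chinacloudapi.cn" -Database "<your database name>" `
        -Username "<user id>" -Password "<password>" `
        -Query "DECLARE @v BIGINT = CHANGE_TRACKING_CURRENT_VERSION(); EXEC Update_ChangeTracking_Version @CurrentTrackingVersion = @v, @TableName = 'data_source_table';"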
    

Azure PowerShell

Install the latest Azure PowerShell modules by following the instructions in How to install and configure Azure PowerShell.

Create a data factory

  1. Define a variable for the resource group name that you use later in PowerShell commands. Copy the following command text to PowerShell, specify a name for the Azure resource group in double quotation marks, and then run the command. For example: "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    If the resource group already exists, you may not want to overwrite it. Assign a different value to the $resourceGroupName variable and run the command again.

  2. Define a variable for the location of the data factory:

    $location = "China East 2"
    
  3. To create the Azure resource group, run the following command:

    New-AzResourceGroup -Name $resourceGroupName -Location $location
    

    If the resource group already exists, you may not want to overwrite it. Assign a different value to the $resourceGroupName variable and run the command again.

  4. Define a variable for the data factory name.

    Important

    Update the data factory name to be globally unique.

    $dataFactoryName = "IncCopyChgTrackingDF";
    
  5. To create the data factory, run the following Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName
    

Note the following points:

  • The name of the Azure data factory must be globally unique. If you receive the following error, change the name and try again.

    The specified Data Factory name 'ADFIncCopyChangeTrackingTestFactory' is already in use. Data Factory names must be globally unique.
    
  • To create Data Factory instances, the user account you use to sign in to Azure must be a member of the contributor or owner role, or an administrator of the Azure subscription.

  • For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the Products available by region page, and then expand Analytics to locate Data Factory. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your Azure storage account and your database in Azure SQL Database.

Create an Azure Storage linked service

In this step, you link your Azure storage account to the data factory.

  1. C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件:(如果此文件夹尚未存在,请创建。)Create a JSON file named AzureStorageLinkedService.json in C:\ADFTutorials\IncCopyChangeTrackingTutorial folder with the following content: (Create the folder if it does not already exist.). <accountName><accountKey> 分别替换为 Azure 存储帐户的名称和密钥,然后保存文件。Replace <accountName>, <accountKey> with name and key of your Azure storage account before saving the file.

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
            }
        }
    }
    
  2. Azure PowerShell 中切换到 C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹。In Azure PowerShell, switch to the C:\ADFTutorials\IncCopyChangeTrackingTutorial folder.

  3. Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureStorageLinkedService. In the following example, you pass values for the ResourceGroupName and DataFactoryName parameters.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

Create an Azure SQL Database linked service

In this step, you link your database to the data factory.

  1. C:\ADFTutorials\IncCopyChangeTrackingTutorial 文件夹中,创建包含以下内容的名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件:将 <server>、<database name>、<user id> 和 <password> 分别替换为自己的服务器名称、数据库名称、用户 ID 和密码,然后保存文件。Create a JSON file named AzureSQLDatabaseLinkedService.json in C:\ADFTutorials\IncCopyChangeTrackingTutorial folder with the following content: Replace <server> <database name>, <user id>, and <password> with name of your server, name of your database, user ID, and password before saving the file.

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.chinacloudapi.cn,1433;Initial Catalog=<database name>; Persist Security Info=False; User ID=<user name>; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. Azure PowerShell 中,运行 Set-AzDataFactoryV2LinkedService cmdlet 来创建链接服务:AzureSQLDatabaseLinkedServiceIn Azure PowerShell, run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service: AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
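
    Optionally, confirm that both linked services now exist in the factory:

    # Optional: list the linked services registered in the data factory.
    Get-AzDataFactoryV2LinkedService -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName |
        Format-Table LinkedServiceName, ResourceGroupName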
    

Create datasets

In this step, you create datasets to represent the data source, the data destination, and the place to store SYS_CHANGE_VERSION.

Create a source dataset

In this step, you create a dataset to represent the source data.

  1. Create a JSON file named SourceDataset.json in the same folder with the following content:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    
  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Create a sink dataset

In this step, you create a dataset to represent the data that is copied from the source data store.

  1. Create a JSON file named SinkDataset.json in the same folder with the following content:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incchgtracking",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    You created the adftutorial container in your Azure Blob storage as part of the prerequisites. Create the container if it does not exist, or set folderPath to the name of an existing one. In this tutorial, the output file name is dynamically generated by using the expression @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset
    

Create a change tracking dataset

In this step, you create a dataset for storing the change tracking version.

  1. Create a JSON file named ChangeTrackingDataset.json in the same folder with the following content:

    {
        "name": " ChangeTrackingDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "table_store_ChangeTracking_version"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    

    You created the table_store_ChangeTracking_version table as part of the prerequisites.

  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset ChangeTrackingDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "ChangeTrackingDataset" -File ".\ChangeTrackingDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : ChangeTrackingDataset
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : IncCopyChgTrackingDF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Create a pipeline for the full copy

In this step, you create a pipeline with a copy activity that copies the entire data from the source data store (Azure SQL Database) to the destination data store (Azure Blob storage).

  1. Create a JSON file named FullCopyPipeline.json in the same folder with the following content:

    {
        "name": "FullCopyPipeline",
        "properties": {
            "activities": [{
                "name": "FullCopyActivity",
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "SqlSource"
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
    
                "inputs": [{
                    "referenceName": "SourceDataset",
                    "type": "DatasetReference"
                }],
                "outputs": [{
                    "referenceName": "SinkDataset",
                    "type": "DatasetReference"
                }]
            }]
        }
    }
    
  2. Run the Set-AzDataFactoryV2Pipeline cmdlet to create the pipeline FullCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "FullCopyPipeline" -File ".\FullCopyPipeline.json"
    

    Here is the sample output:

     PipelineName      : FullCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {FullCopyActivity}
     Parameters        :
    

Run the full copy pipeline

Run the pipeline FullCopyPipeline by using the Invoke-AzDataFactoryV2Pipeline cmdlet. The cmdlet returns the pipeline run ID; an optional PowerShell monitoring sketch follows the command.

Invoke-AzDataFactoryV2Pipeline -PipelineName "FullCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName        

Monitor the full copy pipeline

  1. Sign in to the Azure portal.

  2. Click All services, search with the keyword data factories, and select Data factories.

    Data factories menu

  3. Search for your data factory in the list of data factories, and select it to open the Data factory page.

    Search for your data factory

  4. On the Data factory page, click the Monitor & Manage tile.

    Monitor & Manage tile

  5. The Data Integration Application launches in a separate tab, where you can see all the pipeline runs and their statuses. Notice that in the following example, the status of the pipeline run is Succeeded. You can check the parameters passed to the pipeline by clicking the link in the Parameters column. If there was an error, you see a link in the Error column. Click the link in the Actions column.

    Screenshot shows pipeline runs for a data factory.

  6. When you click the link in the Actions column, you see the following page, which shows all the activity runs for the pipeline.

    Screenshot shows activity runs for a data factory with the Pipelines link called out.

  7. To switch back to the Pipeline runs view, click Pipelines as shown in the image.

Review the results

You see a file named incremental-<GUID>.txt in the incchgtracking folder of the adftutorial container.

Output file from full copy

The file should have the data from your database:

1,aaaa,21
2,bbbb,24
3,cccc,20
4,dddd,26
5,eeee,22

Add more data to the source table

Run the following query against your database to add a row and update a row.

INSERT INTO data_source_table
(PersonID, Name, Age)
VALUES
(6, 'new', 50);


UPDATE data_source_table
SET [Age] = 10, [Name] = 'update' WHERE [PersonID] = 1;

Create a pipeline for the delta copy

In this step, you create a pipeline with the following activities, and run it periodically. The Lookup activities get the old and new SYS_CHANGE_VERSION values from Azure SQL Database and pass them to the copy activity. The Copy activity copies the inserted/updated/deleted data between the two SYS_CHANGE_VERSION values from Azure SQL Database to Azure Blob storage. The Stored Procedure activity updates the value of SYS_CHANGE_VERSION for the next pipeline run.

  1. Create a JSON file named IncrementalCopyPipeline.json in the same folder with the following content:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupLastChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from table_store_ChangeTracking_version"
                        },
                        "dataset": {
                            "referenceName": "ChangeTrackingDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupCurrentChangeTrackingVersionActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "SELECT CHANGE_TRACKING_CURRENT_VERSION() as CurrentChangeTrackingVersion"
                        },
                        "dataset": {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select data_source_table.PersonID,data_source_table.Name,data_source_table.Age, CT.SYS_CHANGE_VERSION, SYS_CHANGE_OPERATION from data_source_table RIGHT OUTER JOIN CHANGETABLE(CHANGES data_source_table, @{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.SYS_CHANGE_VERSION}) as CT on data_source_table.PersonID = CT.PersonID where CT.SYS_CHANGE_VERSION <= @{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupLastChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupCurrentChangeTrackingVersionActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
                {
                    "name": "StoredProceduretoUpdateChangeTrackingActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
                        "storedProcedureName": "Update_ChangeTracking_Version",
                        "storedProcedureParameters": {
                            "CurrentTrackingVersion": {
                                "value": "@{activity('LookupCurrentChangeTrackingVersionActivity').output.firstRow.CurrentChangeTrackingVersion}",
                                "type": "INT64"
                            },
                            "TableName": {
                                "value": "@{activity('LookupLastChangeTrackingVersionActivity').output.firstRow.TableName}",
                                "type": "String"
                            }
                        }
                    },
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
        }
    }
    
  2. Run the Set-AzDataFactoryV2Pipeline cmdlet to create the pipeline IncrementalCopyPipeline.

     Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Here is the sample output:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADFTutorialResourceGroup
     DataFactoryName   : IncCopyChgTrackingDF
     Activities        : {LookupLastChangeTrackingVersionActivity, LookupCurrentChangeTrackingVersionActivity, IncrementalCopyActivity, StoredProceduretoUpdateChangeTrackingActivity}
     Parameters        :
    

Run the incremental copy pipeline

Run the pipeline IncrementalCopyPipeline by using the Invoke-AzDataFactoryV2Pipeline cmdlet. An optional sketch for inspecting the run from PowerShell follows the command.

Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroup $resourceGroupName -dataFactoryName $dataFactoryName     

Monitor the incremental copy pipeline

  1. In the Data Integration Application, refresh the pipeline runs view. Confirm that you see IncrementalCopyPipeline in the list. Click the link in the Actions column.

    Screenshot shows pipeline runs for a data factory, including your pipeline.

  2. When you click the link in the Actions column, you see the following page, which shows all the activity runs for the pipeline.

    Screenshot shows pipeline runs for a data factory with several marked as Succeeded.

  3. To switch back to the Pipeline runs view, click Pipelines as shown in the image.

Review the results

You see a second file in the incchgtracking folder of the adftutorial container.

Output file from incremental copy

The file should have only the delta data from your database. The record with U is the updated row in the database, and the record with I is the added row.

1,update,10,2,U
6,new,50,1,I

The first three columns are the changed data from data_source_table. The last two columns are metadata from the change tracking system table. The fourth column is the SYS_CHANGE_VERSION of each changed row. The fifth column is the operation: U = update, I = insert. For details about the change tracking information, see CHANGETABLE.

==================================================================
PersonID Name    Age    SYS_CHANGE_VERSION    SYS_CHANGE_OPERATION
==================================================================
1        update  10            2                                 U
6        new     50            1                                 I

Next steps

Advance to the following tutorial to learn about copying only new and changed files, based on their LastModifiedDate: