Incrementally load data from Azure SQL Database to Azure Blob storage using PowerShell

Applies to: Azure Data Factory, Azure Synapse Analytics (Preview)

In this tutorial, you create an Azure data factory with a pipeline that loads delta data from a table in Azure SQL Database to Azure Blob storage.

You perform the following steps in this tutorial:

  • Prepare the data store to store the watermark value.
  • Create a data factory.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create a pipeline.
  • Run the pipeline.
  • Monitor the pipeline run.

Overview

Here is the high-level solution diagram:

(Diagram: incrementally load data.)

Here are the important steps to create this solution:

  1. Select the watermark column. Select one column in the source data store that can be used to slice the new or updated records for every run. Normally, the data in this selected column (for example, last_modify_time or ID) keeps increasing when rows are created or updated. The maximum value in this column is used as a watermark.

  2. Prepare a data store to store the watermark value.
    In this tutorial, you store the watermark value in a SQL database.

  3. Create a pipeline with the following workflow:

    The pipeline in this solution has the following activities:

    • Create two Lookup activities. Use the first Lookup activity to retrieve the last watermark value. Use the second Lookup activity to retrieve the new watermark value. These watermark values are passed to the Copy activity.
    • Create a Copy activity that copies rows from the source data store with the value of the watermark column greater than the old watermark value and less than or equal to the new watermark value. It then copies the delta data from the source data store to Blob storage as a new file. (The shape of this delta query is sketched after this list.)
    • Create a StoredProcedure activity that updates the watermark value for the next pipeline run.
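
To make the watermark pattern concrete, here is a sketch of the query that the Copy activity ends up running. The values shown are illustrative only; the pipeline JSON later in this tutorial builds the query dynamically from the outputs of the two Lookup activities:

    # Illustrative only: the pipeline composes this query from the Lookup outputs.
    $oldWatermark = '2010-01-01 00:00:00'   # last watermark, read from watermarktable
    $newWatermark = '2017-09-05 08:06:00'   # MAX(LastModifytime) in data_source_table
    $deltaQuery = "SELECT * FROM data_source_table " +
        "WHERE LastModifytime > '$oldWatermark' AND LastModifytime <= '$newWatermark'"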

If you don't have an Azure subscription, create a 1rmb trial account before you begin.

Prerequisites

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.
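
If you haven't installed the Az module or signed in yet, a minimal sketch follows. This tutorial uses chinacloudapi.cn endpoints, hence the -Environment switch; adjust or omit it for other clouds:

    # Install the Az module for the current user (skip if it's already installed).
    Install-Module -Name Az -Scope CurrentUser -Repository PSGallery -Force
    
    # Sign in; this tutorial targets Azure China endpoints.
    Connect-AzAccount -Environment AzureChinaCloud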

Create a data source table in your SQL database

  1. Open SQL Server Management Studio. In Object Explorer, right-click the database, and choose New Query. (If you'd rather use PowerShell, a sketch with Invoke-Sqlcmd follows the table at the end of this section.)

  2. Run the following SQL command against your SQL database to create a table named data_source_table as the data source store:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    In this tutorial, you use LastModifytime as the watermark column. The data in the data source store is shown in the following table:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
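
If you'd rather not use SSMS, the same script can be run from PowerShell with Invoke-Sqlcmd (SqlServer module). A minimal sketch, assuming you saved the script above to a file and replace the placeholder server, database, and credential values:

    # Assumes the SqlServer module: Install-Module -Name SqlServer -Scope CurrentUser
    $sql = Get-Content -Path ".\create_data_source_table.sql" -Raw   # the script above, saved to a file
    Invoke-Sqlcmd -ServerInstance "<server>.database.chinacloudapi.cn" -Database "<database>" `
        -Username "<user>" -Password "<password>" -Query $sql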
    

Create another table in your SQL database to store the high watermark value

  1. Run the following SQL command against your SQL database to create a table named watermarktable to store the watermark value:

    create table watermarktable
    (
        TableName varchar(255),
        WatermarkValue datetime
    );
    
    
  2. Set the default value of the high watermark with the table name of the source data store. In this tutorial, the table name is data_source_table.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Review the data in the table watermarktable.

    Select * from watermarktable
    

    Output:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

Create a stored procedure in your SQL database

Run the following command to create a stored procedure in your SQL database:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
    WHERE [TableName] = @TableName

END
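
To sanity-check the procedure, you can call it once with the same initial value and read the table back. A sketch using Invoke-Sqlcmd with placeholder connection values:

    # Hypothetical smoke test: writes the same initial watermark, then reads it back.
    Invoke-Sqlcmd -ServerInstance "<server>.database.chinacloudapi.cn" -Database "<database>" `
        -Username "<user>" -Password "<password>" `
        -Query "EXEC usp_write_watermark @LastModifiedtime = '2010-01-01', @TableName = 'data_source_table'; SELECT * FROM watermarktable;"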

Create a data factory

  1. Define a variable for the resource group name that you use in PowerShell commands later. Copy the following command text to PowerShell, specify a name for the Azure resource group in double quotation marks, and then run the command. An example is "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    If the resource group already exists, you may not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

  2. Define a variable for the location of the data factory.

    $location = "China East 2"
    
  3. To create the Azure resource group, run the following command:

    New-AzResourceGroup $resourceGroupName $location
    

    If the resource group already exists, you may not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

  4. Define a variable for the data factory name.

    Important

    Update the data factory name to make it globally unique. An example is ADFTutorialFactorySP1127.

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. To create the data factory, run the following Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName 
    

Note the following points:

  • The name of the data factory must be globally unique. If you receive the following error, change the name and try again:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • To create Data Factory instances, the user account you use to sign in to Azure must be a member of the contributor or owner role, or an administrator of the Azure subscription.

  • For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Storage, SQL Database, Azure SQL Managed Instance, and so on) and computes (Azure HDInsight, and so on) used by the data factory can be in other regions.
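
After the cmdlet returns, you can confirm that the factory exists by reading it back with Get-AzDataFactoryV2, using the variables defined above:

    # Retrieves the factory you just created; fails if the name or resource group is wrong.
    Get-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Name $dataFactoryName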

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your storage account and SQL database.

Create a Storage linked service

  1. Create a JSON file named AzureStorageLinkedService.json in the C:\ADF folder with the following content. (Create the folder ADF if it doesn't already exist.) Replace <accountName> and <accountKey> with the name and key of your storage account before you save the file. (A sketch for retrieving the key with PowerShell follows this procedure.)

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
            }
        }
    }
    
  2. In PowerShell, switch to the ADF folder.

  3. Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureStorageLinkedService. In the following example, you pass values for the ResourceGroupName and DataFactoryName parameters:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
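
Rather than pasting the account key by hand, you can retrieve it with the Az.Storage cmdlets and patch the JSON file before running the cmdlet above. A sketch, assuming $storageRG and $storageAccountName (both placeholders) identify your storage account:

    # Fetch the primary key and splice it into the linked service definition.
    $key = (Get-AzStorageAccountKey -ResourceGroupName $storageRG -Name $storageAccountName)[0].Value
    (Get-Content ".\AzureStorageLinkedService.json" -Raw) `
        -replace '<accountName>', $storageAccountName `
        -replace '<accountKey>', $key |
        Set-Content ".\AzureStorageLinkedService.json"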
    

Create a SQL Database linked service

  1. Create a JSON file named AzureSQLDatabaseLinkedService.json in the C:\ADF folder with the following content. (Create the folder ADF if it doesn't already exist.) Replace <server>, <database>, <user>, and <password> with your server name, database, user ID, and password before you save the file.

    {
        "name": "AzureSQLDatabaseLinkedService",
        "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server = tcp:<server>.database.chinacloudapi.cn,1433;Initial Catalog=<database>; Persist Security Info=False; User ID=<user> ; Password=<password>; MultipleActiveResultSets = False; Encrypt = True; TrustServerCertificate = False; Connection Timeout = 30;"
            }
        }
    }
    
  2. In PowerShell, switch to the ADF folder.

  3. Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
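
At this point, you can list the linked services registered in the factory to confirm that both were created:

    # Should list AzureStorageLinkedService and AzureSQLDatabaseLinkedService.
    Get-AzDataFactoryV2LinkedService -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName |
        Select-Object LinkedServiceName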
    

Create datasets

In this step, you create datasets to represent the source and sink data.

Create a source dataset

  1. Create a JSON file named SourceDataset.json in the same folder with the following content:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    In this tutorial, you use the table name data_source_table. Replace it if you use a table with a different name.

  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

Create a sink dataset

  1. Create a JSON file named SinkDataset.json in the same folder with the following content:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    Important

    This snippet assumes that you have a blob container named adftutorial in your blob storage. Create the container if it doesn't exist, or set it to the name of an existing one; a PowerShell sketch for creating the container follows this procedure. The output folder incrementalcopy is created automatically if it doesn't exist in the container. In this tutorial, the file name is dynamically generated by the expression @CONCAT('Incremental-', pipeline().RunId, '.txt').

  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
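
If the adftutorial container doesn't exist yet, a minimal sketch for creating it from PowerShell, reusing the placeholder account name and the $key retrieved in the linked-service step (both assumptions):

    # Build a storage context and create the container that SinkDataset expects.
    $ctx = New-AzStorageContext -StorageAccountName $storageAccountName -StorageAccountKey $key
    New-AzStorageContainer -Name "adftutorial" -Context $ctx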
    

Create a dataset for a watermark

In this step, you create a dataset for storing a high watermark value.

  1. Create a JSON file named WatermarkDataset.json in the same folder with the following content:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Create a pipeline

In this tutorial, you create a pipeline with two Lookup activities, one Copy activity, and one StoredProcedure activity chained together.

  1. Create a JSON file named IncrementalCopyPipeline.json in the same folder with the following content:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. Run the Set-AzDataFactoryV2Pipeline cmdlet to create the pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    Here is the sample output:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

Run the pipeline

  1. Run the pipeline IncrementalCopyPipeline by using the Invoke-AzDataFactoryV2Pipeline cmdlet. The command uses the $resourceGroupName and $dataFactoryName variables that you defined earlier.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. Check the status of the pipeline by running the Get-AzDataFactoryV2ActivityRun cmdlet until you see all the activities running successfully. Replace the placeholders with appropriate times for the RunStartedAfter and RunStartedBefore parameters. In this tutorial, you use -RunStartedAfter "2017/09/14" and -RunStartedBefore "2017/09/15". (A sketch that polls the run until it completes follows the sample output below.)

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Here is the sample output:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
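
Rather than rerunning the command by hand, you can poll the pipeline run until it leaves the InProgress state. A sketch using Get-AzDataFactoryV2PipelineRun and the $RunId captured earlier:

    # Poll every 30 seconds until the pipeline run completes.
    while ($true) {
        $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName `
            -DataFactoryName $dataFactoryName -PipelineRunId $RunId
        if ($run -and $run.Status -ne 'InProgress') {
            Write-Output ("Pipeline run finished with status: " + $run.Status)
            break
        }
        Write-Output "Pipeline run status: InProgress"
        Start-Sleep -Seconds 30
    }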
    
    

Review the results

  1. In the blob storage (sink store), you see that the data was copied to the file defined in SinkDataset. In the current tutorial, the file name is Incremental-d4bf3ce2-5d60-43f3-9318-923155f61037.txt. Open the file, and you can see that its records match the data in the SQL database. (A PowerShell sketch for listing these output blobs follows this procedure.)

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. watermarktable 中查看最新值。Check the latest value from watermarktable. 可看到水印值已更新。You see that the watermark value was updated.

    Select * from watermarktable
    

    Here is the sample output:

    TableName         | WatermarkValue
    ----------------- | --------------
    data_source_table | 2017-09-05 08:06:00.000
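
To verify the output from PowerShell instead of the portal, list the blobs in the container, reusing the $ctx storage context built in the sink dataset step (an assumption):

    # Lists the incremental output files written by the Copy activity.
    Get-AzStorageBlob -Container "adftutorial" -Prefix "incrementalcopy/" -Context $ctx |
        Select-Object Name, Length, LastModified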

Insert data into the data source store to verify delta data loading

  1. Insert new data into the SQL database (data source store).

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    The updated data in the SQL database is:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. Run the pipeline IncrementalCopyPipeline again by using the Invoke-AzDataFactoryV2Pipeline cmdlet. The command uses the same $resourceGroupName and $dataFactoryName variables as before.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. Check the status of the pipeline by running the Get-AzDataFactoryV2ActivityRun cmdlet until you see all the activities running successfully. Replace the placeholders with appropriate times for the RunStartedAfter and RunStartedBefore parameters. In this tutorial, you use -RunStartedAfter "2017/09/14" and -RunStartedBefore "2017/09/15".

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    Here is the sample output:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. In the blob storage, you see that another file was created. In this tutorial, the new file name is Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt. Open that file, and you see two rows of records in it.

  5. watermarktable 中查看最新值。Check the latest value from watermarktable. 可看到水印值已再次更新。You see that the watermark value was updated again.

    Select * from watermarktable
    

    Here is the sample output:

    TableName         | WatermarkValue
    ----------------- | --------------
    data_source_table | 2017-09-07 09:01:00.000

Next steps

You performed the following steps in this tutorial:

  • Prepare the data store to store the watermark value.
  • Create a data factory.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create a pipeline.
  • Run the pipeline.
  • Monitor the pipeline run.

In this tutorial, the pipeline copied data from a single table in Azure SQL Database to Blob storage. Advance to the following tutorial to learn how to copy data from multiple tables in a SQL Server database to SQL Database.