Compartir a través de

使用 PowerShell 以增量方式将 Azure SQL 数据库中的数据加载到 Azure Blob 存储

适用于:Azure 数据工厂 Azure Synapse Analytics

本教程将使用 Azure 数据工厂创建一个管道,用于将 Azure SQL 数据库的表中的增量数据加载到 Azure Blob 存储。

在本教程中执行以下步骤:

  • 准备用于存储水印值的数据存储。
  • 创建数据工厂。
  • 创建链接服务。
  • 创建源、接收器和水印数据集。
  • 创建管道。
  • 运行管道。
  • 监视管道运行。

概述

下面是高级解决方案示意图:

以增量方式加载数据

下面是创建此解决方案所要执行的重要步骤:

  1. 选择水印列。 在源数据存储中选择一个列,该列可用于将每个运行的新记录或已更新记录切片。 通常,在创建或更新行时,此选定列中的数据(例如 last_modify_time 或 ID)会不断递增。 此列中的最大值用作水印。

  2. 准备用于存储水印值的数据存储
    本教程在 SQL 数据库中存储水印值。

  3. 创建采用以下工作流的管道

    此解决方案中的管道具有以下活动:

    • 创建两个 Lookup 活动。 使用第一个 Lookup 活动检索上一个水印值。 使用第二个 Lookup 活动检索新的水印值。 这些水印值会传递到 Copy 活动。
    • 创建 Copy 活动,用于复制源数据存储中其水印列值大于旧水印值但小于或等于新水印值的行。 然后,该活动将源数据存储中的增量数据作为新文件复制到 Blob 存储。
    • 创建 StoredProcedure 活动,用于更新下一次运行的管道的水印值。

如果没有 Azure 订阅,可在开始前创建一个试用帐户

先决条件

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

在 SQL 数据库中创建数据源表

  1. 打开 SQL Server Management Studio。 在“服务器资源管理器”中,右键单击数据库,然后选择“新建查询”。

  2. 针对 SQL 数据库运行以下 SQL 命令,创建名为 data_source_table 的表作为数据源存储:

    create table data_source_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');
    

    本教程使用 LastModifytime 作为水印列。 下表显示了数据源存储中的数据:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    

在 SQL 数据库中创建另一个表,用于存储高水印值

  1. 针对 SQL 数据库运行以下 SQL 命令,创建名为 watermarktable 的表,用于存储水印值:

    create table watermarktable
    (
    
    TableName varchar(255),
    WatermarkValue datetime,
    );
    
  2. 使用源数据存储的表名设置高水印的默认值。 在本教程中,表名为 data_source_table。

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. 查看 watermarktable 表中的数据。

    Select * from watermarktable
    

    输出:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

在 SQL 数据库中创建存储过程

运行以下命令,在 SQL 数据库中创建存储过程:

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

创建数据工厂

  1. 为资源组名称定义一个变量,稍后会在 PowerShell 命令中使用该变量。 将以下命令文本复制到 PowerShell,在双引号中指定 Azure 资源组的名称,然后运行命令。 示例为 "adfrg"

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName 变量分配另一个值,然后再次运行命令

  2. 定义一个用于数据工厂位置的变量。

    $location = "China East 2"
    
  3. 若要创建 Azure 资源组,请运行以下命令:

    New-AzResourceGroup $resourceGroupName $location
    

    如果该资源组已存在,请勿覆盖它。 为 $resourceGroupName 变量分配另一个值,然后再次运行命令

  4. 定义一个用于数据工厂名称的变量。

    重要

    更新数据工厂名称,使之全局唯一。 例如 ADFTutorialFactorySP1127。

    $dataFactoryName = "ADFIncCopyTutorialFactory";
    
  5. 要创建数据工厂,请运行以下 Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName 
    

请注意以下几点:

  • 数据工厂的名称必须全局唯一。 如果收到以下错误,请更改名称并重试:

    The specified Data Factory name 'ADFv2QuickStartDataFactory' is already in use. Data Factory names must be globally unique.
    
  • 若要创建数据工厂实例,用于登录到 Azure 的用户帐户必须属于参与者或所有者角色,或者是 Azure 订阅的管理员。

  • 若要查看目前提供数据工厂的 Azure 区域的列表,请在以下页面上选择感兴趣的区域,然后展开“分析”以找到“数据工厂”:可用产品(按区域)。 数据工厂使用的数据存储(Azure 存储、SQL 数据库、Azure SQL 托管实例等)和计算资源(Azure HDInsight 等)可以位于其他区域中。

创建链接服务

可在数据工厂中创建链接服务,将数据存储和计算服务链接到数据工厂。 在本部分中,创建到存储帐户和 SQL 数据库的链接服务。

创建存储链接服务

  1. 在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureStorageLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)将 <accountName><accountKey> 替换为存储帐户的名称和密钥,然后保存文件。

    {
        "name": "AzureStorageLinkedService",
        "properties": {
            "type": "AzureStorage",
            "typeProperties": {
                "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountName>;AccountKey=<accountKey>;EndpointSuffix=core.chinacloudapi.cn"
            }
        }
    }
    
  2. 在 PowerShell 中切换到 ADF 文件夹。

  3. 运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureStorageLinkedService。 在以下示例中,传递 ResourceGroupNameDataFactoryName 参数的值:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -File ".\AzureStorageLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureStorageLinkedService
    ResourceGroupName : <resourceGroupName>
    DataFactoryName   : <dataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureStorageLinkedService
    

创建 SQL 数据库链接服务

  1. 在 C:\ADF 文件夹中,创建包含以下内容的名为 AzureSQLDatabaseLinkedService.json 的 JSON 文件。 (如果文件夹 ADF 不存在,请创建。)在保存文件之前,将 <your-server-name> 和 <your-database-name> 替换为你的服务器和数据库的名称。 此外,必须配置 Azure SQL Server,以便为数据工厂的托管标识授予访问权限

    {
    "name": "AzureSqlDatabaseLinkedService",
    "properties": {
            "type": "AzureSqlDatabase",
            "typeProperties": {
                "connectionString": "Server=tcp:<your-server-name>.database.chinacloudapi.cn,1433;Database=<your-database-name>;"
            },
            "authenticationType": "ManagedIdentity",
            "annotations": []
        }
    }
    
  2. 在 PowerShell 中切换到 ADF 文件夹。

  3. 运行 Set-AzDataFactoryV2LinkedService cmdlet 以创建链接服务 AzureSQLDatabaseLinkedService。

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    下面是示例输出:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    ProvisioningState :
    

创建数据集

在此步骤中,创建表示源和接收器数据的数据集。

创建源数据集

  1. 在同一文件夹中,创建包含以下内容的名为 SourceDataset.json 的 JSON 文件:

    {
        "name": "SourceDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "data_source_table"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }
    
    

    本教程使用表名 data_source_table。 如果使用其他名称的表,请替换名称。

  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SourceDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : SourceDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    

创建接收器数据集

  1. 在同一文件夹中,创建包含以下内容的名为 SinkDataset.json 的 JSON 文件:

    {
        "name": "SinkDataset",
        "properties": {
            "type": "AzureBlob",
            "typeProperties": {
                "folderPath": "adftutorial/incrementalcopy",
                "fileName": "@CONCAT('Incremental-', pipeline().RunId, '.txt')",
                "format": {
                    "type": "TextFormat"
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureStorageLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }   
    

    重要

    此代码片段假设 Blob 存储中有一个名为 adftutorial 的 Blob 容器。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 会自动创建输出文件夹 incrementalcopy(如果容器中不存在)。 在本教程中,文件名是使用表达式 @CONCAT('Incremental-', pipeline().RunId, '.txt') 动态生成的。

  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 SinkDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : SinkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureBlobDataset    
    

为水印创建数据集

在此步骤中,创建用于存储高水印值的数据集。

  1. 在同一文件夹中,创建包含以下内容的名为 WatermarkDataset.json 的 JSON 文件:

    {
        "name": " WatermarkDataset ",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. 运行 Set-AzDataFactoryV2Dataset cmdlet 以创建数据集 WatermarkDataset。

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    下面是该 cmdlet 的示例输出:

    DatasetName       : WatermarkDataset
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

创建管道

本教程创建包含两个 Lookup 活动、一个 Copy 活动和一个 StoredProcedure 活动的管道,这些活动链接在一个管道中。

  1. 在同一文件夹中,创建包含以下内容的 JSON 文件 IncrementalCopyPipeline.json:

    {
        "name": "IncrementalCopyPipeline",
        "properties": {
            "activities": [
                {
                    "name": "LookupOldWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                        "type": "SqlSource",
                        "sqlReaderQuery": "select * from watermarktable"
                        },
    
                        "dataset": {
                        "referenceName": "WatermarkDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
                {
                    "name": "LookupNewWaterMarkActivity",
                    "type": "Lookup",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select MAX(LastModifytime) as NewWatermarkvalue from data_source_table"
                        },
    
                        "dataset": {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                        }
                    }
                },
    
                {
                    "name": "IncrementalCopyActivity",
                    "type": "Copy",
                    "typeProperties": {
                        "source": {
                            "type": "SqlSource",
                            "sqlReaderQuery": "select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'"
                        },
                        "sink": {
                            "type": "BlobSink"
                        }
                    },
                    "dependsOn": [
                        {
                            "activity": "LookupNewWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        },
                        {
                            "activity": "LookupOldWaterMarkActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
    
                    "inputs": [
                        {
                            "referenceName": "SourceDataset",
                            "type": "DatasetReference"
                        }
                    ],
                    "outputs": [
                        {
                            "referenceName": "SinkDataset",
                            "type": "DatasetReference"
                        }
                    ]
                },
    
                {
                    "name": "StoredProceduretoWriteWatermarkActivity",
                    "type": "SqlServerStoredProcedure",
                    "typeProperties": {
    
                        "storedProcedureName": "usp_write_watermark",
                        "storedProcedureParameters": {
                            "LastModifiedtime": {"value": "@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}", "type": "datetime" },
                            "TableName":  { "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}", "type":"String"}
                        }
                    },
    
                    "linkedServiceName": {
                        "referenceName": "AzureSQLDatabaseLinkedService",
                        "type": "LinkedServiceReference"
                    },
    
                    "dependsOn": [
                        {
                            "activity": "IncrementalCopyActivity",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ]
                }
            ]
    
        }
    }
    
  2. 运行 Set-AzDataFactoryV2Pipeline cmdlet 以创建管道 IncrementalCopyPipeline。

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"
    

    下面是示例输出:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : ADF
     DataFactoryName   : incrementalloadingADF
     Activities        : {LookupOldWaterMarkActivity, LookupNewWaterMarkActivity, IncrementalCopyActivity, StoredProceduretoWriteWatermarkActivity}
     Parameters        :
    

运行管道

  1. 使用 Invoke-AzDataFactoryV2Pipeline cmdlet 运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  2. 运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfterRunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14"-RunStartedBefore "2017/09/15"

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    下面是示例输出:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:42:50 AM
    DurationInMs      : 7777
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:42:42 AM
    ActivityRunEnd    : 9/14/2017 7:43:07 AM
    DurationInMs      : 25437
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:10 AM
    ActivityRunEnd    : 9/14/2017 7:43:29 AM
    DurationInMs      : 19769
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : d4bf3ce2-5d60-43f3-9318-923155f61037
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 7:43:32 AM
    ActivityRunEnd    : 9/14/2017 7:43:47 AM
    DurationInMs      : 14467
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    

查看结果

  1. 在 Blob 存储(接收器存储)中,可看到数据已复制到 SinkDataset 中定义的文件。 在当前的教程中,文件名为 Incremental- d4bf3ce2-5d60-43f3-9318-923155f61037.txt。 打开该文件,可以看到其中与 SQL 数据库中的数据相同的记录。

    1,aaaa,2017-09-01 00:56:00.0000000
    2,bbbb,2017-09-02 05:23:00.0000000
    3,cccc,2017-09-03 02:36:00.0000000
    4,dddd,2017-09-04 03:21:00.0000000
    5,eeee,2017-09-05 08:06:00.0000000
    
  2. watermarktable 中查看最新值。 可看到水印值已更新。

    Select * from watermarktable
    

    下面是示例输出:

    TableName WatermarkValue
    data_source_table 2017-09-05 8:06:00.000

将数据插入数据源存储,验证增量数据的加载

  1. 在 SQL 数据库(数据源存储)中插入新数据。

    INSERT INTO data_source_table
    VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
    

    SQL 数据库中的更新数据为:

    PersonID | Name | LastModifytime
    -------- | ---- | --------------
    1 | aaaa | 2017-09-01 00:56:00.000
    2 | bbbb | 2017-09-02 05:23:00.000
    3 | cccc | 2017-09-03 02:36:00.000
    4 | dddd | 2017-09-04 03:21:00.000
    5 | eeee | 2017-09-05 08:06:00.000
    6 | newdata | 2017-09-06 02:23:00.000
    7 | newdata | 2017-09-07 09:01:00.000
    
  2. 通过使用 Invoke-AzDataFactoryV2Pipeline cmdlet 再次运行管道 IncrementalCopyPipeline。 将占位符替换为自己的资源组和数据工厂名称。

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -dataFactoryName $dataFactoryName
    
  3. 运行 Get-AzDataFactoryV2ActivityRun cmdlet 检查管道的状态,直到看到所有活动成功运行的消息。 将占位符替换为针对 RunStartedAfterRunStartedBefore 参数指定的自己的适当时间。 本教程使用 -RunStartedAfter "2017/09/14"-RunStartedBefore "2017/09/15"

    Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $RunId -RunStartedAfter "<start time>" -RunStartedBefore "<end time>"
    

    下面是示例输出:

    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupNewWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {NewWatermarkvalue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:58 AM
    DurationInMs      : 31758
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : LookupOldWaterMarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, dataset}
    Output            : {TableName, WatermarkValue}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:52:26 AM
    ActivityRunEnd    : 9/14/2017 8:52:52 AM
    DurationInMs      : 25497
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : IncrementalCopyActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {source, sink}
    Output            : {dataRead, dataWritten, rowsCopied, copyDuration...}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:00 AM
    ActivityRunEnd    : 9/14/2017 8:53:20 AM
    DurationInMs      : 20194
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    ResourceGroupName : ADF
    DataFactoryName   : incrementalloadingADF
    ActivityName      : StoredProceduretoWriteWatermarkActivity
    PipelineRunId     : 2fc90ab8-d42c-4583-aa64-755dba9925d7
    PipelineName      : IncrementalCopyPipeline
    Input             : {storedProcedureName, storedProcedureParameters}
    Output            : {}
    LinkedServiceName :
    ActivityRunStart  : 9/14/2017 8:53:23 AM
    ActivityRunEnd    : 9/14/2017 8:53:41 AM
    DurationInMs      : 18502
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    
  4. 在 Blob 存储中,可以看到另一文件已创建。 在本教程中,新文件名为 Incremental-2fc90ab8-d42c-4583-aa64-755dba9925d7.txt。 打开该文件,会看到其中包含两行记录。

  5. watermarktable 中查看最新值。 可看到水印值已再次更新。

    Select * from watermarktable
    

    示例输出:

    TableName WatermarkValue
    data_source_table 2017-09-07 09:01:00.000

已在本教程中执行了以下步骤:

  • 准备用于存储水印值的数据存储。
  • 创建数据工厂。
  • 创建链接服务。
  • 创建源、接收器和水印数据集。
  • 创建管道。
  • 运行管道。
  • 监视管道运行。

在本教程中,管道将数据从 Azure SQL 数据库中的单个表复制到 Blob 存储。 转到下面的教程,了解如何将数据从 SQL Server 数据库中的多个表复制到 SQL 数据库。