Incrementally load data from multiple tables in SQL Server to Azure SQL Database using PowerShell

APPLIES TO: ✔️ Azure Data Factory  ✖️ Azure Synapse Analytics (Preview)

In this tutorial, you create an Azure data factory with a pipeline that loads delta data from multiple tables in a SQL Server database to Azure SQL Database.

In this tutorial, you perform the following steps:

  • Prepare source and destination data stores.
  • Create a data factory.
  • Create a self-hosted integration runtime.
  • Install the integration runtime.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create, run, and monitor a pipeline.
  • Review the results.
  • Add or update data in source tables.
  • Rerun and monitor the pipeline.
  • Review the final results.

Overview

Here are the important steps to create this solution:

  1. Select the watermark column.

    Select one column in each table of the source data store that you can use to identify the new or updated records for every run. Normally, the data in this selected column (for example, last_modify_time or ID) keeps increasing when rows are created or updated. The maximum value in this column is used as a watermark.

  2. Prepare a data store to store the watermark value.

    In this tutorial, you store the watermark value in a SQL database.

  3. Create a pipeline with the following activities:

    a. Create a ForEach activity that iterates through a list of source table names that is passed as a parameter to the pipeline. For each source table, it invokes the following activities to perform delta loading for that table.

    b. Create two Lookup activities. Use the first Lookup activity to retrieve the last watermark value. Use the second Lookup activity to retrieve the new watermark value. These watermark values are passed to the Copy activity.

    c. Create a Copy activity that copies rows from the source data store whose watermark column value is greater than the old watermark value and less than or equal to the new watermark value. This delta data is copied from the source data store to the destination store.

    d. Create a StoredProcedure activity that updates the watermark value for the next pipeline run.

    Here is the high-level solution diagram:

    Incrementally load data

If you don't have an Azure subscription, create a 1rmb trial account before you begin.

Prerequisites

  • SQL Server. You use a SQL Server database as the source data store in this tutorial.
  • Azure SQL Database. You use a database in Azure SQL Database as the sink data store. If you don't have a SQL database, see Create a database in Azure SQL Database for steps to create one.

Create source tables in your SQL Server database

  1. Open SQL Server Management Studio (SSMS) or Azure Data Studio, and connect to your SQL Server database.

  2. In Server Explorer (SSMS) or in the Connections pane (Azure Data Studio), right-click the database and choose New Query.

  3. Run the following SQL command against your database to create tables named customer_table and project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    INSERT INTO customer_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'John','9/1/2017 12:56:00 AM'),
    (2, 'Mike','9/2/2017 5:23:00 AM'),
    (3, 'Alice','9/3/2017 2:36:00 AM'),
    (4, 'Andy','9/4/2017 3:21:00 AM'),
    (5, 'Anny','9/5/2017 8:06:00 AM');
    
    INSERT INTO project_table
    (Project, Creationtime)
    VALUES
    ('project1','1/1/2015 0:00:00 AM'),
    ('project2','2/2/2016 1:23:00 AM'),
    ('project3','3/4/2017 5:16:00 AM');
    

Create destination tables in your Azure SQL Database

  1. Open SQL Server Management Studio (SSMS) or Azure Data Studio, and connect to your database in Azure SQL Database.

  2. In Server Explorer (SSMS) or in the Connections pane (Azure Data Studio), right-click the database and choose New Query.

  3. Run the following SQL command against your database to create tables named customer_table and project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    

Create another table in Azure SQL Database to store the high watermark value

  1. Run the following SQL command against your database to create a table named watermarktable to store the watermark value:

    create table watermarktable
    (
        TableName varchar(255),
        WatermarkValue datetime
    );
    
  2. Insert initial watermark values for both source tables into the watermark table.

    
    INSERT INTO watermarktable
    VALUES 
    ('customer_table','1/1/2010 12:00:00 AM'),
    ('project_table','1/1/2010 12:00:00 AM');
    
    

Create a stored procedure in Azure SQL Database

Run the following command to create a stored procedure in your database. This stored procedure updates the watermark value after every pipeline run.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
    WHERE [TableName] = @TableName

END

Create data types and additional stored procedures in Azure SQL Database

Run the following query to create two table types and two stored procedures in your database. They're used to merge the data from the source tables into the destination tables.

To keep this tutorial easy to start with, these stored procedures take the delta data in through a table variable and merge it into the destination store. Be aware that a table variable is not suited to holding a large number of delta rows (more than 100).

If you do need to merge a large number of delta rows into the destination store, we suggest that you first use a Copy activity to copy all the delta data into a temporary "staging" table in the destination store, and then build your own stored procedure, without a table variable, to merge the rows from the "staging" table into the "final" table.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END
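The upsert semantics of these MERGE procedures can be sketched outside SQL as well. The following Python snippet is illustrative only (it is not part of the tutorial) and mirrors the WHEN MATCHED / WHEN NOT MATCHED behavior on plain dictionaries:

```python
def upsert(target, delta, key):
    """Merge delta rows into target, matching on `key` (mirrors SQL MERGE)."""
    index = {row[key]: i for i, row in enumerate(target)}
    for row in delta:
        if row[key] in index:
            target[index[row[key]]] = dict(row)   # WHEN MATCHED THEN UPDATE
        else:
            target.append(dict(row))              # WHEN NOT MATCHED THEN INSERT
    return target

customer_table = [{"PersonID": 1, "Name": "John"}, {"PersonID": 2, "Name": "Mike"}]
delta = [{"PersonID": 2, "Name": "Michael"}, {"PersonID": 6, "Name": "Nicole"}]
upsert(customer_table, delta, "PersonID")
```

After the merge, the matched row for PersonID 2 is updated in place and the unmatched row for PersonID 6 is appended, just as the stored procedures do against the destination tables.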

Azure PowerShell

Install the latest Azure PowerShell modules by following the instructions in Install and configure Azure PowerShell.

Create a data factory

  1. Define a variable for the resource group name that you use in PowerShell commands later. Copy the following command text to PowerShell, specify a name for the Azure resource group in double quotation marks, and then run the command. An example is "adfrg".

    $resourceGroupName = "ADFTutorialResourceGroup";
    

    If the resource group already exists, you might not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

  2. Define a variable for the location of the data factory.

    $location = "China East 2"
    
  3. To create the Azure resource group, run the following command:

    New-AzResourceGroup $resourceGroupName $location
    

    If the resource group already exists, you might not want to overwrite it. Assign a different value to the $resourceGroupName variable, and run the command again.

  4. Define a variable for the data factory name.

    Important

    Update the data factory name to make it globally unique. An example is ADFIncMultiCopyTutorialFactorySP1127.

    $dataFactoryName = "ADFIncMultiCopyTutorialFactory";
    
  5. To create the data factory, run the following Set-AzDataFactoryV2 cmdlet:

    Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location $location -Name $dataFactoryName 
    

Note the following points:

  • The name of the data factory must be globally unique. If you receive the following error, change the name and try again:

    Set-AzDataFactoryV2 : HTTP Status Code: Conflict
    Error Code: DataFactoryNameInUse
    Error Message: The specified resource name 'ADFIncMultiCopyTutorialFactory' is already in use. Resource names must be globally unique.
    
  • To create Data Factory instances, the user account you use to sign in to Azure must be a member of the contributor or owner role, or an administrator of the Azure subscription.

  • For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. The data stores (Azure Storage, SQL Database, SQL Managed Instance, and so on) and computes (Azure HDInsight, and so on) used by the data factory can be in other regions.

Create a self-hosted integration runtime

In this section, you create a self-hosted integration runtime and associate it with an on-premises machine that hosts the SQL Server database. The self-hosted integration runtime is the component that copies data from SQL Server on your machine to Azure SQL Database.

  1. Create a variable for the name of the integration runtime. Use a unique name, and make a note of it. You use it later in this tutorial.

    $integrationRuntimeName = "ADFTutorialIR"
    
  2. Create a self-hosted integration runtime.

    Set-AzDataFactoryV2IntegrationRuntime -Name $integrationRuntimeName -Type SelfHosted -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Here is the sample output:

     Name              : <Integration Runtime name>
     Type              : SelfHosted
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Description       : 
     Id                : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroupName>/providers/Microsoft.DataFactory/factories/<DataFactoryName>/integrationruntimes/ADFTutorialIR
    
  3. To retrieve the status of the created integration runtime, run the following command. Confirm that the value of the State property is set to NeedRegistration.

    Get-AzDataFactoryV2IntegrationRuntime -name $integrationRuntimeName -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -Status
    

    Here is the sample output:

    State                     : NeedRegistration
    Version                   : 
    CreateTime                : 9/24/2019 6:00:00 AM
    AutoUpdate                : On
    ScheduledUpdateDate       : 
    UpdateDelayOffset         : 
    LocalTimeZoneOffset       : 
    InternalChannelEncryption : 
    Capabilities              : {}
    ServiceUrls               : {chinae2.frontend.datamovement.azure.cn}
    Nodes                     : {}
    Links                     : {}
    Name                      : ADFTutorialIR
    Type                      : SelfHosted
    ResourceGroupName         : <ResourceGroup name>
    DataFactoryName           : <DataFactory name>
    Description               : 
    Id                        : /subscriptions/<subscription ID>/resourceGroups/<ResourceGroup name>/providers/Microsoft.DataFactory/factories/<DataFactory name>/integrationruntimes/<Integration Runtime name>
    
  4. To retrieve the authentication keys used to register the self-hosted integration runtime with the Azure Data Factory service in the cloud, run the following command:

    Get-AzDataFactoryV2IntegrationRuntimeKey -Name $integrationRuntimeName -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName | ConvertTo-Json
    

    Here is the sample output:

    {
     "AuthKey1": "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=",
     "AuthKey2":  "IR@0000000000-0000-0000-0000-000000000000@xy0@xy@yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy="
    }
    
  5. Copy one of the keys (exclude the double quotation marks) to register the self-hosted integration runtime that you install on your machine in the following steps.

Install the integration runtime tool

  1. If you already have the integration runtime on your machine, uninstall it by using Add or Remove Programs.

  2. Download the self-hosted integration runtime to a local Windows machine. Run the installation.

  3. On the Welcome to Microsoft Integration Runtime Setup page, select Next.

  4. On the End-User License Agreement page, accept the terms and license agreement, and select Next.

  5. On the Destination Folder page, select Next.

  6. On the Ready to install Microsoft Integration Runtime page, select Install.

  7. On the Completed the Microsoft Integration Runtime Setup page, select Finish.

  8. On the Register Integration Runtime (Self-hosted) page, paste the key you saved in the previous section, and select Register.

    Register Integration Runtime

  9. On the New Integration Runtime (Self-hosted) Node page, select Finish.

  10. When the self-hosted integration runtime is registered successfully, you see the following message:

    Registered successfully

  11. On the Register Integration Runtime (Self-hosted) page, select Launch Configuration Manager.

  12. When the node is connected to the cloud service, you see the following page:

    Node is connected page

  13. Now, test the connectivity to your SQL Server database.

    Diagnostics tab

    a. On the Configuration Manager page, go to the Diagnostics tab.

    b. Select SqlServer for the data source type.

    c. Enter the server name.

    d. Enter the database name.

    e. Select the authentication mode.

    f. Enter the user name.

    g. Enter the password that's associated with the user name.

    h. Select Test to confirm that the integration runtime can connect to SQL Server. If the connection is successful, you see a green check mark. If the connection is not successful, you see an error message. Fix any issues, and ensure that the integration runtime can connect to SQL Server.

    Note

    Make a note of the values for authentication type, server, database, user name, and password. You use them later in this tutorial.

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your SQL Server database and your database in Azure SQL Database.

Create the SQL Server linked service

In this step, you link your SQL Server database to the data factory.

  1. Create a JSON file named SqlServerLinkedService.json in the C:\ADFTutorials\IncCopyMultiTableTutorial folder (create the local folders if they don't already exist) with the following content. Select the right section based on the authentication you use to connect to SQL Server.

    Important

    Select the right section based on the authentication you use to connect to SQL Server.

    If you use SQL authentication, copy the following JSON definition:

    {  
        "name":"SqlServerLinkedService",
        "properties":{  
            "annotations":[  
    
            ],
            "type":"SqlServer",
            "typeProperties":{  
                "connectionString":"integrated security=False;data source=<servername>;initial catalog=<database name>;user id=<username>;Password=<password>"
            },
            "connectVia":{  
                "referenceName":"<integration runtime name>",
                "type":"IntegrationRuntimeReference"
            }
        }
    } 
    

    If you use Windows authentication, copy the following JSON definition:

    {  
        "name":"SqlServerLinkedService",
        "properties":{  
            "annotations":[  
    
            ],
            "type":"SqlServer",
            "typeProperties":{  
                "connectionString":"integrated security=True;data source=<servername>;initial catalog=<database name>",
                "userName":"<username> or <domain>\\<username>",
                "password":{  
                    "type":"SecureString",
                    "value":"<password>"
                }
            },
            "connectVia":{  
                "referenceName":"<integration runtime name>",
                "type":"IntegrationRuntimeReference"
            }
        }
    }
    

    Important

    • Select the right section based on the authentication you use to connect to SQL Server.
    • Replace <integration runtime name> with the name of your integration runtime.
    • Replace <servername>, <database name>, <username>, and <password> with the values for your SQL Server database before you save the file.
    • If you need to use a slash character (\) in your user account or server name, precede it with the escape character (\). An example is mydomain\\myuser.
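The escaping rule in the last bullet can be verified with any JSON parser. This illustrative Python snippet shows that the two-character sequence \\ in JSON text decodes to a single backslash (note that the backslashes are doubled once more in the Python string literal itself):

```python
import json

# The raw JSON text seen by the parser is: {"userName": "mydomain\\myuser"}
doc = json.loads('{"userName": "mydomain\\\\myuser"}')
print(doc["userName"])  # mydomain\myuser
```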
  2. In PowerShell, run the following cmdlet to switch to the C:\ADFTutorials\IncCopyMultiTableTutorial folder.

    Set-Location 'C:\ADFTutorials\IncCopyMultiTableTutorial'
    
  3. Run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service SqlServerLinkedService. In the following example, you pass values for the ResourceGroupName and DataFactoryName parameters:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SqlServerLinkedService" -File ".\SqlServerLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : SqlServerLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerLinkedService
    

Create the SQL Database linked service

  1. Create a JSON file named AzureSQLDatabaseLinkedService.json in the C:\ADFTutorials\IncCopyMultiTableTutorial folder (create the folder if it doesn't already exist) with the following content. Replace <servername>, <database name>, <user name>, and <password> with your server name, database name, user name, and password before you save the file.

    {  
        "name":"AzureSQLDatabaseLinkedService",
        "properties":{  
            "annotations":[  
    
            ],
            "type":"AzureSqlDatabase",
            "typeProperties":{  
                "connectionString":"integrated security=False;encrypt=True;connection timeout=30;data source=<servername>.database.chinacloudapi.cn;initial catalog=<database name>;user id=<user name>;Password=<password>;"
            }
        }
    }
    
  2. In PowerShell, run the Set-AzDataFactoryV2LinkedService cmdlet to create the linked service AzureSQLDatabaseLinkedService.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureSQLDatabaseLinkedService" -File ".\AzureSQLDatabaseLinkedService.json"
    

    Here is the sample output:

    LinkedServiceName : AzureSQLDatabaseLinkedService
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlDatabaseLinkedService
    

Create datasets

In this step, you create datasets to represent the data source, the data destination, and the place to store the watermark.

Create a source dataset

  1. Create a JSON file named SourceDataset.json in the same folder with the following content:

    {  
        "name":"SourceDataset",
        "properties":{  
            "linkedServiceName":{  
                "referenceName":"SqlServerLinkedService",
                "type":"LinkedServiceReference"
            },
            "annotations":[  
    
            ],
            "type":"SqlServerTable",
            "schema":[  
    
            ]
        }
    }
    
    

    The Copy activity in the pipeline uses a SQL query to load the data rather than load the entire table.

  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SourceDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SourceDataset" -File ".\SourceDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SourceDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.SqlServerTableDataset
    

Create a sink dataset

  1. Create a JSON file named SinkDataset.json in the same folder with the following content. The tableName element is set dynamically by the pipeline at runtime. The ForEach activity in the pipeline iterates through a list of table names and passes the table name to this dataset in each iteration.

    {  
        "name":"SinkDataset",
        "properties":{  
            "linkedServiceName":{  
                "referenceName":"AzureSQLDatabaseLinkedService",
                "type":"LinkedServiceReference"
            },
            "parameters":{  
                "SinkTableName":{  
                    "type":"String"
                }
            },
            "annotations":[  
    
            ],
            "type":"AzureSqlTable",
            "typeProperties":{  
                "tableName":{  
                    "value":"@dataset().SinkTableName",
                    "type":"Expression"
                }
            }
        }
    }
    
  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset SinkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "SinkDataset" -File ".\SinkDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : SinkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset
    
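Conceptually, the @dataset().SinkTableName expression in SinkDataset resolves to whatever table name the ForEach activity supplies on each iteration. A simplified Python sketch of that substitution (not how Data Factory actually evaluates expressions) is:

```python
import re

def resolve(expression, parameters):
    """Substitute @dataset().<name> references with supplied parameter values."""
    return re.sub(r"@dataset\(\)\.(\w+)",
                  lambda m: parameters[m.group(1)], expression)

# One ForEach iteration passes SinkTableName = "customer_table" to the dataset.
table_name = resolve("@dataset().SinkTableName", {"SinkTableName": "customer_table"})
print(table_name)  # customer_table
```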

Create a dataset for a watermark

In this step, you create a dataset for storing a high watermark value.

  1. Create a JSON file named WatermarkDataset.json in the same folder with the following content:

    {
        "name": "WatermarkDataset",
        "properties": {
            "type": "AzureSqlTable",
            "typeProperties": {
                "tableName": "watermarktable"
            },
            "linkedServiceName": {
                "referenceName": "AzureSQLDatabaseLinkedService",
                "type": "LinkedServiceReference"
            }
        }
    }    
    
  2. Run the Set-AzDataFactoryV2Dataset cmdlet to create the dataset WatermarkDataset.

    Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "WatermarkDataset" -File ".\WatermarkDataset.json"
    

    Here is the sample output of the cmdlet:

    DatasetName       : WatermarkDataset
    ResourceGroupName : <ResourceGroupName>
    DataFactoryName   : <DataFactoryName>
    Structure         :
    Properties        : Microsoft.Azure.Management.DataFactory.Models.AzureSqlTableDataset    
    

Create a pipeline

The pipeline takes a list of table names as a parameter. The ForEach activity iterates through the list of table names and performs the following operations:

  1. Use the Lookup activity to retrieve the old watermark value (the initial value or the one that was used in the last iteration).

  2. Use the Lookup activity to retrieve the new watermark value (the maximum value of the watermark column in the source table).

  3. Use the Copy activity to copy data between these two watermark values from the source database to the destination database.

  4. Use the StoredProcedure activity to update the old watermark value to be used in the first step of the next iteration.
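The four steps above can be sketched as a plain loop. The following Python snippet is a simplified model of one pipeline run (illustrative only; the real work happens in the Lookup, Copy, and StoredProcedure activities defined below):

```python
def incremental_load(source_tables, watermarks, destination):
    """Simulate one pipeline run over a dictionary of source tables."""
    for name, rows in source_tables.items():
        old_wm = watermarks[name]                               # 1. Lookup: old watermark
        new_wm = max(r["wm"] for r in rows)                     # 2. Lookup: new watermark
        delta = [r for r in rows if old_wm < r["wm"] <= new_wm] # 3. Copy: delta rows only
        destination.setdefault(name, []).extend(delta)
        watermarks[name] = new_wm                               # 4. StoredProcedure: save watermark

source = {"customer_table": [{"id": 1, "wm": 5}, {"id": 2, "wm": 9}]}
marks = {"customer_table": 5}
dest = {}
incremental_load(source, marks, dest)
```

After the run, only the row newer than the stored watermark (wm 9 > 5) lands in the destination, and the stored watermark advances to 9 so the next run skips it.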

Create the pipeline

  1. Create a JSON file named IncrementalCopyPipeline.json in the same folder with the following content:

    {  
        "name":"IncrementalCopyPipeline",
        "properties":{  
            "activities":[  
                {  
                    "name":"IterateSQLTables",
                    "type":"ForEach",
                    "dependsOn":[  
    
                    ],
                    "userProperties":[  
    
                    ],
                    "typeProperties":{  
                        "items":{  
                            "value":"@pipeline().parameters.tableList",
                            "type":"Expression"
                        },
                        "isSequential":false,
                        "activities":[  
                            {  
                                "name":"LookupOldWaterMarkActivity",
                                "type":"Lookup",
                                "dependsOn":[  
    
                                ],
                                "policy":{  
                                    "timeout":"7.00:00:00",
                                    "retry":0,
                                    "retryIntervalInSeconds":30,
                                    "secureOutput":false,
                                    "secureInput":false
                                },
                                "userProperties":[  
    
                                ],
                                "typeProperties":{  
                                    "source":{  
                                        "type":"AzureSqlSource",
                                        "sqlReaderQuery":{  
                                            "value":"select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'",
                                            "type":"Expression"
                                        }
                                    },
                                    "dataset":{  
                                        "referenceName":"WatermarkDataset",
                                        "type":"DatasetReference"
                                    }
                                }
                            },
                            {  
                                "name":"LookupNewWaterMarkActivity",
                                "type":"Lookup",
                                "dependsOn":[  
    
                                ],
                                "policy":{  
                                    "timeout":"7.00:00:00",
                                    "retry":0,
                                    "retryIntervalInSeconds":30,
                                    "secureOutput":false,
                                    "secureInput":false
                                },
                                "userProperties":[  
    
                                ],
                                "typeProperties":{  
                                    "source":{  
                                        "type":"SqlServerSource",
                                        "sqlReaderQuery":{  
                                            "value":"select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}",
                                            "type":"Expression"
                                        }
                                    },
                                    "dataset":{  
                                        "referenceName":"SourceDataset",
                                        "type":"DatasetReference"
                                    },
                                    "firstRowOnly":true
                                }
                            },
                            {  
                                "name":"IncrementalCopyActivity",
                                "type":"Copy",
                                "dependsOn":[  
                                    {  
                                        "activity":"LookupOldWaterMarkActivity",
                                        "dependencyConditions":[  
                                            "Succeeded"
                                        ]
                                    },
                                    {  
                                        "activity":"LookupNewWaterMarkActivity",
                                        "dependencyConditions":[  
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy":{  
                                    "timeout":"7.00:00:00",
                                    "retry":0,
                                    "retryIntervalInSeconds":30,
                                    "secureOutput":false,
                                    "secureInput":false
                                },
                                "userProperties":[  
    
                                ],
                                "typeProperties":{  
                                    "source":{  
                                        "type":"SqlServerSource",
                                        "sqlReaderQuery":{  
                                            "value":"select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                            "type":"Expression"
                                        }
                                    },
                                    "sink":{  
                                        "type":"AzureSqlSink",
                                        "sqlWriterStoredProcedureName":{  
                                            "value":"@{item().StoredProcedureNameForMergeOperation}",
                                            "type":"Expression"
                                        },
                                        "sqlWriterTableType":{  
                                            "value":"@{item().TableType}",
                                            "type":"Expression"
                                        },
                                        "storedProcedureTableTypeParameterName":{  
                                            "value":"@{item().TABLE_NAME}",
                                            "type":"Expression"
                                        },
                                        "disableMetricsCollection":false
                                    },
                                    "enableStaging":false
                                },
                                "inputs":[  
                                    {  
                                        "referenceName":"SourceDataset",
                                        "type":"DatasetReference"
                                    }
                                ],
                                "outputs":[  
                                    {  
                                        "referenceName":"SinkDataset",
                                        "type":"DatasetReference",
                                        "parameters":{  
                                            "SinkTableName":{  
                                                "value":"@{item().TABLE_NAME}",
                                                "type":"Expression"
                                            }
                                        }
                                    }
                                ]
                            },
                            {  
                                "name":"StoredProceduretoWriteWatermarkActivity",
                                "type":"SqlServerStoredProcedure",
                                "dependsOn":[  
                                    {  
                                        "activity":"IncrementalCopyActivity",
                                        "dependencyConditions":[  
                                            "Succeeded"
                                        ]
                                    }
                                ],
                                "policy":{  
                                    "timeout":"7.00:00:00",
                                    "retry":0,
                                    "retryIntervalInSeconds":30,
                                    "secureOutput":false,
                                    "secureInput":false
                                },
                                "userProperties":[  
    
                                ],
                                "typeProperties":{  
                                    "storedProcedureName":"[dbo].[usp_write_watermark]",
                                    "storedProcedureParameters":{  
                                        "LastModifiedtime":{  
                                            "value":{  
                                                "value":"@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}",
                                                "type":"Expression"
                                            },
                                            "type":"DateTime"
                                        },
                                        "TableName":{  
                                            "value":{  
                                                "value":"@{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}",
                                                "type":"Expression"
                                            },
                                            "type":"String"
                                        }
                                    }
                                },
                                "linkedServiceName":{  
                                    "referenceName":"AzureSQLDatabaseLinkedService",
                                    "type":"LinkedServiceReference"
                                }
                            }
                        ]
                    }
                }
            ],
            "parameters":{  
                "tableList":{  
                    "type":"array"
                }
            },
            "annotations":[  
    
            ]
        }
    }
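The heart of IncrementalCopyActivity above is the window filter old_watermark < WaterMark_Column <= new_watermark. A minimal Python sketch of that filter (the sample rows and the select_delta helper are illustrative, not part of the pipeline itself):

```python
from datetime import datetime

def select_delta(rows, watermark_column, old_watermark):
    """Return (delta_rows, new_watermark) for one incremental run.

    Mirrors the pipeline: LookupNewWaterMarkActivity takes MAX(column),
    and the copy query selects old_watermark < column <= new_watermark.
    """
    new_watermark = max(row[watermark_column] for row in rows)
    delta = [row for row in rows
             if old_watermark < row[watermark_column] <= new_watermark]
    return delta, new_watermark

customer_rows = [
    {"PersonID": 1, "LastModifytime": datetime(2017, 9, 1, 0, 56)},
    {"PersonID": 2, "LastModifytime": datetime(2017, 9, 2, 5, 23)},
    {"PersonID": 3, "LastModifytime": datetime(2017, 9, 3, 2, 36)},
]

# First run against an initial watermark: every row qualifies.
delta, new_wm = select_delta(customer_rows, "LastModifytime",
                             datetime(2010, 1, 1))
print(len(delta), new_wm)

# Second run with no new data: the window is empty.
delta2, _ = select_delta(customer_rows, "LastModifytime", new_wm)
print(len(delta2))
```

The strict lower bound is what prevents a row from being copied twice across runs, while the inclusive upper bound ensures a row whose timestamp equals the looked-up MAX is copied exactly once; rows modified after the MAX lookup simply wait for the next run.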
    
  2. Run the Set-AzDataFactoryV2Pipeline cmdlet to create the pipeline IncrementalCopyPipeline.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "IncrementalCopyPipeline" -File ".\IncrementalCopyPipeline.json"

    Here is the sample output:

     PipelineName      : IncrementalCopyPipeline
     ResourceGroupName : <ResourceGroupName>
     DataFactoryName   : <DataFactoryName>
     Activities        : {IterateSQLTables}
     Parameters        : {[tableList, Microsoft.Azure.Management.DataFactory.Models.ParameterSpecification]}
    

Run the pipeline

  1. Create a parameter file named Parameters.json in the same folder with the following content:

    {
        "tableList": 
        [
            {
                "TABLE_NAME": "customer_table",
                "WaterMark_Column": "LastModifytime",
                "TableType": "DataTypeforCustomerTable",
                "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
            },
            {
                "TABLE_NAME": "project_table",
                "WaterMark_Column": "Creationtime",
                "TableType": "DataTypeforProjectTable",
                "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
            }
        ]
    }
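The ForEach activity expands expressions such as @{item().TABLE_NAME} once for each entry in tableList. A rough Python illustration of that expansion (Data Factory evaluates these expressions itself; this only shows the lookup queries that result for each table):

```python
import json

# Same shape as the Parameters.json file above (trimmed to the two
# fields the lookup queries use).
parameters = json.loads("""
{
  "tableList": [
    {"TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime"},
    {"TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime"}
  ]
}
""")

for item in parameters["tableList"]:
    # LookupOldWaterMarkActivity: last stored watermark for this table.
    old_wm_query = (f"select * from watermarktable "
                    f"where TableName = '{item['TABLE_NAME']}'")
    # LookupNewWaterMarkActivity: current maximum of the watermark column.
    new_wm_query = (f"select MAX({item['WaterMark_Column']}) "
                    f"as NewWatermarkvalue from {item['TABLE_NAME']}")
    print(old_wm_query)
    print(new_wm_query)
```

Because "isSequential" is false in the pipeline JSON, these per-table iterations can run in parallel; each table carries its own watermark, table type, and upsert procedure, so the iterations are independent.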
    
  2. Run the pipeline IncrementalCopyPipeline by using the Invoke-AzDataFactoryV2Pipeline cmdlet. Replace the placeholders with your own resource group and data factory name.

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"

Monitor the pipeline

  1. Sign in to the Azure portal.

  2. Select All services, search with the keyword Data factories, and select Data factories.

  3. Search for your data factory in the list of data factories, and select it to open the Data factory page.

  4. On the Data factory page, select Author & Monitor to launch Azure Data Factory in a separate tab.

  5. On the Let's get started page, select Monitor on the left side.

  6. You can see all the pipeline runs and their status. Notice that in the following example, the status of the pipeline run is Succeeded. To check the parameters passed to the pipeline, select the link in the Parameters column. If an error occurred, you see a link in the Error column.

  7. When you select the link in the Actions column, you see all the activity runs for the pipeline.

  8. To go back to the Pipeline Runs view, select All Pipeline Runs.

Review the results

In SQL Server Management Studio, run the following queries against the target SQL database to verify that the data was copied from the source tables to the destination tables:

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1           John    2017-09-01 00:56:00.000
2           Mike    2017-09-02 05:23:00.000
3           Alice   2017-09-03 02:36:00.000
4           Andy    2017-09-04 03:21:00.000
5           Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Output

===================================
Project     Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Output

======================================
TableName       WatermarkValue
======================================
customer_table  2017-09-05 08:06:00.000
project_table   2017-03-04 05:16:00.000

Notice that the watermark values for both tables were updated.
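The sink writes through the per-table stored procedures (usp_upsert_customer_table, usp_upsert_project_table), which merge the copied rows into the destination table. A Python stand-in for that MERGE semantics (the dict-based rows and the PersonID key are illustrative assumptions, not the procedure's actual T-SQL):

```python
def upsert(target, incoming, key):
    """MERGE semantics: update rows whose key already exists, insert the rest."""
    index = {row[key]: row for row in target}
    for row in incoming:
        if row[key] in index:
            index[row[key]].update(row)   # WHEN MATCHED THEN UPDATE
        else:
            target.append(row)            # WHEN NOT MATCHED THEN INSERT
    return target

# One existing destination row; the delta carries one update and one new row.
customers = [{"PersonID": 3, "Name": "Alice"}]
upsert(customers, [{"PersonID": 3, "Name": "NewName"},
                   {"PersonID": 6, "Name": "Temp"}], "PersonID")
print(customers)
```

Because the write is an upsert rather than a plain insert, re-copying a row that was already loaded updates it in place instead of duplicating it, which keeps pipeline reruns safe.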

Add more data to the source tables

Run the following queries against the source SQL Server database to update an existing row in customer_table and insert a new row into project_table:

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');
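With the watermarks stored by the first run (2017-09-05 08:06:00 for customer_table), the rerun will select only rows above that value. A quick sketch under that assumption (sample rows are illustrative):

```python
from datetime import datetime

old_watermark = datetime(2017, 9, 5, 8, 6)   # value stored in watermarktable
customer_rows = [
    {"PersonID": 3, "LastModifytime": datetime(2017, 9, 8)},       # just updated
    {"PersonID": 5, "LastModifytime": datetime(2017, 9, 5, 8, 6)}, # unchanged
]
new_watermark = max(row["LastModifytime"] for row in customer_rows)
delta = [row for row in customer_rows
         if old_watermark < row["LastModifytime"] <= new_watermark]
print([row["PersonID"] for row in delta])
```

Only the freshly updated row falls inside the window; the row whose timestamp equals the old watermark was already copied and is excluded by the strict lower bound.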

Rerun the pipeline

  1. Now, rerun the pipeline by executing the following PowerShell command:

    $RunId = Invoke-AzDataFactoryV2Pipeline -PipelineName "IncrementalCopyPipeline" -ResourceGroupName $resourceGroupName -DataFactoryName $dataFactoryName -ParameterFile ".\Parameters.json"

  2. Monitor the pipeline runs by following the instructions in the Monitor the pipeline section. When the pipeline status is In Progress, you see another action link under Actions to cancel the pipeline run.

  3. Select Refresh to refresh the list until the pipeline run succeeds.

  4. Optionally, select the View Activity Runs link under Actions to see all the activity runs associated with this pipeline run.

Review the final results

In SQL Server Management Studio, run the following queries against the target database to verify that the updated and new data was copied from the source tables to the destination tables.

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1           John    2017-09-01 00:56:00.000
2           Mike    2017-09-02 05:23:00.000
3           NewName 2017-09-08 00:00:00.000
4           Andy    2017-09-04 03:21:00.000
5           Anny    2017-09-05 08:06:00.000

Notice the new values of Name and LastModifytime for PersonID 3.

Query

select * from project_table

Output

===================================
Project     Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject  2017-10-01 00:00:00.000

Notice that the NewProject entry was added to project_table.

Query

select * from watermarktable

Output

======================================
TableName       WatermarkValue
======================================
customer_table  2017-09-08 00:00:00.000
project_table   2017-10-01 00:00:00.000

Notice that the watermark values for both tables were updated.

Next steps

You performed the following steps in this tutorial:

  • Prepare source and destination data stores.
  • Create a data factory.
  • Create a self-hosted integration runtime (IR).
  • Install the integration runtime.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create, run, and monitor a pipeline.
  • Review the results.
  • Add or update data in source tables.
  • Rerun and monitor the pipeline.
  • Review the final results.

Advance to the following tutorial to learn about transforming data by using a Spark cluster on Azure: