Incrementally load data from multiple tables in SQL Server to a database in Azure SQL Database using the Azure portal

Applies to: Azure Data Factory, Azure Synapse Analytics (Preview)

In this tutorial, you create an Azure data factory with a pipeline that loads delta data from multiple tables in a SQL Server database to a database in Azure SQL Database.

You perform the following steps in this tutorial:

  • Prepare source and destination data stores.
  • Create a data factory.
  • Create a self-hosted integration runtime.
  • Install the integration runtime.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create, run, and monitor a pipeline.
  • Review the results.
  • Add or update data in source tables.
  • Rerun and monitor the pipeline.
  • Review the final results.

Overview

Here are the important steps to create this solution:

  1. Select the watermark column.

    Select one column for each table in the source data store, which can be used to identify the new or updated records for every run. Normally, the data in this selected column (for example, last_modify_time or ID) keeps increasing when rows are created or updated. The maximum value in this column is used as a watermark.

  2. Prepare a data store to store the watermark value.

    In this tutorial, you store the watermark value in a SQL database.

  3. Create a pipeline with the following activities:

    a. Create a ForEach activity that iterates through a list of source table names that is passed as a parameter to the pipeline. For each source table, it invokes the following activities to perform delta loading for that table.

    b. Create two Lookup activities. Use the first Lookup activity to retrieve the last watermark value. Use the second Lookup activity to retrieve the new watermark value. These watermark values are passed to the Copy activity.

    c. Create a Copy activity that copies rows from the source data store whose watermark column value is greater than the old watermark value and less than or equal to the new watermark value. The activity writes this delta data to the corresponding table in the destination database in Azure SQL Database.

    d. Create a StoredProcedure activity that updates the watermark value for the next pipeline run.

    Here is the high-level solution diagram:

    [Image: Incrementally load data]
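The flow above can be sketched in plain Python to show what one iteration does for one table. This is only an illustration under stated assumptions: `incremental_load` is a hypothetical helper, the rows are in-memory dictionaries rather than SQL tables, and the upper bound is inclusive, matching the Copy activity query used later in this tutorial.

```python
from datetime import datetime


def incremental_load(source_rows, watermark_column, old_watermark):
    """Return (delta_rows, new_watermark) for one table.

    Hypothetical helper: it mirrors the Lookup -> Copy -> StoredProcedure
    flow with Python lists instead of Data Factory activities.
    Assumes source_rows is not empty.
    """
    # Second Lookup: the new watermark is the maximum value in the column.
    new_watermark = max(row[watermark_column] for row in source_rows)
    # Copy: keep rows where old_watermark < value <= new_watermark.
    delta = [
        row for row in source_rows
        if old_watermark < row[watermark_column] <= new_watermark
    ]
    # StoredProcedure: the caller stores new_watermark for the next run.
    return delta, new_watermark


rows = [
    {"PersonID": 1, "LastModifytime": datetime(2017, 9, 1, 0, 56)},
    {"PersonID": 2, "LastModifytime": datetime(2017, 9, 2, 5, 23)},
]
delta, new_wm = incremental_load(rows, "LastModifytime", datetime(2017, 9, 1, 12, 0))
```

Here only the row with PersonID 2 falls inside the window, and `new_wm` becomes the value to persist for the next run.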

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

  • SQL Server. You use a SQL Server database as the source data store in this tutorial.
  • Azure SQL Database. You use a database in Azure SQL Database as the sink data store. If you don't have a database in SQL Database, see Create a database in Azure SQL Database for steps to create one.

Create source tables in your SQL Server database

  1. Open SQL Server Management Studio, and connect to your SQL Server database.

  2. In Object Explorer, right-click the database and choose New Query.

  3. Run the following SQL command against your database to create tables named customer_table and project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    INSERT INTO customer_table
    (PersonID, Name, LastModifytime)
    VALUES
    (1, 'John','9/1/2017 12:56:00 AM'),
    (2, 'Mike','9/2/2017 5:23:00 AM'),
    (3, 'Alice','9/3/2017 2:36:00 AM'),
    (4, 'Andy','9/4/2017 3:21:00 AM'),
    (5, 'Anny','9/5/2017 8:06:00 AM');
    
    INSERT INTO project_table
    (Project, Creationtime)
    VALUES
    ('project1','1/1/2015 0:00:00 AM'),
    ('project2','2/2/2016 1:23:00 AM'),
    ('project3','3/4/2017 5:16:00 AM');
    
    

Create destination tables in your database

  1. Open SQL Server Management Studio, and connect to your database in Azure SQL Database.

  2. In Object Explorer, right-click the database and choose New Query.

  3. Run the following SQL command against your database to create tables named customer_table and project_table:

    create table customer_table
    (
        PersonID int,
        Name varchar(255),
        LastModifytime datetime
    );
    
    create table project_table
    (
        Project varchar(255),
        Creationtime datetime
    );
    
    

Create another table in your database to store the high watermark value

  1. Run the following SQL command against your database to create a table named watermarktable to store the watermark value:

    create table watermarktable
    (
        TableName varchar(255),
        WatermarkValue datetime
    );
    
  2. Insert initial watermark values for both source tables into the watermark table.

    
    INSERT INTO watermarktable
    VALUES 
    ('customer_table','1/1/2010 12:00:00 AM'),
    ('project_table','1/1/2010 12:00:00 AM');
    
    

Create a stored procedure in your database

Run the following command to create a stored procedure in your database. This stored procedure updates the watermark value after every pipeline run.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

    UPDATE watermarktable
    SET [WatermarkValue] = @LastModifiedtime
    WHERE [TableName] = @TableName

END
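
To see the effect of this procedure without a SQL Server instance, the following sketch reproduces the same UPDATE against an in-memory SQLite database. It is an approximation only: `write_watermark` is a hypothetical Python stand-in for usp_write_watermark, and SQLite stores the datetime values as text.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE watermarktable (TableName TEXT, WatermarkValue TEXT)")
conn.executemany(
    "INSERT INTO watermarktable VALUES (?, ?)",
    [
        ("customer_table", "2010-01-01 00:00:00"),
        ("project_table", "2010-01-01 00:00:00"),
    ],
)


def write_watermark(conn, last_modified_time, table_name):
    """Stand-in for usp_write_watermark: update one table's watermark row."""
    conn.execute(
        "UPDATE watermarktable SET WatermarkValue = ? WHERE TableName = ?",
        (last_modified_time, table_name),
    )


write_watermark(conn, "2017-09-05 08:06:00", "customer_table")
watermarks = dict(conn.execute("SELECT TableName, WatermarkValue FROM watermarktable"))
```

Only the row whose TableName matches is changed, so each source table keeps its own independent watermark.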

Create data types and additional stored procedures in your database

Run the following query to create two stored procedures and two data types in your database. They're used to merge the data from source tables into destination tables.

To keep the tutorial easy to start with, these stored procedures receive the delta data through a table variable and then merge it into the destination store. Be aware that this approach is not designed for a large number of delta rows (more than 100) in the table variable.

If you do need to merge a large number of delta rows into the destination store, we suggest that you first use a copy activity to copy all the delta data into a temporary "staging" table in the destination store, and then build your own stored procedure, without a table variable, to merge the rows from the "staging" table into the "final" table.

CREATE TYPE DataTypeforCustomerTable AS TABLE(
    PersonID int,
    Name varchar(255),
    LastModifytime datetime
);

GO

CREATE PROCEDURE usp_upsert_customer_table @customer_table DataTypeforCustomerTable READONLY
AS

BEGIN
  MERGE customer_table AS target
  USING @customer_table AS source
  ON (target.PersonID = source.PersonID)
  WHEN MATCHED THEN
      UPDATE SET Name = source.Name,LastModifytime = source.LastModifytime
  WHEN NOT MATCHED THEN
      INSERT (PersonID, Name, LastModifytime)
      VALUES (source.PersonID, source.Name, source.LastModifytime);
END

GO

CREATE TYPE DataTypeforProjectTable AS TABLE(
    Project varchar(255),
    Creationtime datetime
);

GO

CREATE PROCEDURE usp_upsert_project_table @project_table DataTypeforProjectTable READONLY
AS

BEGIN
  MERGE project_table AS target
  USING @project_table AS source
  ON (target.Project = source.Project)
  WHEN MATCHED THEN
      UPDATE SET Creationtime = source.Creationtime
  WHEN NOT MATCHED THEN
      INSERT (Project, Creationtime)
      VALUES (source.Project, source.Creationtime);
END
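
The MERGE statements above implement an upsert: matching keys are updated and new keys are inserted. A minimal Python sketch of that behavior follows, assuming the target table is modeled as a dictionary keyed by the merge column. The `upsert` helper and the sample row with PersonID 6 are hypothetical.

```python
def upsert(target, delta_rows, key):
    """Merge delta_rows into target (a dict keyed by `key`), like MERGE."""
    for row in delta_rows:
        if row[key] in target:
            # WHEN MATCHED THEN UPDATE
            target[row[key]].update(row)
        else:
            # WHEN NOT MATCHED THEN INSERT
            target[row[key]] = dict(row)
    return target


target = {3: {"PersonID": 3, "Name": "Alice"}}
upsert(
    target,
    [{"PersonID": 3, "Name": "NewName"}, {"PersonID": 6, "Name": "Eve"}],
    "PersonID",
)
```

Because the operation is keyed, applying the same delta twice leaves the target unchanged, which is why rerunning a pipeline does not duplicate rows.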

Create a data factory

  1. Launch the Microsoft Edge or Google Chrome web browser. Currently, the Data Factory UI is supported only in the Microsoft Edge and Google Chrome web browsers.

  2. On the left menu, select Create a resource > Data + Analytics > Data Factory:

    [Image: Select Data Factory in the New pane]

  3. On the New data factory page, enter ADFMultiIncCopyTutorialDF for the name.

    The name of the Azure data factory must be globally unique. If you see a red exclamation mark with the following error, change the name of the data factory (for example, yournameADFMultiIncCopyTutorialDF) and try creating it again. For naming rules for Data Factory artifacts, see the Data Factory - Naming Rules article.

    Data factory name "ADFMultiIncCopyTutorialDF" is not available

  4. Select the Azure subscription in which you want to create the data factory.

  5. For the Resource Group, do one of the following steps:

    • Select Use existing, and select an existing resource group from the drop-down list.
    • Select Create new, and enter the name of a resource group.
      To learn about resource groups, see Using resource groups to manage your Azure resources.
  6. Select V2 for the version.

  7. Select the location for the data factory. Only supported locations are displayed in the drop-down list. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight and so on) used by the data factory can be in other regions.

  8. Click Create.

  9. After the creation is complete, you see the Data Factory page as shown in the image.

    [Image: Data factory home page]

  10. Click the Author & Monitor tile to launch the Azure Data Factory user interface (UI) in a separate tab.

Create a self-hosted integration runtime

As you are moving data from a data store in a private network (on-premises) to an Azure data store, install a self-hosted integration runtime (IR) in your on-premises environment. The self-hosted IR moves data between your private network and Azure.

  1. On the Let's get started page of the Azure Data Factory UI, select the Manage tab from the leftmost pane.

    [Image: The home page Manage button]

  2. Select Integration runtimes on the left pane, and then select + New.

    [Image: Create an integration runtime]

  3. In the Integration Runtime Setup window, select Perform data movement and dispatch activities to external computes, and click Continue.

  4. Select Self-Hosted, and click Continue.

  5. Enter MySelfHostedIR for Name, and click Create.

  6. In the Option 1: Express setup section, click Click here to launch the express setup for this computer.

    [Image: Click the Express setup link]

  7. In the Integration Runtime (Self-hosted) Express Setup window, click Close.

    [Image: Integration runtime setup - successful]

  8. In the web browser, in the Integration Runtime Setup window, click Finish.

  9. Confirm that you see MySelfHostedIR in the list of integration runtimes.

Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your SQL Server database and your database in Azure SQL Database.

Create the SQL Server linked service

In this step, you link your SQL Server database to the data factory.

  1. In the Connections window, switch from the Integration Runtimes tab to the Linked Services tab, and click + New.

    [Image: New linked service]

  2. In the New Linked Service window, select SQL Server, and click Continue.

  3. In the New Linked Service window, do the following steps:

    1. Enter SqlServerLinkedService for Name.
    2. Select MySelfHostedIR for Connect via integration runtime. This is an important step. The default integration runtime cannot connect to an on-premises data store. Use the self-hosted integration runtime you created earlier.
    3. For Server name, enter the name of your computer that has the SQL Server database.
    4. For Database name, enter the name of the database in your SQL Server that has the source data. You created the source tables and inserted data into this database as part of the prerequisites.
    5. For Authentication type, select the type of authentication you want to use to connect to the database.
    6. For User name, enter the name of a user that has access to the SQL Server database. If you need to use a backslash character (\) in your user account or server name, precede it with the escape character (\). For example, mydomain\\myuser.
    7. For Password, enter the password for the user.
    8. To test whether Data Factory can connect to your SQL Server database, click Test connection. Fix any errors until the connection succeeds.
    9. To save the linked service, click Finish.

Create the Azure SQL Database linked service

In the previous step, you created a linked service to link your source SQL Server database to the data factory. In this step, you link your destination/sink database to the data factory.

  1. In the Connections window, switch from the Integration Runtimes tab to the Linked Services tab, and click + New.

  2. In the New Linked Service window, select Azure SQL Database, and click Continue.

  3. In the New Linked Service window, do the following steps:

    1. Enter AzureSqlDatabaseLinkedService for Name.
    2. For Server name, select the name of your server from the drop-down list.
    3. For Database name, select the database in which you created customer_table and project_table as part of the prerequisites.
    4. For User name, enter the name of a user that has access to the database.
    5. For Password, enter the password for the user.
    6. To test whether Data Factory can connect to your database in Azure SQL Database, click Test connection. Fix any errors until the connection succeeds.
    7. To save the linked service, click Finish.
  4. Confirm that you see two linked services in the list.

    [Image: Two linked services]

Create datasets

In this step, you create datasets to represent the data source, the data destination, and the place to store the watermark.

Create a source dataset

  1. In the left pane, click + (plus), and click Dataset.

  2. In the New Dataset window, select SQL Server, and click Continue.

  3. A new tab opens in the web browser for configuring the dataset. The dataset also appears in the tree view. In the General tab of the Properties window at the bottom, enter SourceDataset for Name.

  4. Switch to the Connection tab in the Properties window, and select SqlServerLinkedService for Linked service. Do not select a table here. The Copy activity in the pipeline uses a SQL query to load the data rather than loading the entire table.

    [Image: Source dataset - connection]

Create a sink dataset

  1. In the left pane, click + (plus), and click Dataset.

  2. In the New Dataset window, select Azure SQL Database, and click Continue.

  3. A new tab opens in the web browser for configuring the dataset. The dataset also appears in the tree view. In the General tab of the Properties window at the bottom, enter SinkDataset for Name.

  4. Switch to the Parameters tab in the Properties window, and do the following steps:

    1. Click New in the Create/update parameters section.

    2. Enter SinkTableName for the name, and String for the type. This dataset takes SinkTableName as a parameter. The SinkTableName parameter is set by the pipeline dynamically at runtime. The ForEach activity in the pipeline iterates through a list of table names and passes the table name to this dataset in each iteration.

      [Image: Sink dataset - properties]

  5. Switch to the Connection tab in the Properties window, and select AzureSqlDatabaseLinkedService for Linked service. For the Table property, click Add dynamic content.

  6. In the Add Dynamic Content window, select SinkTableName in the Parameters section.

  7. After clicking Finish, you see "@dataset().SinkTableName" as the table name.

    [Image: Sink dataset - connection]

Create a dataset for a watermark

In this step, you create a dataset for storing a high watermark value.

  1. In the left pane, click + (plus), and click Dataset.

  2. In the New Dataset window, select Azure SQL Database, and click Continue.

  3. In the General tab of the Properties window at the bottom, enter WatermarkDataset for Name.

  4. Switch to the Connection tab, and do the following steps:

    1. Select AzureSqlDatabaseLinkedService for Linked service.

    2. Select [dbo].[watermarktable] for Table.

      [Image: Watermark dataset - connection]

Create a pipeline

The pipeline takes a list of table names as a parameter. The ForEach activity iterates through the list of table names and performs the following operations:

  1. Use the Lookup activity to retrieve the old watermark value (the initial value or the one that was used in the last iteration).

  2. Use the Lookup activity to retrieve the new watermark value (the maximum value of the watermark column in the source table).

  3. Use the Copy activity to copy data between these two watermark values from the source database to the destination database.

  4. Use the StoredProcedure activity to update the old watermark value to be used in the first step of the next iteration.

Create the pipeline

  1. In the left pane, click + (plus), and click Pipeline.

  2. In the General panel under Properties, specify IncrementalCopyPipeline for Name. Then collapse the panel by clicking the Properties icon in the top-right corner.

  3. In the Parameters tab, do the following steps:

    1. Click + New.
    2. Enter tableList for the parameter name.
    3. Select Array for the parameter type.
  4. In the Activities toolbox, expand Iteration & Conditionals, and drag-drop the ForEach activity to the pipeline designer surface. In the General tab of the Properties window, enter IterateSQLTables for Name.

  5. Switch to the Settings tab, and enter @pipeline().parameters.tableList for Items. The ForEach activity iterates through a list of tables and performs the incremental copy operation.

    [Image: ForEach activity - settings]

  6. Select the ForEach activity in the pipeline if it isn't already selected. Click the Edit (pencil icon) button.

  7. In the Activities toolbox, expand General, drag-drop the Lookup activity to the pipeline designer surface, and enter LookupOldWaterMarkActivity for Name.

  8. Switch to the Settings tab of the Properties window, and do the following steps:

    1. Select WatermarkDataset for Source Dataset.

    2. Select Query for Use Query.

    3. Enter the following SQL query for Query.

      select * from watermarktable where TableName  =  '@{item().TABLE_NAME}'
      

      [Image: First Lookup activity - settings]

  9. Drag-drop the Lookup activity from the Activities toolbox, and enter LookupNewWaterMarkActivity for Name.

  10. Switch to the Settings tab.

    1. Select SourceDataset for Source Dataset.

    2. Select Query for Use Query.

    3. Enter the following SQL query for Query.

      select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue from @{item().TABLE_NAME}
      

      [Image: Second Lookup activity - settings]

  11. Drag-drop the Copy activity from the Activities toolbox, and enter IncrementalCopyActivity for Name.

  12. Connect the Lookup activities to the Copy activity one by one. To connect, start dragging at the green box attached to the Lookup activity and drop it on the Copy activity. Release the mouse button when the border color of the Copy activity changes to blue.

    [Image: Connect Lookup activities to the Copy activity]

  13. Select the Copy activity in the pipeline. Switch to the Source tab in the Properties window.

    1. Select SourceDataset for Source Dataset.

    2. Select Query for Use Query.

    3. Enter the following SQL query for Query.

      select * from @{item().TABLE_NAME} where @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'        
      

      [Image: Copy activity - source settings]

  14. Switch to the Sink tab, and select SinkDataset for Sink Dataset.

  15. Do the following steps:

    1. In the Dataset properties, for the SinkTableName parameter, enter @{item().TABLE_NAME}.

    2. For the Stored Procedure Name property, enter @{item().StoredProcedureNameForMergeOperation}.

    3. For the Table type property, enter @{item().TableType}.

    4. For Table type parameter name, enter @{item().TABLE_NAME}.

      [Image: Copy activity - parameters]

  16. Drag-and-drop the Stored Procedure activity from the Activities toolbox to the pipeline designer surface. Connect the Copy activity to the Stored Procedure activity.

  17. Select the Stored Procedure activity in the pipeline, and enter StoredProceduretoWriteWatermarkActivity for Name in the General tab of the Properties window.

  18. Switch to the SQL Account tab, and select AzureSqlDatabaseLinkedService for Linked Service.

    [Image: Stored Procedure activity - SQL Account]

  19. Switch to the Stored Procedure tab, and do the following steps:

    1. For Stored procedure name, select [dbo].[usp_write_watermark].

    2. Select Import parameter.

    3. Specify the following values for the parameters:

      Name              Type      Value
      LastModifiedtime  DateTime  @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName         String    @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

      [Image: Stored Procedure activity - stored procedure settings]

  20. Select Publish All to publish the entities you created to the Data Factory service.

  21. Wait until you see the Successfully published message. To see the notifications, click the Show Notifications link. Close the notifications window by clicking X.
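
The `@{...}` expressions in the Copy activity query are replaced with values from the current ForEach item and from the two Lookup outputs before the query is sent to SQL Server. The following Python sketch imitates that substitution for one item. `build_copy_query` is a hypothetical helper, and the datetime strings are illustrative: the exact text that Data Factory substitutes for a datetime value may be formatted differently.

```python
def build_copy_query(item, old_watermark, new_watermark):
    """Imitate how the Copy activity source query resolves for one item."""
    table = item["TABLE_NAME"]
    column = item["WaterMark_Column"]
    return (
        f"select * from {table} "
        f"where {column} > '{old_watermark}' and {column} <= '{new_watermark}'"
    )


item = {"TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime"}
query = build_copy_query(item, "2010-01-01 00:00:00", "2017-09-05 08:06:00")
```

Because table and column names are spliced into the query text, the tableList parameter should only come from a trusted source.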

Run the pipeline

  1. On the toolbar for the pipeline, click Add trigger, and click Trigger Now.

  2. In the Pipeline Run window, enter the following value for the tableList parameter, and click Finish.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

    [Image: Pipeline run arguments]
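
Each entry in tableList must carry the four keys that the pipeline expressions read through item(). A quick local check before triggering a run can catch a missing key. The sketch below is an optional convenience, not part of Data Factory: `check_table_list` is a hypothetical helper, and the second sample entry deliberately omits TableType.

```python
import json

REQUIRED_KEYS = {
    "TABLE_NAME",
    "WaterMark_Column",
    "TableType",
    "StoredProcedureNameForMergeOperation",
}


def check_table_list(raw):
    """Parse the tableList JSON and map entry index -> missing key names."""
    problems = {}
    for index, entry in enumerate(json.loads(raw)):
        missing = REQUIRED_KEYS.difference(entry)
        if missing:
            problems[index] = sorted(missing)
    return problems


raw = """[
    {"TABLE_NAME": "customer_table", "WaterMark_Column": "LastModifytime",
     "TableType": "DataTypeforCustomerTable",
     "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"},
    {"TABLE_NAME": "project_table", "WaterMark_Column": "Creationtime",
     "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"}
]"""
problems = check_table_list(raw)
```

An empty result means every entry has the keys the pipeline needs.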

Monitor the pipeline

  1. Switch to the Monitor tab on the left. You see the pipeline run triggered by the manual trigger. You can use links under the PIPELINE NAME column to view activity details and to rerun the pipeline.

  2. To see activity runs associated with the pipeline run, select the link under the PIPELINE NAME column. For details about the activity runs, select the Details link (eyeglasses icon) under the ACTIVITY NAME column.

  3. Select All pipeline runs at the top to go back to the Pipeline Runs view. To refresh the view, select Refresh.

Review the results

In SQL Server Management Studio, run the following queries against the target SQL database to verify that the data was copied from source tables to destination tables:

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1           John    2017-09-01 00:56:00.000
2           Mike    2017-09-02 05:23:00.000
3           Alice   2017-09-03 02:36:00.000
4           Andy    2017-09-04 03:21:00.000
5           Anny    2017-09-05 08:06:00.000

Query

select * from project_table

Output

===================================
Project     Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000

Query

select * from watermarktable

Output

======================================
TableName       WatermarkValue
======================================
customer_table  2017-09-05 08:06:00.000
project_table   2017-03-04 05:16:00.000

Notice that the watermark values for both tables were updated.

Add more data to the source tables

Run the following query against the source SQL Server database to update an existing row in customer_table and insert a new row into project_table.

UPDATE customer_table
SET [LastModifytime] = '2017-09-08T00:00:00Z', [name]='NewName' where [PersonID] = 3

INSERT INTO project_table
(Project, Creationtime)
VALUES
('NewProject','10/1/2017 0:00:00 AM');

Rerun the pipeline

  1. In the web browser window, switch to the Edit tab on the left.

  2. On the toolbar for the pipeline, click Add trigger, and click Trigger Now.

  3. In the Pipeline Run window, enter the following value for the tableList parameter, and click Finish.

    [
        {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_customer_table"
        },
        {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "usp_upsert_project_table"
        }
    ]
    

Monitor the pipeline again

  1. Switch to the Monitor tab on the left. You see the pipeline run triggered by the manual trigger. You can use links under the PIPELINE NAME column to view activity details and to rerun the pipeline.

  2. To see activity runs associated with the pipeline run, select the link under the PIPELINE NAME column. For details about the activity runs, select the Details link (eyeglasses icon) under the ACTIVITY NAME column.

  3. Select All pipeline runs at the top to go back to the Pipeline Runs view. To refresh the view, select Refresh.

Review the final results

In SQL Server Management Studio, run the following queries against the target SQL database to verify that the updated and new data was copied from source tables to destination tables.

Query

select * from customer_table

Output

===========================================
PersonID    Name    LastModifytime
===========================================
1           John    2017-09-01 00:56:00.000
2           Mike    2017-09-02 05:23:00.000
3           NewName 2017-09-08 00:00:00.000
4           Andy    2017-09-04 03:21:00.000
5           Anny    2017-09-05 08:06:00.000

Notice the new values of Name and LastModifytime for the row with PersonID 3.

Query

select * from project_table

Output

===================================
Project     Creationtime
===================================
project1    2015-01-01 00:00:00.000
project2    2016-02-02 01:23:00.000
project3    2017-03-04 05:16:00.000
NewProject  2017-10-01 00:00:00.000

Notice that the NewProject entry was added to project_table.

Query

select * from watermarktable

Output

======================================
TableName       WatermarkValue
======================================
customer_table  2017-09-08 00:00:00.000
project_table   2017-10-01 00:00:00.000

Notice that the watermark values for both tables were updated.
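
To see why the second run copies only the changed customer row, the window comparison can be replayed in Python against the source data as it stands after the update. This is an illustrative check on in-memory tuples, not the pipeline itself; the variable names are hypothetical.

```python
from datetime import datetime

# Source customer_table after the UPDATE: (PersonID, Name, LastModifytime)
customer_rows = [
    (1, "John", datetime(2017, 9, 1, 0, 56)),
    (2, "Mike", datetime(2017, 9, 2, 5, 23)),
    (3, "NewName", datetime(2017, 9, 8, 0, 0)),
    (4, "Andy", datetime(2017, 9, 4, 3, 21)),
    (5, "Anny", datetime(2017, 9, 5, 8, 6)),
]

# Watermark stored for customer_table after the first run.
old_watermark = datetime(2017, 9, 5, 8, 6)
# New watermark: maximum LastModifytime currently in the source.
new_watermark = max(row[2] for row in customer_rows)

# Same window as the Copy activity query: old < value <= new.
second_run_delta = [
    row for row in customer_rows if old_watermark < row[2] <= new_watermark
]
```

The row for PersonID 5 equals the old watermark, so the strict lower bound excludes it and it is not copied again.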

Next steps

You performed the following steps in this tutorial:

  • Prepare source and destination data stores.
  • Create a data factory.
  • Create a self-hosted integration runtime (IR).
  • Install the integration runtime.
  • Create linked services.
  • Create source, sink, and watermark datasets.
  • Create, run, and monitor a pipeline.
  • Review the results.
  • Add or update data in source tables.
  • Rerun and monitor the pipeline.
  • Review the final results.

Advance to the following tutorial to learn about transforming data by using a Spark cluster on Azure: