Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
适用于:
Azure 数据工厂
Azure Synapse Analytics
在本教程中,你将使用管道创建一个Azure 数据工厂,该管道将增量数据从Azure SQL 数据库中的表加载到 Azure Blob 存储。
在本教程中执行以下步骤:
- 准备数据存储以存放水印值。
- 创建数据工厂。
- 创建链接服务。
- 创建源、接收器和水印数据集。
- 创建管道。
- 运行管道。
- 监视管道运行。
- 查看结果
- 向源添加更多数据。
- 再次运行管道。
- 监视第二个管道运行
- 查看第二次运行的结果
概述
下面是高级解决方案示意图:
下面是创建此解决方案所要执行的重要步骤:
选择水印列。 在源数据存储中选择一列,该列可用于在每次运行时分割或分类新记录和更新记录。 通常,在创建或更新行时,此选定列中的数据(例如 last_modify_time 或 ID)会不断递增。 此列中的最大值用作水印。
准备用于存储水印值的数据存储。 在本教程中,你将把水印值存储在一个 SQL 数据库中。
创建采用以下工作流的管道:
此解决方案中的管道具有以下活动:
- 创建两个 Lookup 活动。 使用第一个查找活动检索最后一个水印值。 使用第二个 Lookup 活动检索新的水印值。 这些水印值将传递给复制活动。
- 创建一个复制活动,从源数据存储中复制行,其中水印列的值大于旧水印值并且小于新水印值。 然后,该活动将源数据存储中的增量数据作为新文件复制到 Blob 存储。
- 创建 StoredProcedure 活动,用于更新下一次运行的管道的水印值。
如果没有Azure订阅,请在开始前创建 trial 帐户。
先决条件
- Azure SQL 数据库。 将数据库用作源数据存储。 如果您没有在Azure SQL数据库中的数据库,请参阅 在 Azure SQL 数据库 中创建数据库 以了解创建数据库的步骤。
- Azure 存储。 将 Blob 存储用作接收器数据存储。 如果没有存储帐户,请参阅创建存储帐户以获取创建步骤。 创建名为 adftutorial 的容器。
在 SQL 数据库中创建数据源表
打开SQL Server Management Studio。 在“服务器资源管理器”中,右键单击数据库,然后选择“新建查询”。
针对 SQL 数据库运行以下 SQL 命令,创建名为
data_source_table的表作为数据源存储:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');本教程中,您将使用 LastModifytime 作为标记字段。 下表显示了数据源存储中的数据:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
在 SQL 数据库中创建另一个表,用于存储高水印值
针对 SQL 数据库运行以下 SQL 命令,创建名为
watermarktable的表,用于存储水印值:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );使用源数据存储表名来设置高水印的默认值。 在本教程中,表名为 data_source_table。
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')查看
watermarktable表中的数据。Select * from watermarktable输出:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
在 SQL 数据库中创建存储过程
运行以下命令,在 SQL 数据库中创建存储过程:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
创建数据工厂
启动 Microsoft Edge 或 Google Chrome Web 浏览器。 目前,数据工厂 UI 仅在 Microsoft Edge 和 Google Chrome Web 浏览器中受支持。
在顶部菜单中,选择“ 创建资源>数据 + 分析>数据工厂 ” :
在“新建数据工厂”页中,输入 ADFIncCopyTutorialDF 作为名称。
Azure 数据工厂的名称必须全球唯一。 如果看到红色感叹号和以下错误,请更改数据工厂的名称(例如改为 yournameADFIncCopyTutorialDF),并重新尝试创建。 有关数据工厂项目命名规则,请参阅数据工厂 - 命名规则一文。
数据工厂名“ADFIncCopyTutorialDF”不可用
选择要在其中创建数据工厂的 Azure 订阅。
对于资源组,请执行以下步骤之一:
选择“使用现有资源组”,并从下拉列表选择现有的资源组。
选择“新建”,并输入资源组的名称。
若要了解资源组,请参阅 使用资源组来管理Azure资源。
选择数据工厂的位置。 下拉列表中仅显示支持的位置。 数据工厂使用的数据存储(Azure 存储、Azure SQL 数据库、Azure SQL 托管实例等)和计算(HDInsight 等)可以位于其他区域。
单击“创建”。
创建完成后,可以看到图中所示的“数据工厂”页。
在 Open Azure 数据工厂 Studio 磁贴上选择 Open,以在单独的浏览器标签页中启动 Azure 数据工厂 用户界面 (UI)。
创建管道
在本教程中,你将创建一个管道,其中包含两个查找活动,一个复制活动,一个 StoredProcedure 活动链接在一个管道中。
在数据工厂 UI 的主页上,单击“协调”磁贴。
在“常规”面板的“属性”中,将 名称 指定为 IncrementalCopyPipeline。 然后通过单击右上角的“属性”图标来折叠面板。
让我们添加第一个查找活动,以获取旧的水印值。 在活动工具箱中展开常规,将查找活动拖放到管道设计器图面。 将活动的名称更改为 LookupOldWaterMarkActivity。
切换到“设置”选项卡,针对“源数据集”单击“+ 新建”。 在此步骤中,请创建一个代表 watermarktable 中数据的数据集。 此表包含在前一复制操作中使用过的旧水印。
在新建数据集窗口中,选择Azure SQL 数据库,然后单击Continue。 此时可以看到为数据集打开了一个新窗口。
在数据集的“设置属性”窗口中,将“名称”设为“WatermarkDataset”。
对于“链接服务”,选择“新建”,然后执行下列步骤:
在名称中输入AzureSqlDatabaseLinkedService。
对于“服务器名称”,请选择你的服务器。
从下拉列表中选择您的“数据库名称”。
输入用户名和密码。
若要测试到 SQL 数据库的连接,请单击“测试连接”。
单击“完成”。
请确认在“链接服务”中选择了“AzureSqlDatabaseLinkedService”。
选择“完成”。
在“连接”选项卡中,在“表”中选择“[dbo].[watermarktable]”。 若要预览表中的数据,请单击“预览数据”。
通过单击顶部的管道选项卡,或者单击左侧树状视图中管道的名称,切换到管道编辑器。 在查找活动的属性窗口中,确认对于“源数据集”字段,是否已选择 WatermarkDataset。
在“活动”工具箱中展开“常规”, 将另一查找活动拖放到管道设计器图面,然后在属性窗口的“常规”选项卡中将名称设置为 LookupNewWaterMarkActivity。 此“查找”活动从包含欲复制至目标的源数据的表中获取新的水印值。
在第二个“Lookup”活动的属性窗口中,切换到“设置”选项卡,然后单击“新建”。 请创建一个数据集,使之指向源表,该表包含新的水印值(LastModifyTime 的最大值)。
在新建数据集窗口中,选择Azure SQL 数据库,然后单击Continue。
在“设置属性”窗口中,对于“名称”输入“SourceDataset”。 为“链接服务”选择“AzureSqlDatabaseLinkedService”。
选择表 [dbo].[data_source_table]。 本教程后面需指定一个针对此数据集的查询。 此查询优先于在此步骤中指定的表。
选择“完成”。
通过单击顶部的管道选项卡,或者单击左侧树状视图中管道的名称,切换到管道编辑器。 在查找活动的属性窗口中,确认对于“源数据集”字段,是否已选择 SourceDataset。
在“Use Query”字段中,选择“Query”, 然后输入以下查询:仅从 data_source_table 中选择 LastModifytime 的最大值。 请确保您还勾选了“仅第一行”选项。
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
在“任务”工具箱中,展开“Move & Transform”
,从“活动”工具箱中拖放“复制” 活动,然后将名称设置为“IncrementalCopyActivity” 。 通过将连接在查找活动上的绿色按钮拖动到复制活动,将两个查找活动连接到复制活动。 当你看到复制活动边框颜色更改为蓝色时,释放鼠标按钮。
选择 复制活动并确认在 Properties 窗口中看到活动的属性。
在“属性”窗口中切换到“源”选项卡,然后执行以下步骤:
对于“源数据集”字段,请选择“SourceDataset”。
对于“Use Query”字段,请选择“Query”。
对于“查询”字段,请输入以下 SQL 查询。
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
切换到“接收器”选项卡。对于“接收器数据集”字段,请单击“+ 新建”。
本教程中,汇集数据存储的类型为 Azure Blob 存储。 因此,选择
Azure Blob 存储 ,然后在“新建数据集continue>。 在“选择格式”窗口中选择数据的格式类型,然后单击“继续”。
在“设置属性”窗口中,将“名称”输入为“SinkDataset”。 对于“链接服务”,选择“+新建”。 在此步骤中,将创建用于 Azure Blob 存储 的连接(链接服务)。
在 “新建链接服务”(Azure Blob 存储)窗口中,执行以下步骤:
- 输入 AzureStorageLinkedService 作为名称。
- 为 Storage 帐户名称选择Azure 存储帐户。
- 测试连接,然后单击“完成”。
在“设置属性”窗口中,对于“链接服务”,确认选择了“AzureStorageLinkedService”。 然后选择“完成”。
转到 SinkDataset 的“连接”选项卡,然后执行以下步骤:
- 对于“文件路径”字段,请输入“adftutorial/incrementalcopy”。 adftutorial 是 Blob 容器名称,incrementalcopy 是文件夹名称。 此代码片段假设 Blob 存储中有一个名为 adftutorial 的 Blob 容器。 创建容器(如果不存在),或者将容器设置为现有容器的名称。 如果输出文件夹不存在,Azure 数据工厂会自动创建输出文件夹incrementalcopy。 对于“文件路径”,您也可以使用“浏览”按钮导航到 Blob 容器内的某个文件夹。
- 请在“文件路径”字段的“文件”部分,选择“添加动态内容 [Alt+P]”,然后在打开的窗口中输入
@CONCAT('Incremental-', pipeline().RunId, '.txt')。 然后选择“完成”。 文件名是使用表达式动态生成的。 每次管道运行都有唯一的 ID。 "复制活动" 使用运行 ID 生成文件名。
通过单击顶部的管道选项卡,或者单击左侧树状视图中管道的名称,切换到管道编辑器。
在活动工具箱中,展开常规,然后将存储过程活动从活动工具箱拖放到管道设计器图面。 将复制活动的绿色(成功)输出连接到存储过程活动。
在管道设计器中选择“存储过程活动”,将其名称更改为 StoredProceduretoWriteWatermarkActivity。
切换到“SQL 帐户”选项卡,然后在“链接服务”中选择“AzureSqlDatabaseLinkedService”。
切换到“存储过程”选项卡,然后执行以下步骤:
对于存储过程名称,选择usp_write_watermark。
若要指定存储过程参数的值,请单击“导入参数”,然后为参数输入以下值:
名称 类型 值 上次修改时间 日期时间 @{activity('LookupNewWaterMarkActivity')。output.firstRow.NewWatermarkvalue} 数据表名称 字符串 @{activity('LookupOldWaterMarkActivity')。output.firstRow.TableName}
若要验证管道设置,请单击工具栏中的“验证”。 确认没有任何验证错误。 若要关闭“管道验证报告”窗口,请单击 。
通过选择“Publish All 按钮,将实体(链接服务、数据集和管道)发布到Azure 数据工厂服务。 等待“发布成功”消息出现。
触发管道的运行
单击工具栏中的“添加触发器”,然后单击“立即触发”。
在“管道运行”窗口中选择“完成”。
监视管道运行
切换到左侧的“监视”选项卡。 您可以查看由手动触发的管道运行状态。 可使用“管道名称”列下的链接来查看运行详细信息并重新运行该管道。
若要查看与管道运行关联的活动运行,请选择“管道名称”列下的链接。 有关活动运行的详细信息,请选择“活动名称”列下的“详细信息”链接(眼镜图标) 。 请在顶部选择所有管道运行,以返回到“管道运行”视图。 若要刷新视图,请选择“刷新”。
查看结果
使用 Azure 存储资源管理器 等工具连接到Azure 存储帐户。 验证 adftutorial 容器的 incrementalcopy 文件夹中是否创建了一个输出文件。
打开输出文件,请注意,所有数据已从 data_source_table 复制到 Blob 文件。
1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000在
watermarktable中查看最新值。 可看到水印值已更新。Select * from watermarktable输出如下:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-05 8:06:00.000 |
向源添加更多数据
在数据库(数据源存储)中插入新数据。
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
数据库中更新后的数据为:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
触发另一流水线的运行
切换到“编辑”选项卡。单击树状视图中的管道(如果未在设计器中打开)。
单击工具栏中的“添加触发器”,然后单击“立即触发”。
监视第二个管道运行
在左侧切换到“监控”选项卡。 您可以看到通过手动触发器启动的流水线运行状态。 可以使用“管道名称”列下的链接来查看活动详细信息以及重新运行该管道。
若要查看与管道运行关联的活动运行,请选择“管道名称”列下的链接。 有关活动运行的详细信息,请选择“活动名称”列下的“详细信息”链接(眼镜图标) 。 选择顶部的“所有管道运行”,回到“管道运行”视图。 若要刷新视图,请选择“刷新”。
验证第二个输出
在 Blob 存储中,可以看到另一文件已创建。 在本教程中,新文件名为
Incremental-<GUID>.txt。 打开该文件,会看到其中包含两行记录。6,newdata,2017-09-06 02:23:00.0000000 7,newdata,2017-09-07 09:01:00.0000000在
watermarktable中查看最新值。 可看到水印值已再次更新。Select * from watermarktable示例输出:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-07 09:01:00.000 |
相关内容
已在本教程中执行了以下步骤:
- 准备数据存储以存放水印值。
- 创建数据工厂。
- 创建链接服务。
- 创建源、接收器和水印数据集。
- 创建管道。
- 运行管道。
- 监视管道运行。
- 查看结果
- 向源添加更多数据。
- 再次运行管道。
- 监视第二个管道运行
- 查看第二次运行的结果
在本教程中,管道将数据从 SQL 数据库中的单个表复制到了 Blob 存储。 转到以下教程,了解如何将数据从SQL Server数据库中的多个表复制到 SQL 数据库。