使用复制数据工具中元数据驱动的方法生成大规模数据复制管道

适用于:Azure 数据工厂 Azure Synapse Analytics

如果你要复制巨量的对象(例如数千个表)或者要从大量不同的源中加载数据,适当的方法是在一个控制表中输入包含所需复制行为的对象的名称列表,然后使用参数化管道从该控制表中读取这些对象,并相应地将其应用于作业。 这样,只需更新控制表中的对象名称就能轻松维护(例如添加/删除)要复制的对象列表,而无需重新部署管道。 此外,还可以在单个位置轻松检查定义了复制行为的哪些管道/触发器复制了哪些对象。

ADF 中的复制数据工具简化了这种元数据驱动的数据复制管道的生成过程。 通过基于向导的体验完成直观的流程后,该工具可为你生成参数化管道和 SQL 脚本,以便你相应地创建外部控制表。 运行生成的脚本在 SQL 数据库中创建控制表后,管道将读取控制表中的元数据,并自动将其应用于复制作业。

通过复制数据工具创建元数据驱动的复制作业

  1. 在复制数据工具中选择“元数据驱动的复制任务”。

    需要输入连接以及控制表的表名称,使生成的管道从该控制表中读取元数据。

    选择任务类型

  2. 输入源数据库的连接。 也可以使用参数化链接服务

    选择参数化链接服务

  3. 选择要复制的表名称。

    选择表

    注意

    如果选择了表格数据存储,则可以在下一页中进一步选择完全加载或增量加载。 如果选择了存储,则在下一页中只能进一步选择完全加载。 目前不支持仅从存储中以增量方式加载新文件。

  4. 选择加载行为。

    提示

    如果你要对所有表执行完全复制,请选择“完全加载所有表”。 如果你要执行增量复制,可以选择“单独对每个表进行配置”,然后选择“增量加载”,以及用于为每个表开始复制的水印列名和值

  5. 选择“目标数据存储”。

  6. 在“设置”页中,可以通过“并发复制任务数”来确定用于从源存储中并发复制数据的复制活动数上限 。 默认值为 20。

    “设置”页

  7. 管道部署后,可以从 UI 复制或下载 SQL 脚本,用于创建控制表和存储过程。

    下载脚本

    你将看到两个 SQL 脚本。

    • 第一个 SQL 脚本用于创建两个控制表。 主控制表存储表列表、文件路径或复制行为。 连接控制表存储数据存储的连接值(如果使用的是参数化链接服务)。
    • 第二个 SQL 脚本用于创建存储过程。 每次增量复制作业完成时,将使用该脚本来更新主控制表中的水印值。
  8. 打开 SSMS 以连接到控制表服务器,然后运行这两个 SQL 脚本来创建控制表和存储过程。

    创建控制表脚本

  9. 查询主控制表和连接控制表,以查看其中的元数据。

    主控制表查询控制表脚本 1

    连接控制表查询控制表脚本 2

  10. 返回 ADF 门户以查看和调试管道。 你将看到,已创建一个名为“MetadataDrivenCopyTask_### ######”的文件夹。单击名为“MetadataDrivenCopyTask###_TopLevel”的管道,然后单击“调试运行” 。

    需要输入以下参数:

    参数名称 说明
    MaxNumberOfConcurrentTasks 在管道运行之前,始终可以更改最大并发复制活动运行数。 默认值是你在复制数据工具中输入的值。
    MainControlTableName 在运行之前,始终可以更改主控制表名称,使管道从该表中获取元数据。
    ConnectionControlTableName 在运行之前,始终可以更改连接控制表名称(可选),使管道获取与数据存储连接相关的元数据。
    MaxNumberOfObjectsReturnedFromLookupActivity 为了避免达到输出查找活动的限制,可使用该参数来定义查找活动返回的最大对象数。 在大多数情况下,无需更改默认值。
    windowStart 输入动态值(例如 yyyyy/mm/dd)作为文件夹路径时,会使用该参数将当前触发器时间传递给管道,以填充动态文件夹路径。 当管道由计划触发器或翻转窗口触发器触发时,用户无需输入此参数的值。 示例值:2021-01-25T01:49:28Z
  11. 启用触发器以使管道可运行。

    启用触发器

通过复制数据工具更新控制表

始终可以通过添加或删除要复制的对象或者更改每个表的复制行为,来直接更新控制表。 我们还在复制数据工具中创建了 UI 体验,以简化控制表编辑过程。

  1. 右键单击顶级管道“MetadataDrivenCopyTask_xxx_TopLevel”并选择“编辑控制表” 。

    编辑控制表 1

  2. 在控制表中选择要编辑的行。

    编辑控制表 2

  3. 完成复制数据工具中的每个步骤,最终它会为你创建一个新的 SQL 脚本。 重新运行 SQL 脚本以更新控制表。

    编辑控制表 3

    注意

    不会重新部署管道。 新建的 SQL 脚本只能帮助你更新控制表。

控制表

主控制表

控制表中的每一行包含一个要复制的对象(例如一个表)的元数据。

列名称 说明
ID 要复制的对象的唯一 ID。
SourceObjectSettings 源数据集的元数据。 可以是架构名称、表名称等。此处提供了示例。
SourceConnectionSettingsName 连接控制表中源连接设置的名称。 它是可选的。
CopySourceSettings 复制活动中源属性的元数据。 可以是查询、分区等。此处提供了示例。
SinkObjectSettings 目标数据集的元数据。 可以是文件名、文件夹路径、表名称等。此处提供了示例。 如果指定了动态文件夹路径,则变量值不会写入到控制表中的此列。
SinkConnectionSettingsName 连接控制表中目标连接设置的名称。 它是可选的。
CopySinkSettings 复制活动中接收器属性的元数据。 可以是 preCopyScript、tableOption 等。此处提供了示例。
CopyActivitySettings 复制活动中转换器属性的元数据。 用于定义列映射。
TopLevelPipelineName 顶级管道名称,它可以复制此对象。
TriggerName 触发器名称,它可以触发管道来复制此对象。 如果调试运行,则名称为“沙盒”。 如果手动执行,则名称为“手动”。 如果计划内运行,则名称为关联的触发器名称。 它可以输入多个名称。
DataLoadingBehaviorSettings 完全加载与增量加载。
TaskId 要按控制表中的 TaskId 复制的对象的顺序 (ORDER BY [TaskId] DESC)。 如果你要复制巨量的对象,但允许的并发复制数有限,则你可以更改每个对象的 TaskId,以确定哪些对象可以先复制。 默认值为 0。
CopyEnabled 指定是否在数据引入过程中启用了该项。 允许的值:1(已启用)、0(已禁用)。 默认值为 1。

连接控制表

控制表中的每一行包含该数据存储的一个连接设置。

列名称 说明
名称 主控制表中参数化连接的名称。
ConnectionSettings 连接设置。 可以是数据库名称、服务器名称等。

管道

你将看到复制数据工具生成了三个级别的管道。

MetadataDrivenCopyTask_xxx_TopLevel

此管道将计算需要在此运行中复制的对象(表等)总数,根据允许的最大并发复制任务数提供顺序批数,然后执行另一个管道来按顺序复制不同的批。

参数

参数名称 说明
MaxNumberOfConcurrentTasks 在管道运行之前,始终可以更改最大并发复制活动运行数。 默认值是你在复制数据工具中输入的值。
MainControlTableName 主控制表的表名称。 在运行之前,管道将从此表中获取元数据
ConnectionControlTableName 连接控制表的表名称(可选)。 在运行之前,管道将获取与数据存储连接相关的元数据
MaxNumberOfObjectsReturnedFromLookupActivity 为了避免达到输出查找活动的限制,可使用该参数来定义查找活动返回的最大对象数。 在大多数情况下,无需更改默认值。
windowStart 输入动态值(例如 yyyyy/mm/dd)作为文件夹路径时,会使用该参数将当前触发器时间传递给管道,以填充动态文件夹路径。 当管道由计划触发器或翻转窗口触发器触发时,用户无需输入此参数的值。 示例值:2021-01-25T01:49:28Z

活动

活动名称 活动类型 说明
GetSumOfObjectsToCopy 查找 计算需要在此运行中复制的对象(表等)总数。
CopyBatchesOfObjectsSequentially ForEach 根据允许的最大并发复制任务数提供顺序批数,然后执行另一个管道来按顺序复制不同的批。
CopyObjectsInOneBtach 执行管道 执行另一个管道来复制一批对象。 属于此批的对象将会并行复制。

MetadataDrivenCopyTask_xxx_MiddleLevel

此管道将复制一批对象。 属于此批的对象将会并行复制。

参数

参数名称 说明
MaxNumberOfObjectsReturnedFromLookupActivity 为了避免达到输出查找活动的限制,可使用该参数来定义查找活动返回的最大对象数。 在大多数情况下,无需更改默认值。
TopLevelPipelineName 顶层管道的名称。
TriggerName 触发器的名称。
CurrentSequentialNumberOfBatch 顺序批的 ID。
SumOfObjectsToCopy 要复制的对象总数。
SumOfObjectsToCopyForCurrentBatch 当前批中要复制的对象数。
MainControlTableName 主控制表的名称。
ConnectionControlTableName 连接控制表的名称。

活动

活动名称 活动类型 说明
DivideOneBatchIntoMultipleGroups ForEach 将单个批中的对象划分为多个并行组,以避免达到查找活动的输出限制。
GetObjectsPerGroupToCopy 查找 从控制表中获取需要在此组中复制的对象(表等)。 要按控制表中的 TaskId 复制的对象的顺序 (ORDER BY [TaskId] DESC)。
CopyObjectsInOneGroup 执行管道 执行另一个管道以从一个组复制对象。 属于此组的对象将会并行复制。

MetadataDrivenCopyTask_xxx_BottomLevel

此管道将从一个组复制对象。 属于此组的对象将会并行复制。

参数

参数名称 说明
ObjectsPerGroupToCopy 当前组中要复制的对象数。
ConnectionControlTableName 连接控制表的名称。
windowStart 用于将当前触发器时间传递给管道,以填充动态文件夹路径(如果已由用户配置)。

活动

活动名称 活动类型 说明
ListObjectsFromOneGroup ForEach 列出一个组中的对象,并将其中的每个对象迭代到下游活动。
RouteJobsBasedOnLoadingBehavior 开关 检查每个对象的加载行为。 如果加载行为是默认行为或 FullLoad,则执行完全加载。 如果加载行为是 DeltaLoad,则通过水印列执行增量加载以识别更改
FullLoadOneObject 复制 获取此对象的完整快照并将其复制到目标。
DeltaLoadOneObject 复制 通过比较水印列中的值来识别更改,以便仅复制自上次复制以来已更改的数据。
GetMaxWatermarkValue 查找 查询源对象以获取水印列中的最大值。
UpdateWatermarkColumnValue StoreProcedure 将新的水印值写回到控制表供下次使用。

已知的限制

  • 无法在 ADF 中参数化 IR 名称、数据库类型和文件格式类型。 例如,如果你要从 Oracle Server 和 SQL Server 引入数据,则需要两个不同的参数化管道。 但是,一个控制表可由两组管道共享。
  • OPENJSON 用于通过复制数据工具生成 SQL 脚本。 如果使用 SQL Server 托管控制表,则必须是 SQL Server 2016 (13.x) 及更高版本以便支持 OPENJSON 函数。

请尝试以下使用“复制数据”工具的教程: