Azure 数据工厂和 Azure Synapse Analytics 中的 ForEach 活动
适用于:Azure 数据工厂 Azure Synapse Analytics
ForEach Activity 定义了 Azure 数据工厂或 Synapse 管道中的重复控制流。 此活动用于循环访问集合,并在循环中执行指定的活动。 此活动的循环实现类似于采用编程语言的 Foreach 循环结构。
使用 UI 创建 ForEach 活动
若要在管道中使用 ForEach 活动,请完成以下步骤:
可以使用任何数组类型变量或其他活动的输出作为 ForEach 活动的输入。 若要创建数组变量,请选择管道画布的背景,然后选择“变量”选项卡以添加数组类型变量,如下所示。
在管道“活动”窗格中搜索“ForEach”,然后将 ForEach 活动拖动到管道画布上。
如果尚未选择画布上的新 ForEach 活动,请选择它及其“设置”选项卡,以编辑其详细信息。
选择“项”字段,然后选择“添加动态内容”链接以打开动态内容编辑器窗格。
选择要在动态内容编辑器中筛选的输入数组。 在此示例中,我们选择在第一步中创建的变量。
选择 ForEach 活动上的“活动”编辑器,为输入“项”数组中的每个项添加一个或多个要执行的活动。
在任何在 ForEach 活动中创建的活动中,都可以从 “项”列表中引用 ForEach 活动正在循环访问的当前项。 可以在任何可以使用动态表达式指定属性值的位置引用当前项。 在动态内容编辑器中,选择 ForEach 迭代器以返回当前项。
语法
此属性在本文后面介绍。 项属性是集合,通过 @item()
引用集合中的每个项目,如以下语法中所示:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Type 属性
属性 | 说明 | 允许的值 | 必需 |
---|---|---|---|
name | For-Each 活动的名称。 | String | 是 |
type | 必须设置为 ForEach | String | 是 |
isSequential | 指定是否应按顺序或并行执行循环。 一次最多可以并行执行 50 个循环迭代。 例如,如果你有 ForEach 活动,在 isSequential 设置为 False 的情况下循环访问含有 10 个不同源和接收器数据集的复制活动,所有副本都执行一次。 默认值为 False。 如果“isSequential”被设置为 False,则确保有运行多个可执行文件的正确配置。 否则,应谨慎使用此属性,以避免产生写入冲突。 有关详细信息,请参阅并行执行部分。 |
布尔 | 否。 默认值为 False。 |
batchCount | 要用于控制并行执行数的批计数(当 isSequential 设为 false 时)。 这是并发数上限,但 for-each 活动不会始终按此数量执行 | 整数(最大值为 50) | 否。 默认值为 20。 |
Items | 返回要循环访问的 JSON 数组的表达式。 | 表达式(返回 JSON 数组) | 是 |
活动 | 要执行的活动。 | 活动列表 | 是 |
并行执行
如果 isSequential 被设置为 False,则活动以并行方式迭代,最多包含 50 个并发迭代。 应谨慎使用此设置。 如果并发迭代写入同一文件夹中的不同文件,此方法仍然适用。 如果并发迭代以并发的方式写入同一文件,则此方法很有可能出错。
迭代表达式语言
在 ForEach 活动中,为属性 items 提供要迭代的数组。使用 @item()
在 ForEach 活动中对单个枚举进行迭代。 例如,如果 items 是数组:[1, 2, 3],则 @item()
在第一次迭代中返回 1,在第二次迭代中返回 2,在第三次迭代中返回 3。 你还可以使用 @range(0,10)
之类的表达式,从 0 开始,到 9 结束,迭代 10 次。
循环访问单个活动
场景: 从 Azure Blob 中的同一源文件复制到 Azure Blob 中的多个目标文件。
管道定义
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Blob 数据集定义
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
运行参数值
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
循环访问多个活动
可以循环访问 ForEach 活动中的多个活动(例如:复制和 Web 活动)。 在此方案中,我们建议将多个活动摘录到单独的管道。 然后,可以将管道中的 ExecutePipeline 活动与 ForEach 活动结合使用,以调用含有多个活动的单独管道。
语法
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
示例
场景: 在包含执行管道活动的 ForEach 活动中循环访问 InnerPipeline。 内部管道使用参数化的架构定义进行复制。
主管道定义
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
内部管道定义
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
源数据集定义
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
接收器数据集定义
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
主管道参数
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
聚合输出
若要聚合 foreach 活动的输出,请使用 Variables 和 Append Variable 活动。
首先,在管道中声明 array
变量。 然后,在每个 foreach 循环内调用追加变量 活动。 随后,你可以从数组中检索聚合。
限制和解决方法
以下是 ForEach 活动的一些限制以及建议的解决方法。
限制 | 解决方法 |
---|---|
不能将 ForEach 循环嵌套在另一个 ForEach 循环(或 Until 循环)中。 | 设计一个两级管道,其中具有外部 ForEach 循环的外部管道使用嵌套循环对内部管道进行迭代。 |
对于并行处理,ForEach 活动的最大 batchCount 为 50,最大项数为 100,000 个。 |
设计一个两级管道,其中具有 ForEach 活动的外部管道对内部管道进行迭代。 |
SetVariable 不能在并行运行的 ForEach 活动中使用,因为变量适用于整个管道,它们不限定于 ForEach 或任何其他活动。 | 请考虑使用顺序 ForEach,或在 ForEach 内使用执行管道(子管道中处理的变量/参数)。 |
相关内容
参阅支持的其他控制流活动: