Azure 数据工厂和 Azure Synapse Analytics 中的 ForEach 活动

适用于:Azure 数据工厂 Azure Synapse Analytics

ForEach Activity 定义了 Azure 数据工厂或 Synapse 管道中的重复控制流。 此活动用于循环访问集合,并在循环中执行指定的活动。 此活动的循环实现类似于采用编程语言的 Foreach 循环结构。

使用 UI 创建 ForEach 活动

若要在管道中使用 ForEach 活动,请完成以下步骤:

  1. 可以使用任何数组类型变量或其他活动的输出作为 ForEach 活动的输入。 若要创建数组变量,请选择管道画布的背景,然后选择“变量”选项卡以添加数组类型变量,如下所示。

    显示一个空的管道画布,其中向管道添加了一个数组类型变量。

  2. 在管道“活动”窗格中搜索“ForEach”,然后将 ForEach 活动拖动到管道画布上。

  3. 如果尚未选择画布上的新 ForEach 活动,请选择它及其“设置”选项卡,以编辑其详细信息。

    显示 Filter 活动的 UI。

  4. 选择“项”字段,然后选择“添加动态内容”链接以打开动态内容编辑器窗格。

    显示 Items 属性的“添加动态内容”链接。

  5. 选择要在动态内容编辑器中筛选的输入数组。 在此示例中,我们选择在第一步中创建的变量。

    显示动态内容编辑器,并选定了在第一步中创建的变量

  6. 选择 ForEach 活动上的“活动”编辑器,为输入“项”数组中的每个项添加一个或多个要执行的活动。

    显示管道编辑器窗口中 ForEach 活动上的“活动编辑器”按钮。

  7. 在任何在 ForEach 活动中创建的活动中,都可以从 “项”列表中引用 ForEach 活动正在循环访问的当前项。 可以在任何可以使用动态表达式指定属性值的位置引用当前项。 在动态内容编辑器中,选择 ForEach 迭代器以返回当前项。

    显示选定了 ForEach 迭代器的动态内容编辑器。

语法

此属性在本文后面介绍。 项属性是集合,通过 @item() 引用集合中的每个项目,如以下语法中所示:

{  
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{  
      "isSequential":"true",
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Type 属性

属性 说明 允许的值 必需
name For-Each 活动的名称。 String
type 必须设置为 ForEach String
isSequential 指定是否应按顺序或并行执行循环。 一次最多可以并行执行 50 个循环迭代。 例如,如果你有 ForEach 活动,在 isSequential 设置为 False 的情况下循环访问含有 10 个不同源和接收器数据集的复制活动,所有副本都执行一次。 默认值为 False。

如果“isSequential”被设置为 False,则确保有运行多个可执行文件的正确配置。 否则,应谨慎使用此属性,以避免产生写入冲突。 有关详细信息,请参阅并行执行部分。
布尔 否。 默认值为 False。
batchCount 要用于控制并行执行数的批计数(当 isSequential 设为 false 时)。 这是并发数上限,但 for-each 活动不会始终按此数量执行 整数(最大值为 50) 否。 默认值为 20。
Items 返回要循环访问的 JSON 数组的表达式。 表达式(返回 JSON 数组)
活动 要执行的活动。 活动列表

并行执行

如果 isSequential 被设置为 False,则活动以并行方式迭代,最多包含 50 个并发迭代。 应谨慎使用此设置。 如果并发迭代写入同一文件夹中的不同文件,此方法仍然适用。 如果并发迭代以并发的方式写入同一文件,则此方法很有可能出错。

迭代表达式语言

在 ForEach 活动中,为属性 items 提供要迭代的数组。使用 @item() 在 ForEach 活动中对单个枚举进行迭代。 例如,如果 items 是数组:[1, 2, 3],则 @item() 在第一次迭代中返回 1,在第二次迭代中返回 2,在第三次迭代中返回 3。 你还可以使用 @range(0,10) 之类的表达式,从 0 开始,到 9 结束,迭代 10 次。

循环访问单个活动

场景: 从 Azure Blob 中的同一源文件复制到 Azure Blob 中的多个目标文件。

管道定义

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": "true",
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": "false"
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "MyDataset",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Blob 数据集定义

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

运行参数值

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

循环访问多个活动

可以循环访问 ForEach 活动中的多个活动(例如:复制和 Web 活动)。 在此方案中,我们建议将多个活动摘录到单独的管道。 然后,可以将管道中的 ExecutePipeline 活动与 ForEach 活动结合使用,以调用含有多个活动的单独管道。

语法

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>"
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>"
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

示例

场景: 在包含执行管道活动的 ForEach 活动中循环访问 InnerPipeline。 内部管道使用参数化的架构定义进行复制。

主管道定义

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

内部管道定义

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource",
            }
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

源数据集定义

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

接收器数据集定义

{
  "name": "sqlSinkDataSet",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

主管道参数

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
    
}

聚合输出

若要聚合 foreach 活动的输出,请使用 Variables 和 Append Variable 活动。

首先,在管道中声明 array 变量。 然后,在每个 foreach 循环内调用追加变量 活动。 随后,你可以从数组中检索聚合。

限制和解决方法

以下是 ForEach 活动的一些限制以及建议的解决方法。

限制 解决方法
不能将 ForEach 循环嵌套在另一个 ForEach 循环(或 Until 循环)中。 设计一个两级管道,其中具有外部 ForEach 循环的外部管道使用嵌套循环对内部管道进行迭代。
对于并行处理,ForEach 活动的最大 batchCount 为 50,最大项数为 100,000 个。 设计一个两级管道,其中具有 ForEach 活动的外部管道对内部管道进行迭代。
SetVariable 不能在并行运行的 ForEach 活动中使用,因为变量适用于整个管道,它们不限定于 ForEach 或任何其他活动。 请考虑使用顺序 ForEach,或在 ForEach 内使用执行管道(子管道中处理的变量/参数)。

参阅支持的其他控制流活动: