ForEach activity in Azure Data Factory

APPLIES TO: Azure Data Factory, Azure Synapse Analytics (Preview)

The ForEach activity defines a repeating control flow in your pipeline. It iterates over a collection and executes the specified activities in a loop. The loop implementation is similar to the foreach looping structure in programming languages.

Syntax

The properties are described later in this article. The items property is the collection, and each item in the collection is referenced by using @item(), as shown in the following syntax:

{
   "name":"MyForEachActivityName",
   "type":"ForEach",
   "typeProperties":{
      "isSequential":true,
        "items": {
            "value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
            "type": "Expression"
        },
      "activities":[  
         {  
            "name":"MyCopyActivity",
            "type":"Copy",
            "typeProperties":{  
               ...
            },
            "inputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
                  }
               }
            ],
            "outputs":[  
               {  
                  "referenceName":"MyDataset",
                  "type":"DatasetReference",
                  "parameters":{  
                     "MyFolderPath":"@item()"
                  }
               }
            ]
         }
      ]
   }
}

Type properties

| Property | Description | Allowed values | Required |
| --- | --- | --- | --- |
| name | Name of the ForEach activity. | String | Yes |
| type | Must be set to ForEach. | String | Yes |
| isSequential | Specifies whether the loop runs sequentially or in parallel. In parallel mode, up to 20 loop iterations run at once by default (see batchCount). For example, if a ForEach activity iterates over a copy activity with 10 different source and sink datasets and isSequential is set to false, all 10 copies run at once. If isSequential is set to false, make sure the activities inside the loop are configured so that they can safely run concurrently, and use the setting with caution to avoid write conflicts. For more information, see the Parallel execution section. | Boolean | No. Default is false. |
| batchCount | Batch count used to control the number of parallel executions (when isSequential is set to false). This is the upper concurrency limit; the ForEach activity does not always run at this number. | Integer (maximum 50) | No. Default is 20. |
| items | An expression that returns a JSON array to be iterated over. | Expression (which returns a JSON array) | Yes |
| activities | The activities to be executed. | List of activities | Yes |

Parallel execution

If isSequential is set to false, the activity iterates in parallel, with up to 20 concurrent iterations by default; batchCount can raise this limit to a maximum of 50. Use this setting with caution. If the concurrent iterations write to the same folder but to different files, this approach is fine. If they write concurrently to the exact same file, this approach will most likely cause an error.
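As a minimal sketch of the parallel setting, the following ForEach activity caps concurrency at five iterations. The activity names, the dataset names, the dataset parameter MyFolderPath, and the pipeline parameter fileList are hypothetical. Each iteration takes its sink path from @item(), so as long as the array entries are distinct, concurrent iterations do not write to the same file.

```json
{
    "name": "ParallelCopyForEach",
    "description": "Illustrative sketch; all names are hypothetical.",
    "type": "ForEach",
    "typeProperties": {
        "isSequential": false,
        "batchCount": 5,
        "items": {
            "value": "@pipeline().parameters.fileList",
            "type": "Expression"
        },
        "activities": [
            {
                "name": "CopyOneFile",
                "type": "Copy",
                "typeProperties": {
                    "source": { "type": "BlobSource" },
                    "sink": { "type": "BlobSink" }
                },
                "inputs": [
                    {
                        "referenceName": "SourceDataset",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "SinkDataset",
                        "type": "DatasetReference",
                        "parameters": {
                            "MyFolderPath": "@item()"
                        }
                    }
                ]
            }
        ]
    }
}
```

Because batchCount is only an upper limit, fewer than five iterations may run at any given moment.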

Iteration expression language

In the ForEach activity, provide the array to be iterated over in the items property. Inside the loop, use @item() to refer to the current element of that array. For example, if items is the array [1, 2, 3], @item() returns 1 in the first iteration, 2 in the second iteration, and 3 in the third iteration.
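The [1, 2, 3] case can be sketched as a small pipeline. This is an illustrative example rather than a prescribed pattern: the pipeline, parameter, and activity names are hypothetical, and a Wait activity is used only as a simple stand-in for real work. In each iteration, @item() resolves to the current number, so the three iterations wait 1, 2, and 3 seconds respectively.

```json
{
    "name": "ItemDemoPipeline",
    "properties": {
        "description": "Illustrative sketch; all names are hypothetical.",
        "parameters": {
            "numbers": {
                "type": "Array",
                "defaultValue": [1, 2, 3]
            }
        },
        "activities": [
            {
                "name": "LoopOverNumbers",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": true,
                    "items": {
                        "value": "@pipeline().parameters.numbers",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "WaitItemSeconds",
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": {
                                    "value": "@item()",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ]
    }
}
```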

Iterating over a single activity

Scenario: Copy from the same source file in Azure Blob to multiple destination files in Azure Blob.

Pipeline definition

{
    "name": "<MyForEachPipeline>",
    "properties": {
        "activities": [
            {
                "name": "<MyForEachActivity>",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": true,
                    "items": {
                        "value": "@pipeline().parameters.mySinkDatasetFolderPath",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "MyCopyActivity",
                            "type": "Copy",
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource",
                                    "recursive": false
                                },
                                "sink": {
                                    "type": "BlobSink",
                                    "copyBehavior": "PreserveHierarchy"
                                }
                            },
                            "inputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
                                    }
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "<MyDataset>",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "MyFolderPath": "@item()"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            },
            "mySinkDatasetFolderPath": {
                "type": "Array"
            }
        }
    }
}

Blob dataset definition

{  
   "name":"<MyDataset>",
   "properties":{  
      "type":"AzureBlob",
      "typeProperties":{  
         "folderPath":{  
            "value":"@dataset().MyFolderPath",
            "type":"Expression"
         }
      },
      "linkedServiceName":{  
         "referenceName":"StorageLinkedService",
         "type":"LinkedServiceReference"
      },
      "parameters":{  
         "MyFolderPath":{  
            "type":"String"
         }
      }
   }
}

Run parameter values

{
    "mySourceDatasetFolderPath": "input/",
    "mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}

Iterating over multiple activities

It's possible to iterate over multiple activities (for example, copy and web activities) in a ForEach activity. In this scenario, we recommend that you abstract the multiple activities out into a separate pipeline. You can then use the ExecutePipeline activity inside the ForEach activity to invoke that separate pipeline.

Syntax

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "<MyForEachMultipleActivities>",
        "typeProperties": {
          "isSequential": true,
          "items": {
            ...
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "name": "<MyInnerPipeline>",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "<copyHttpPipeline>",
                  "type": "PipelineReference"
                },
                "parameters": {
                  ...
                },
                "waitOnCompletion": true
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      ...
    }
  }
}

Example

Scenario: Iterate over an inner pipeline from a ForEach activity by using the Execute Pipeline activity. The inner pipeline performs a copy with parameterized schema definitions.

Master pipeline definition

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ForEach",
        "name": "MyForEachActivity",
        "typeProperties": {
          "isSequential": true,
          "items": {
            "value": "@pipeline().parameters.inputtables",
            "type": "Expression"
          },
          "activities": [
            {
              "type": "ExecutePipeline",
              "typeProperties": {
                "pipeline": {
                  "referenceName": "InnerCopyPipeline",
                  "type": "PipelineReference"
                },
                "parameters": {
                  "sourceTableName": {
                    "value": "@item().SourceTable",
                    "type": "Expression"
                  },
                  "sourceTableStructure": {
                    "value": "@item().SourceTableStructure",
                    "type": "Expression"
                  },
                  "sinkTableName": {
                    "value": "@item().DestTable",
                    "type": "Expression"
                  },
                  "sinkTableStructure": {
                    "value": "@item().DestTableStructure",
                    "type": "Expression"
                  }
                },
                "waitOnCompletion": true
              },
              "name": "ExecuteCopyPipeline"
            }
          ]
        }
      }
    ],
    "parameters": {
      "inputtables": {
        "type": "Array"
      }
    }
  }
}

Inner pipeline definition

{
  "name": "InnerCopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "SqlSource"
          },
          "sink": {
            "type": "SqlSink"
          }
        },
        "name": "CopyActivity",
        "inputs": [
          {
            "referenceName": "sqlSourceDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sourceTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sourceTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sqlSinkDataset",
            "parameters": {
              "SqlTableName": {
                "value": "@pipeline().parameters.sinkTableName",
                "type": "Expression"
              },
              "SqlTableStructure": {
                "value": "@pipeline().parameters.sinkTableStructure",
                "type": "Expression"
              }
            },
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceTableName": {
        "type": "String"
      },
      "sourceTableStructure": {
        "type": "String"
      },
      "sinkTableName": {
        "type": "String"
      },
      "sinkTableStructure": {
        "type": "String"
      }
    }
  }
}

Source dataset definition

{
  "name": "sqlSourceDataset",
  "properties": {
    "type": "SqlServerTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "sqlserverLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Sink dataset definition

{
  "name": "sqlSinkDataset",
  "properties": {
    "type": "AzureSqlTable",
    "typeProperties": {
      "tableName": {
        "value": "@dataset().SqlTableName",
        "type": "Expression"
      }
    },
    "structure": {
      "value": "@dataset().SqlTableStructure",
      "type": "Expression"
    },
    "linkedServiceName": {
      "referenceName": "azureSqlLS",
      "type": "LinkedServiceReference"
    },
    "parameters": {
      "SqlTableName": {
        "type": "String"
      },
      "SqlTableStructure": {
        "type": "String"
      }
    }
  }
}

Master pipeline parameters

{
    "inputtables": [
        {
            "SourceTable": "department",
            "SourceTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ],
            "DestTable": "department2",
            "DestTableStructure": [
              {
                "name": "departmentid",
                "type": "int"
              },
              {
                "name": "departmentname",
                "type": "string"
              }
            ]
        }
    ]
}

Aggregating outputs

To aggregate the outputs of a ForEach activity, use a pipeline variable together with the Append Variable activity.

First, declare an array variable in the pipeline. Then, invoke the Append Variable activity inside the ForEach loop so that each iteration appends its value. After the loop completes, you can retrieve the aggregated values from the array variable.
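The steps above can be sketched as follows. This is a minimal illustration under stated assumptions: the pipeline name, the inputItems parameter, the collected variable, and the activity names are all hypothetical, and each iteration simply appends the current item rather than a real activity output.

```json
{
    "name": "AggregateOutputsPipeline",
    "properties": {
        "description": "Illustrative sketch; all names are hypothetical.",
        "parameters": {
            "inputItems": { "type": "Array" }
        },
        "variables": {
            "collected": { "type": "Array" }
        },
        "activities": [
            {
                "name": "LoopAndCollect",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": true,
                    "items": {
                        "value": "@pipeline().parameters.inputItems",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "AppendCurrentItem",
                            "type": "AppendVariable",
                            "typeProperties": {
                                "variableName": "collected",
                                "value": {
                                    "value": "@item()",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ]
    }
}
```

An activity placed after the loop could then read the aggregated array with the expression @variables('collected').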

Limitations and workarounds

Here are some limitations of the ForEach activity and suggested workarounds.

| Limitation | Workaround |
| --- | --- |
| You can't nest a ForEach loop inside another ForEach loop (or an Until loop). | Design a two-level pipeline where the outer pipeline with the outer ForEach loop iterates over an inner pipeline that contains the nested loop. |
| The ForEach activity has a maximum batchCount of 50 for parallel processing, and a maximum of 100,000 items. | Design a two-level pipeline where the outer pipeline with the ForEach activity iterates over an inner pipeline. |
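The two-level workaround might look like the following pair of pipelines. This is a sketch, not a definitive design: every pipeline, parameter, and activity name is hypothetical, the groups parameter is assumed to be an array of objects that each carry a members array, and the Wait activity is only a placeholder for the real per-item work. The outer pipeline loops over groups and calls the inner pipeline once per group:

```json
{
    "name": "OuterPipeline",
    "properties": {
        "description": "Illustrative sketch; all names are hypothetical.",
        "parameters": {
            "groups": { "type": "Array" }
        },
        "activities": [
            {
                "name": "ForEachGroup",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": true,
                    "items": {
                        "value": "@pipeline().parameters.groups",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "RunInnerPipeline",
                            "type": "ExecutePipeline",
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "InnerPipeline",
                                    "type": "PipelineReference"
                                },
                                "parameters": {
                                    "members": {
                                        "value": "@item().members",
                                        "type": "Expression"
                                    }
                                },
                                "waitOnCompletion": true
                            }
                        }
                    ]
                }
            }
        ]
    }
}
```

The inner pipeline then runs its own ForEach loop over the array it receives:

```json
{
    "name": "InnerPipeline",
    "properties": {
        "description": "Illustrative sketch; all names are hypothetical.",
        "parameters": {
            "members": { "type": "Array" }
        },
        "activities": [
            {
                "name": "ForEachMember",
                "type": "ForEach",
                "typeProperties": {
                    "isSequential": true,
                    "items": {
                        "value": "@pipeline().parameters.members",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "PlaceholderWork",
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": 1
                            }
                        }
                    ]
                }
            }
        ]
    }
}
```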

Next steps

See other control flow activities supported by Data Factory: