Execute Pipeline activity in Azure Data Factory

APPLIES TO: Azure Data Factory, Azure Synapse Analytics (Preview)

The Execute Pipeline activity allows a Data Factory pipeline to invoke another pipeline.

Syntax

{
    "name": "MyPipeline",
    "properties": {
        "activities": [
            {
                "name": "ExecutePipelineActivity",
                "type": "ExecutePipeline",
                "typeProperties": {
                    "parameters": {                        
                        "mySourceDatasetFolderPath": {
                            "value": "@pipeline().parameters.mySourceDatasetFolderPath",
                            "type": "Expression"
                        }
                    },
                    "pipeline": {
                        "referenceName": "<InvokedPipelineName>",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true
                }
            }
        ],
        "parameters": {
            "mySourceDatasetFolderPath": {
                "type": "String"
            }
        }
    }
}

Type properties

| Property | Description | Allowed values | Required |
| --- | --- | --- | --- |
| name | Name of the Execute Pipeline activity. | String | Yes |
| type | Must be set to: ExecutePipeline. | String | Yes |
| pipeline | Pipeline reference to the dependent pipeline that this pipeline invokes. A pipeline reference object has two properties: referenceName and type. The referenceName property specifies the name of the referenced pipeline. The type property must be set to PipelineReference. | PipelineReference | Yes |
| parameters | Parameters to be passed to the invoked pipeline. | A JSON object that maps parameter names to argument values | No |
| waitOnCompletion | Defines whether activity execution waits for the dependent pipeline execution to finish. Default is false. | Boolean | No |
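
To illustrate the two optional properties in the table, here is a minimal sketch of an activity that passes a literal argument value instead of an expression and does not wait for the invoked pipeline. The activity name, the parameter name sourceFolder, and the value "input-data" are hypothetical and only serve the illustration; <InvokedPipelineName> is a placeholder, as in the syntax above.

```json
{
    "name": "FireAndForgetExecutePipeline",
    "type": "ExecutePipeline",
    "typeProperties": {
        "pipeline": {
            "referenceName": "<InvokedPipelineName>",
            "type": "PipelineReference"
        },
        "parameters": {
            "sourceFolder": "input-data"
        },
        "waitOnCompletion": false
    }
}
```

With waitOnCompletion set to false, the activity would be expected to finish once the invoked run has been started, so any downstream activity should not assume that the invoked pipeline has completed.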

Sample

This scenario has two pipelines:

  • Master pipeline - This pipeline has one Execute Pipeline activity that calls the invoked pipeline. The master pipeline takes two parameters: masterSourceBlobContainer and masterSinkBlobContainer.
  • Invoked pipeline - This pipeline has one Copy activity that copies data from an Azure Blob source to an Azure Blob sink. The invoked pipeline takes two parameters: sourceBlobContainer and sinkBlobContainer.

Master pipeline definition

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ExecutePipeline",
        "typeProperties": {
          "pipeline": {
            "referenceName": "invokedPipeline",
            "type": "PipelineReference"
          },
          "parameters": {
            "sourceBlobContainer": {
              "value": "@pipeline().parameters.masterSourceBlobContainer",
              "type": "Expression"
            },
            "sinkBlobContainer": {
              "value": "@pipeline().parameters.masterSinkBlobContainer",
              "type": "Expression"
            }
          },
          "waitOnCompletion": true
        },
        "name": "MyExecutePipelineActivity"
      }
    ],
    "parameters": {
      "masterSourceBlobContainer": {
        "type": "String"
      },
      "masterSinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Invoked pipeline definition

{
  "name": "invokedPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "name": "CopyBlobtoBlob",
        "inputs": [
          {
            "referenceName": "SourceBlobDataset",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sinkBlobDataset",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

Linked service

{
  "name": "BlobStorageLinkedService",
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*****"
    }
  }
}

Source dataset

{
  "name": "SourceBlobDataset",
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sourceBlobContainer",
        "type": "Expression"
      },
      "fileName": "salesforce.txt"
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Sink dataset

{
  "name": "sinkBlobDataset",
  "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sinkBlobContainer",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

Running the pipeline

To run the master pipeline in this example, pass the following values for the masterSourceBlobContainer and masterSinkBlobContainer parameters:

{
  "masterSourceBlobContainer": "executetest",
  "masterSinkBlobContainer": "executesink"
}

The master pipeline forwards these values to the invoked pipeline, as shown in the following example:

{
    "type": "ExecutePipeline",
    "typeProperties": {
      "pipeline": {
        "referenceName": "invokedPipeline",
        "type": "PipelineReference"
      },
      "parameters": {
        "sourceBlobContainer": {
          "value": "@pipeline().parameters.masterSourceBlobContainer",
          "type": "Expression"
        },
        "sinkBlobContainer": {
          "value": "@pipeline().parameters.masterSinkBlobContainer",
          "type": "Expression"
        }
      },

      ....
}
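
As a worked example of this forwarding: assuming the master pipeline is run with masterSourceBlobContainer set to "executetest" and masterSinkBlobContainer set to "executesink", the two expressions would be expected to resolve so that the invoked pipeline receives the parameter values below. This is a sketch of the resolved values under that assumption, not output captured from a service run.

```json
{
  "sourceBlobContainer": "executetest",
  "sinkBlobContainer": "executesink"
}
```

The names on the left are the invoked pipeline's own parameter names; the master pipeline's parameter names appear only inside the expressions that supply the values.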

Next steps

See other control flow activities supported by Data Factory: