Azure 数据工厂和 Synapse Analytics 中的执行管道活动

适用于:Azure 数据工厂 Azure Synapse Analytics

执行管道活动允许数据工厂或 Synapse 管道调用另一个管道。

使用 UI 创建一个执行管道活动

若要在管道中使用执行管道活动,请完成以下步骤:

  1. 在管道“活动”窗格中搜索“管道”,然后将执行管道活动拖动到管道画布上。

  2. 如果尚未选择画布上的新执行管道活动,请选择它及其“设置”选项卡,以编辑其详细信息。

    显示执行管道活动的 UI。

  3. 选择现有管道或使用“新建”按钮创建新管道。 选择其他选项,并根据需要为管道配置任何参数,以完成配置。

语法

{
    "name": "MyPipeline",
    "properties": {
        "activities": [
            {
                "name": "ExecutePipelineActivity",
                "type": "ExecutePipeline",
                "typeProperties": {
                    "parameters": {                        
                        "mySourceDatasetFolderPath": {
                            "value": "@pipeline().parameters.mySourceDatasetFolderPath",
                            "type": "Expression"
                        }
                    },
                    "pipeline": {
                        "referenceName": "<InvokedPipelineName>",
                        "type": "PipelineReference"
                    },
                    "waitOnCompletion": true
                 }
            }
        ],
        "parameters": [
            {
                "mySourceDatasetFolderPath": {
                    "type": "String"
                }
            }
        ]
    }
}

Type 属性

属性 说明 允许的值 必需
name Execute Pipeline 活动的名称。 字符串
type 必须设置为:ExecutePipeline 字符串
管道 管道引用此管道调用的依赖管道。 管道引用对象具有两个属性:referenceNametype。 referenceName 属性指定引用管道的名称。 type 属性必须设置为 PipelineReference。 PipelineReference
parameters 传递给已调用管道的参数 将参数名映射为自变量值的 JSON 对象
waitOnCompletion 定义活动执行是否等待从属管道执行完成。 默认为 true。 布尔

示例

此方案具有两个管道:

  • 主管道 - 此管道具有调用已调用管道的 Execute Pipeline 活动。 主管道采用两个参数:masterSourceBlobContainermasterSinkBlobContainer
  • 已调用管道 - 此管道具有一个 Copy 活动,该活动将数据从 Azure Blob 源复制到 Azure Blob 接收器。 已调用管道采用两个参数:sourceBlobContainersinkBlobContainer

主管道定义

{
  "name": "masterPipeline",
  "properties": {
    "activities": [
      {
        "type": "ExecutePipeline",
        "typeProperties": {
          "pipeline": {
            "referenceName": "invokedPipeline",
            "type": "PipelineReference"
          },
          "parameters": {
            "sourceBlobContainer": {
              "value": "@pipeline().parameters.masterSourceBlobContainer",
              "type": "Expression"
            },
            "sinkBlobContainer": {
              "value": "@pipeline().parameters.masterSinkBlobContainer",
              "type": "Expression"
            }
          },
          "waitOnCompletion": true
        },
        "name": "MyExecutePipelineActivity"
      }
    ],
    "parameters": {
      "masterSourceBlobContainer": {
        "type": "String"
      },
      "masterSinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

已调用管道定义

{
  "name": "invokedPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "name": "CopyBlobtoBlob",
        "inputs": [
          {
            "referenceName": "SourceBlobDataset",
            "type": "DatasetReference"
          }
        ],
        "outputs": [
          {
            "referenceName": "sinkBlobDataset",
            "type": "DatasetReference"
          }
        ]
      }
    ],
    "parameters": {
      "sourceBlobContainer": {
        "type": "String"
      },
      "sinkBlobContainer": {
        "type": "String"
      }
    }
  }
}

链接服务

{
    "name": "BlobStorageLinkedService",
    "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*****"
    }
  }
}

源数据集

{
    "name": "SourceBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sourceBlobContainer",
        "type": "Expression"
      },
      "fileName": "salesforce.txt"
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

接收器数据集

{
    "name": "sinkBlobDataset",
    "properties": {
    "type": "AzureBlob",
    "typeProperties": {
      "folderPath": {
        "value": "@pipeline().parameters.sinkBlobContainer",
        "type": "Expression"
      }
    },
    "linkedServiceName": {
      "referenceName": "BlobStorageLinkedService",
      "type": "LinkedServiceReference"
    }
  }
}

运行管道

若要运行该示例中的主管道,为 masterSourceBlobContainer 和 masterSinkBlobContainer 参数传递以下值:

{
  "masterSourceBlobContainer": "executetest",
  "masterSinkBlobContainer": "executesink"
}

主管道将这些值传递给已调用管道,如以下示例中所示:

{
    "type": "ExecutePipeline",
    "typeProperties": {
      "pipeline": {
        "referenceName": "invokedPipeline",
        "type": "PipelineReference"
      },
      "parameters": {
        "sourceBlobContainer": {
          "value": "@pipeline().parameters.masterSourceBlobContainer",
          "type": "Expression"
        },
        "sinkBlobContainer": {
          "value": "@pipeline().parameters.masterSinkBlobContainer",
          "type": "Expression"
        }
      },

      ....
}

警告

执行管道活动会将数组参数作为字符串传递给子管道。这是因为将有效负载作为字符串从父管道传递到 >子管道。 当我们检查传递给子管道的输入时,我们可以看到它。 有关更多详细信息,请查看此部分

参阅支持的其他控制流活动: