Azure 数据工厂和 Synapse Analytics 中的执行管道活动
适用于:Azure 数据工厂 Azure Synapse Analytics
执行管道活动允许数据工厂或 Synapse 管道调用另一个管道。
使用 UI 创建一个执行管道活动
若要在管道中使用执行管道活动,请完成以下步骤:
在管道“活动”窗格中搜索“管道”,然后将执行管道活动拖动到管道画布上。
如果尚未选择画布上的新执行管道活动,请选择它及其“设置”选项卡,以编辑其详细信息。
选择现有管道或使用“新建”按钮创建新管道。 选择其他选项,并根据需要为管道配置任何参数,以完成配置。
语法
{
"name": "MyPipeline",
"properties": {
"activities": [
{
"name": "ExecutePipelineActivity",
"type": "ExecutePipeline",
"typeProperties": {
"parameters": {
"mySourceDatasetFolderPath": {
"value": "@pipeline().parameters.mySourceDatasetFolderPath",
"type": "Expression"
}
},
"pipeline": {
"referenceName": "<InvokedPipelineName>",
"type": "PipelineReference"
},
"waitOnCompletion": true
}
}
],
"parameters": [
{
"mySourceDatasetFolderPath": {
"type": "String"
}
}
]
}
}
Type 属性
属性 | 说明 | 允许的值 | 必需 |
---|---|---|---|
name | Execute Pipeline 活动的名称。 | 字符串 | 是 |
type | 必须设置为:ExecutePipeline。 | 字符串 | 是 |
管道 | 管道引用此管道调用的依赖管道。 管道引用对象具有两个属性:referenceName 和 type。 referenceName 属性指定引用管道的名称。 type 属性必须设置为 PipelineReference。 | PipelineReference | 是 |
parameters | 传递给已调用管道的参数 | 将参数名映射为自变量值的 JSON 对象 | 否 |
waitOnCompletion | 定义活动执行是否等待从属管道执行完成。 默认为 true。 | 布尔 | 否 |
示例
此方案具有两个管道:
- 主管道 - 此管道具有调用已调用管道的 Execute Pipeline 活动。 主管道采用两个参数:
masterSourceBlobContainer
和masterSinkBlobContainer
。 - 已调用管道 - 此管道具有一个 Copy 活动,该活动将数据从 Azure Blob 源复制到 Azure Blob 接收器。 已调用管道采用两个参数:
sourceBlobContainer
和sinkBlobContainer
。
主管道定义
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "invokedPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceBlobContainer": {
"value": "@pipeline().parameters.masterSourceBlobContainer",
"type": "Expression"
},
"sinkBlobContainer": {
"value": "@pipeline().parameters.masterSinkBlobContainer",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "MyExecutePipelineActivity"
}
],
"parameters": {
"masterSourceBlobContainer": {
"type": "String"
},
"masterSinkBlobContainer": {
"type": "String"
}
}
}
}
已调用管道定义
{
"name": "invokedPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"name": "CopyBlobtoBlob",
"inputs": [
{
"referenceName": "SourceBlobDataset",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sinkBlobDataset",
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceBlobContainer": {
"type": "String"
},
"sinkBlobContainer": {
"type": "String"
}
}
}
}
链接服务
{
"name": "BlobStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=*****;AccountKey=*****"
}
}
}
源数据集
{
"name": "SourceBlobDataset",
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"value": "@pipeline().parameters.sourceBlobContainer",
"type": "Expression"
},
"fileName": "salesforce.txt"
},
"linkedServiceName": {
"referenceName": "BlobStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
接收器数据集
{
"name": "sinkBlobDataset",
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"value": "@pipeline().parameters.sinkBlobContainer",
"type": "Expression"
}
},
"linkedServiceName": {
"referenceName": "BlobStorageLinkedService",
"type": "LinkedServiceReference"
}
}
}
运行管道
若要运行该示例中的主管道,为 masterSourceBlobContainer 和 masterSinkBlobContainer 参数传递以下值:
{
"masterSourceBlobContainer": "executetest",
"masterSinkBlobContainer": "executesink"
}
主管道将这些值传递给已调用管道,如以下示例中所示:
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "invokedPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceBlobContainer": {
"value": "@pipeline().parameters.masterSourceBlobContainer",
"type": "Expression"
},
"sinkBlobContainer": {
"value": "@pipeline().parameters.masterSinkBlobContainer",
"type": "Expression"
}
},
....
}
警告
执行管道活动会将数组参数作为字符串传递给子管道。这是因为将有效负载作为字符串从父管道传递到 >子管道。 当我们检查传递给子管道的输入时,我们可以看到它。 有关更多详细信息,请查看此部分。
相关内容
参阅支持的其他控制流活动: