Azure 数据工厂和 Synapse Analytics 中的 Until 活动

适用于:Azure 数据工厂 Azure Synapse Analytics

Until 活动提供的功能与 do-until 循环结构以编程语言提供的功能相同。 它在循环中将执行一组活动,直到与活动相关联的条件的计算结果为 true。 如果内部活动失败,Until 活动不会停止。 可以为 Until 活动指定超时值。

使用 UI 创建 Until 活动

若要在管道中使用 Until 活动,请完成以下步骤:

  1. 在管道“活动”窗格中搜索“Until”,然后将 Until 活动拖动到管道画布上。

  2. 如果尚未在画布上选择 Until 活动,请选择它及其“设置”选项卡,以编辑其详细信息

    显示管道画布中 Until 活动的“设置”选项卡。

  3. 输入一个表达式,将在执行 Until 活动中定义的所有子活动之后计算该表达式。 如果表达式的计算结果为 false,则 Until 活动会再次执行其所有子活动。 当计算结果为 true 时,Until 活动完成。 该表达式可以是文本字符串表达式,也可以是动态表达式、函数系统变量其他活动的输出的任意组合。 下面的示例检查之前定义的管道数组变量 TestVariable 的值,以查看其计算结果是否为 ['done']。

    显示“添加动态内容”窗格,其中有一个表达式用于针对定义值检查变量。

  4. 通过直接选择 Until 活动上的“编辑活动”按钮,或者通过选择“活动”选项卡来选择它,定义 Until 活动执行的活动。 此时会显示一个新的活动编辑器窗格,可以在其中为要执行的 Until 活动添加任何活动。 在此示例中,Set Variable 活动只是将上述表达式中引用的变量的值设置为 ['done'],因此 Until 活动的表达式在第一次执行时为 true,并且 Until 活动将停止。 在实际使用中,你可以检查所需的任何条件,并且 Until 活动将在每次计算表达式后继续执行其子活动,直到满足条件为止。

    显示已定义“设置变量”活动的 Until 活动的活动编辑器。

语法

{
    "type": "Until",
    "typeProperties": {
        "expression":  {
            "value":  "<expression that evaluates to true or false>", 
            "type": "Expression"
        },
        "timeout": "<time out for the loop. for example: 00:10:00 (10 minute)>",
        "activities": [
            {
                "<Activity 1 definition>"
            },
            {
                "<Activity 2 definition>"
            },
            {
                "<Activity N definition>"
            }
        ]
    },
    "name": "MyUntilActivity"
}

Type 属性

属性 说明 允许的值 必需
name Until 活动的名称。 String
type 必须设置为 Until 字符串
表达式 计算结果必须为 true 或 false 的表达式 表达式。
timeout 此处在指定的时间之后 do-until 循环超时。 字符串。 d.hh:mm:sshh:mm:ss 默认值为 7 天。 最大值为 90 天。
活动 在表达式计算结果为 true 前将执行的活动集。 活动数组

示例 1

注意

本部分提供运行管道的 JSON 定义和示例 PowerShell 命令。 有关使用 Azure PowerShell 和 JSON 定义创建管道的分步说明演练,请参阅教程:使用 Azure PowerShell 创建数据工厂

包含 Until 活动的管道

在此示例中,管道包含两个活动:UntilWait。 在循环中运行 Web 活动之前,Wait 活动会等待指定的期间。 若要了解表达式和函数,请参阅表达式语言和函数

{
    "name": "DoUntilPipeline",
    "properties": {
        "activities": [
            {
                "type": "Until",
                "typeProperties": {
                    "expression": {
                        "value": "@equals('Failed', coalesce(body('MyUnauthenticatedActivity')?.status, actions('MyUnauthenticatedActivity')?.status, 'null'))",
                        "type": "Expression"
                    },
                    "timeout": "00:10:00",
                    "activities": [
                        {
                            "name": "MyUnauthenticatedActivity",
                            "type": "WebActivity",
                            "typeProperties": {
                                "method": "get",
                                "url": "https://www.fake.com/",
                                "headers": {
                                    "Content-Type": "application/json"
                                }
                            },
                            "dependsOn": [
                                {
                                    "activity": "MyWaitActivity",
                                    "dependencyConditions": [ "Succeeded" ]
                                }
                            ]
                        },
                        {
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": 1
                            },
                            "name": "MyWaitActivity"
                        }
                    ]
                },
                "name": "MyUntilActivity"
            }
        ]
    }
}

示例 2

此示例中的管道可在循环中将数据从输入文件夹复制到输出文件夹。 循环将在重复参数的值设置为 false 时终止,否则会在一分钟后超时。

包含 Until 活动的管道 (Adfv2QuickStartPipeline.json)

{
    "name": "Adfv2QuickStartPipeline",
    "properties": {
        "activities": [
            {
                "type": "Until",
                "typeProperties": {
                    "expression":  {
                        "value":  "@equals('false', pipeline().parameters.repeat)", 
                        "type": "Expression"
                    },
                    "timeout": "00:10:00",
                    "activities": [
                        {
                            "name": "CopyFromBlobToBlob",
                            "type": "Copy",
                            "inputs": [
                                {
                                    "referenceName": "BlobDataset",
                                    "parameters": {
                                        "path": "@pipeline().parameters.inputPath"
                                    },
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "BlobDataset",
                                    "parameters": {
                                        "path": "@pipeline().parameters.outputPath"
                                    },
                                    "type": "DatasetReference"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource"
                                },
                                "sink": {
                                    "type": "BlobSink"
                                }
                            },
                            "policy": {
                                "retry": 1,
                                "timeout": "00:10:00",
                                "retryIntervalInSeconds": 60
                            }
                        }
                    ]
                },
                "name": "MyUntilActivity"
            }
        ],
        "parameters": {
            "inputPath": {
                "type": "String"
            },
            "outputPath": {
                "type": "String"
            },
            "repeat": {
                "type": "String"
            }                        
        }        
    }
}

Azure 存储链接服务 (AzureStorageLinkedService.json)

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<Azure Storage account name>;AccountKey=<Azure Storage account key>;EndpointSuffix=core.chinacloudapi.cn"
        }
    }
}

参数化的 Azure Blob 数据集 (BlobDataset.json)

管道将 folderPath 设置为管道参数 outputPath1 或 outputPath2 的值。

{
    "name": "BlobDataset",
    "properties": {
        "type": "AzureBlob",
        "typeProperties": {
            "folderPath": {
                "value": "@{dataset().path}",
                "type": "Expression"
            }
        },
        "linkedServiceName": {
            "referenceName": "AzureStorageLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "path": {
                "type": "String"
            }
        }
    }
}

管道参数 JSON (PipelineParameters.json)

{
    "inputPath": "adftutorial/input",
    "outputPath": "adftutorial/outputUntil",
    "repeat": "true"
}

PowerShell 命令

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

这些命令假设你已将 JSON 文件保存到文件夹 C:\ADF。

Connect-AzAccount -Environment AzureChinaCloud
Select-AzSubscription "<Your subscription name>"

$resourceGroupName = "<Resource Group Name>"
$dataFactoryName = "<Data Factory Name. Must be globally unique>";
Remove-AzDataFactoryV2 $dataFactoryName -ResourceGroupName $resourceGroupName -force


Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -DefinitionFile "C:\ADF\AzureStorageLinkedService.json"
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "BlobDataset" -DefinitionFile "C:\ADF\BlobDataset.json"
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "Adfv2QuickStartPipeline" -DefinitionFile "C:\ADF\Adfv2QuickStartPipeline.json"
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName "Adfv2QuickStartPipeline" -ParameterFile C:\ADF\PipelineParameters.json

while ($True) {
    $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId

    if ($run) {
        if ($run.Status -ne 'InProgress') {
            Write-Host "Pipeline run finished. The status is: " $run.Status -foregroundcolor "Yellow"
            $run
            break
        }
        Write-Host  "Pipeline is running...status: InProgress" -foregroundcolor "Yellow"
        Write-Host "Activity run details:" -foregroundcolor "Yellow"
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
        $result

        Write-Host "Activity 'Output' section:" -foregroundcolor "Yellow"
        $result.Output -join "`r`n"
    }

    Start-Sleep -Seconds 15
}

参阅支持的其他控制流活动: