Azure 数据工厂和 Synapse Analytics 中的 Until 活动
适用于:Azure 数据工厂 Azure Synapse Analytics
Until 活动提供的功能与 do-until 循环结构以编程语言提供的功能相同。 它在循环中将执行一组活动,直到与活动相关联的条件的计算结果为 true。 如果内部活动失败,Until 活动不会停止。 可以为 Until 活动指定超时值。
使用 UI 创建 Until 活动
若要在管道中使用 Until 活动,请完成以下步骤:
在管道“活动”窗格中搜索“Until”,然后将 Until 活动拖动到管道画布上。
如果尚未在画布上选择 Until 活动,请选择它及其“设置”选项卡,以编辑其详细信息。
输入一个表达式,将在执行 Until 活动中定义的所有子活动之后计算该表达式。 如果表达式的计算结果为 false,则 Until 活动会再次执行其所有子活动。 当计算结果为 true 时,Until 活动完成。 该表达式可以是文本字符串表达式,也可以是动态表达式、函数、系统变量或其他活动的输出的任意组合。 下面的示例检查之前定义的管道数组变量 TestVariable 的值,以查看其计算结果是否为 ['done']。
通过直接选择 Until 活动上的“编辑活动”按钮,或者通过选择“活动”选项卡来选择它,定义 Until 活动执行的活动。 此时会显示一个新的活动编辑器窗格,可以在其中为要执行的 Until 活动添加任何活动。 在此示例中,Set Variable 活动只是将上述表达式中引用的变量的值设置为 ['done'],因此 Until 活动的表达式在第一次执行时为 true,并且 Until 活动将停止。 在实际使用中,你可以检查所需的任何条件,并且 Until 活动将在每次计算表达式后继续执行其子活动,直到满足条件为止。
语法
{
"type": "Until",
"typeProperties": {
"expression": {
"value": "<expression that evaluates to true or false>",
"type": "Expression"
},
"timeout": "<time out for the loop. for example: 00:10:00 (10 minute)>",
"activities": [
{
"<Activity 1 definition>"
},
{
"<Activity 2 definition>"
},
{
"<Activity N definition>"
}
]
},
"name": "MyUntilActivity"
}
Type 属性
属性 | 说明 | 允许的值 | 必需 |
---|---|---|---|
name | Until 活动的名称。 |
String | 是 |
type | 必须设置为 Until。 | 字符串 | 是 |
表达式 | 计算结果必须为 true 或 false 的表达式 | 表达式。 | 是 |
timeout | 此处在指定的时间之后 do-until 循环超时。 | 字符串。 d.hh:mm:ss 或 hh:mm:ss 默认值为 7 天。 最大值为 90 天。 |
否 |
活动 | 在表达式计算结果为 true 前将执行的活动集。 |
活动数组 | 是 |
示例 1
注意
本部分提供运行管道的 JSON 定义和示例 PowerShell 命令。 有关使用 Azure PowerShell 和 JSON 定义创建管道的分步说明演练,请参阅教程:使用 Azure PowerShell 创建数据工厂。
包含 Until 活动的管道
在此示例中,管道包含两个活动:Until 和 Wait。 在循环中运行 Web 活动之前,Wait 活动会等待指定的期间。 若要了解表达式和函数,请参阅表达式语言和函数。
{
"name": "DoUntilPipeline",
"properties": {
"activities": [
{
"type": "Until",
"typeProperties": {
"expression": {
"value": "@equals('Failed', coalesce(body('MyUnauthenticatedActivity')?.status, actions('MyUnauthenticatedActivity')?.status, 'null'))",
"type": "Expression"
},
"timeout": "00:10:00",
"activities": [
{
"name": "MyUnauthenticatedActivity",
"type": "WebActivity",
"typeProperties": {
"method": "get",
"url": "https://www.fake.com/",
"headers": {
"Content-Type": "application/json"
}
},
"dependsOn": [
{
"activity": "MyWaitActivity",
"dependencyConditions": [ "Succeeded" ]
}
]
},
{
"type": "Wait",
"typeProperties": {
"waitTimeInSeconds": 1
},
"name": "MyWaitActivity"
}
]
},
"name": "MyUntilActivity"
}
]
}
}
示例 2
此示例中的管道可在循环中将数据从输入文件夹复制到输出文件夹。 循环将在重复参数的值设置为 false 时终止,否则会在一分钟后超时。
包含 Until 活动的管道 (Adfv2QuickStartPipeline.json)
{
"name": "Adfv2QuickStartPipeline",
"properties": {
"activities": [
{
"type": "Until",
"typeProperties": {
"expression": {
"value": "@equals('false', pipeline().parameters.repeat)",
"type": "Expression"
},
"timeout": "00:10:00",
"activities": [
{
"name": "CopyFromBlobToBlob",
"type": "Copy",
"inputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.inputPath"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.outputPath"
},
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"policy": {
"retry": 1,
"timeout": "00:10:00",
"retryIntervalInSeconds": 60
}
}
]
},
"name": "MyUntilActivity"
}
],
"parameters": {
"inputPath": {
"type": "String"
},
"outputPath": {
"type": "String"
},
"repeat": {
"type": "String"
}
}
}
}
Azure 存储链接服务 (AzureStorageLinkedService.json)
{
"name": "AzureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<Azure Storage account name>;AccountKey=<Azure Storage account key>;EndpointSuffix=core.chinacloudapi.cn"
}
}
}
参数化的 Azure Blob 数据集 (BlobDataset.json)
管道将 folderPath 设置为管道参数 outputPath1 或 outputPath2 的值。
{
"name": "BlobDataset",
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"value": "@{dataset().path}",
"type": "Expression"
}
},
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
},
"parameters": {
"path": {
"type": "String"
}
}
}
}
管道参数 JSON (PipelineParameters.json)
{
"inputPath": "adftutorial/input",
"outputPath": "adftutorial/outputUntil",
"repeat": "true"
}
PowerShell 命令
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
这些命令假设你已将 JSON 文件保存到文件夹 C:\ADF。
Connect-AzAccount -Environment AzureChinaCloud
Select-AzSubscription "<Your subscription name>"
$resourceGroupName = "<Resource Group Name>"
$dataFactoryName = "<Data Factory Name. Must be globally unique>";
Remove-AzDataFactoryV2 $dataFactoryName -ResourceGroupName $resourceGroupName -force
Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -DefinitionFile "C:\ADF\AzureStorageLinkedService.json"
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "BlobDataset" -DefinitionFile "C:\ADF\BlobDataset.json"
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "Adfv2QuickStartPipeline" -DefinitionFile "C:\ADF\Adfv2QuickStartPipeline.json"
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName "Adfv2QuickStartPipeline" -ParameterFile C:\ADF\PipelineParameters.json
while ($True) {
$run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId
if ($run) {
if ($run.Status -ne 'InProgress') {
Write-Host "Pipeline run finished. The status is: " $run.Status -foregroundcolor "Yellow"
$run
break
}
Write-Host "Pipeline is running...status: InProgress" -foregroundcolor "Yellow"
Write-Host "Activity run details:" -foregroundcolor "Yellow"
$result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
$result
Write-Host "Activity 'Output' section:" -foregroundcolor "Yellow"
$result.Output -join "`r`n"
}
Start-Sleep -Seconds 15
}
相关内容
参阅支持的其他控制流活动: