Azure 数据工厂和 Synapse Analytics 中的 Until 活动

适用于: Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

Until 活动提供的功能与 do-until 循环结构以编程语言提供的功能相同。 它在循环中将执行一组活动,直到与活动相关联的条件的计算结果为 true。 如果内部活动失败,则 Until 活动不会停止。 可以为 Until 活动指定超时值。

使用 UI 创建 Until 活动

若要在管道中使用 Until 活动,请完成以下步骤:

  1. 在管道“活动”窗格中搜索“Until”,然后将 Until 活动拖动到管道画布上。

  2. 如果尚未选择画布上的 Until 活动,请选择它及其“设置”选项卡,以编辑其详细信息。

    Shows the Settings tab of the Until activity in the pipeline canvas.

  3. 输入一个表达式,该表达式将在执行 Until 活动中定义的所有子活动之后进行计算。 如果表达式的计算结果为 false,则 Until 活动将再次执行其所有子活动。 当计算结果为 true 时,Until 活动将完成。 该表达式可以是文本字符串表达式,也可以是动态表达式、函数系统变量其他活动的输出的任意组合。 下面的示例检查之前定义的管道数组变量 TestVariable 的值,以查看其计算结果是否为 ['done']。

    Shows the  Add dynamic content  pane with an expression to check a variable for a defined value.

  4. 通过直接选择 Until 活动上的“编辑活动”按钮,或通过选择“活动”选项卡在那里选择它,定义 Until 活动将执行的活动。 此时将显示一个新的活动编辑器窗格,可在其中为 Until 活动添加要执行的任何活动。 在此示例中,设置变量活动只是将上面表达式中引用的变量的值设置为 ['done'],因此 Until 活动的表达式将在在第一次执行时为 true,而 Until 活动将停止。 在实际使用中,你可以检查所需的任何条件,并且 Until 活动将在每次计算表达式后继续执行其子活动,直到满足条件为止。

    Shows the activities editor for an Until activity with a Set Variable activity defined.

语法

{
    "type": "Until",
    "typeProperties": {
        "expression":  {
            "value":  "<expression that evaluates to true or false>", 
            "type": "Expression"
        },
        "timeout": "<time out for the loop. for example: 00:01:00 (1 minute)>",
        "activities": [
            {
                "<Activity 1 definition>"
            },
            {
                "<Activity 2 definition>"
            },
            {
                "<Activity N definition>"
            }
        ]
    },
    "name": "MyUntilActivity"
}

Type 属性

属性 说明 允许的值 必需
name Until 活动的名称。 String
type 必须设置为 Until 字符串
表达式 计算结果必须为 true 或 false 的表达式 表达式。
timeout 此处在指定的时间之后 do-until 循环超时。 字符串。 d.hh:mm:sshh:mm:ss 默认值为 7 天。 最大值为 90 天。
活动 在表达式计算结果为 true 前将执行的活动集。 活动数组

示例 1

注意

本部分提供运行管道的 JSON 定义和示例 PowerShell 命令。 有关使用 Azure PowerShell 和 JSON 定义创建管道的分步说明演练,请参阅教程:使用 Azure PowerShell 创建数据工厂

包含 Until 活动的管道

在此示例中,管道包含两个活动:UntilWait。 在循环中运行 Web 活动之前,Wait 活动会等待指定的期间。 若要了解表达式和函数,请参阅表达式语言和函数

{
    "name": "DoUntilPipeline",
    "properties": {
        "activities": [
            {
                "type": "Until",
                "typeProperties": {
                    "expression": {
                        "value": "@equals('Failed', coalesce(body('MyUnauthenticatedActivity')?.status, actions('MyUnauthenticatedActivity')?.status, 'null'))",
                        "type": "Expression"
                    },
                    "timeout": "00:00:01",
                    "activities": [
                        {
                            "name": "MyUnauthenticatedActivity",
                            "type": "WebActivity",
                            "typeProperties": {
                                "method": "get",
                                "url": "https://www.fake.com/",
                                "headers": {
                                    "Content-Type": "application/json"
                                }
                            },
                            "dependsOn": [
                                {
                                    "activity": "MyWaitActivity",
                                    "dependencyConditions": [ "Succeeded" ]
                                }
                            ]
                        },
                        {
                            "type": "Wait",
                            "typeProperties": {
                                "waitTimeInSeconds": 1
                            },
                            "name": "MyWaitActivity"
                        }
                    ]
                },
                "name": "MyUntilActivity"
            }
        ]
    }
}

示例 2

此示例中的管道可在循环中将数据从输入文件夹复制到输出文件夹。 循环将在重复参数的值设置为 false 时终止,否则会在一分钟后超时。

包含 Until 活动的管道 (Adfv2QuickStartPipeline.json)

{
    "name": "Adfv2QuickStartPipeline",
    "properties": {
        "activities": [
            {
                "type": "Until",
                "typeProperties": {
                    "expression":  {
                        "value":  "@equals('false', pipeline().parameters.repeat)", 
                        "type": "Expression"
                    },
                    "timeout": "00:01:00",
                    "activities": [
                        {
                            "name": "CopyFromBlobToBlob",
                            "type": "Copy",
                            "inputs": [
                                {
                                    "referenceName": "BlobDataset",
                                    "parameters": {
                                        "path": "@pipeline().parameters.inputPath"
                                    },
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "BlobDataset",
                                    "parameters": {
                                        "path": "@pipeline().parameters.outputPath"
                                    },
                                    "type": "DatasetReference"
                                }
                            ],
                            "typeProperties": {
                                "source": {
                                    "type": "BlobSource"
                                },
                                "sink": {
                                    "type": "BlobSink"
                                }
                            },
                            "policy": {
                                "retry": 1,
                                "timeout": "00:10:00",
                                "retryIntervalInSeconds": 60
                            }
                        }
                    ]
                },
                "name": "MyUntilActivity"
            }
        ],
        "parameters": {
            "inputPath": {
                "type": "String"
            },
            "outputPath": {
                "type": "String"
            },
            "repeat": {
                "type": "String"
            }                        
        }        
    }
}

Azure 存储链接服务 (AzureStorageLinkedService.json)

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<Azure Storage account name>;AccountKey=<Azure Storage account key>;EndpointSuffix=core.chinacloudapi.cn"
        }
    }
}

参数化的 Azure Blob 数据集 (BlobDataset.json)

管道将 folderPath 设置为管道参数 outputPath1 或 outputPath2 的值。

{
    "name": "BlobDataset",
    "properties": {
        "type": "AzureBlob",
        "typeProperties": {
            "folderPath": {
                "value": "@{dataset().path}",
                "type": "Expression"
            }
        },
        "linkedServiceName": {
            "referenceName": "AzureStorageLinkedService",
            "type": "LinkedServiceReference"
        },
        "parameters": {
            "path": {
                "type": "String"
            }
        }
    }
}

管道参数 JSON (PipelineParameters.json)

{
    "inputPath": "adftutorial/input",
    "outputPath": "adftutorial/outputUntil",
    "repeat": "true"
}

PowerShell 命令

注意

建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az

这些命令假设你已将 JSON 文件保存到文件夹 C:\ADF。

Connect-AzAccount -Environment AzureChinaCloud
Select-AzSubscription "<Your subscription name>"

$resourceGroupName = "<Resource Group Name>"
$dataFactoryName = "<Data Factory Name. Must be globally unique>";
Remove-AzDataFactoryV2 $dataFactoryName -ResourceGroupName $resourceGroupName -force


Set-AzDataFactoryV2 -ResourceGroupName $resourceGroupName -Location "China East 2" -Name $dataFactoryName
Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "AzureStorageLinkedService" -DefinitionFile "C:\ADF\AzureStorageLinkedService.json"
Set-AzDataFactoryV2Dataset -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "BlobDataset" -DefinitionFile "C:\ADF\BlobDataset.json"
Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "Adfv2QuickStartPipeline" -DefinitionFile "C:\ADF\Adfv2QuickStartPipeline.json"
$runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName "Adfv2QuickStartPipeline" -ParameterFile C:\ADF\PipelineParameters.json

while ($True) {
    $run = Get-AzDataFactoryV2PipelineRun -ResourceGroupName $resourceGroupName -DataFactoryName $DataFactoryName -PipelineRunId $runId

    if ($run) {
        if ($run.Status -ne 'InProgress') {
            Write-Host "Pipeline run finished. The status is: " $run.Status -foregroundcolor "Yellow"
            $run
            break
        }
        Write-Host  "Pipeline is running...status: InProgress" -foregroundcolor "Yellow"
        Write-Host "Activity run details:" -foregroundcolor "Yellow"
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
        $result

        Write-Host "Activity 'Output' section:" -foregroundcolor "Yellow"
        $result.Output -join "`r`n"
    }

    Start-Sleep -Seconds 15
}

参阅支持的其他控制流活动: