创建按翻转窗口运行管道的触发器
适用于:Azure 数据工厂 Azure Synapse Analytics
本文提供了创建、启动和监视翻转窗口触发器的步骤。 有关触发器和支持的类型的一般信息,请参阅管道执行和触发器。
翻转窗口触发器是一类可以在保留状态的同时按周期性的时间间隔(从指定的开始时间算起)触发的触发器。 翻转窗口是一系列固定大小、非重叠且连续的时间间隔。 翻转窗口触发器与管道存在一对一关系,一个这样的触发器只能引用一个管道。
翻转窗口触发器是计划触发器的更重量级的替代方法。 它提供了一套适用于复杂方案(例如,其他翻转窗口触发器上的依赖项、重新运行失败的作业,以及为管道设置用户重试)的功能。 若要进一步了解计划触发器与翻转窗口触发器之间的差异,请参阅触发器类型比较。
Azure 数据工厂和 Azure Synapse 门户体验
- 若要在 Azure 门户中创建翻转窗口触发器,请选择“触发器”选项卡,然后选择“新建” 。
- 触发器配置窗格打开后,选择“翻转窗口”。 然后定义翻转窗口触发器属性。
- 完成后,选择保存。
翻转窗口触发器类型属性
翻转窗口具有以下触发器类型属性:
{
"name": "MyTriggerName",
"properties": {
"type": "TumblingWindowTrigger",
"runtimeState": "<<Started/Stopped/Disabled - readonly>>",
"typeProperties": {
"frequency": <<Minute/Hour>>,
"interval": <<int>>,
"startTime": "<<datetime>>",
"endTime": <<datetime - optional>>,
"delay": <<timespan - optional>>,
"maxConcurrency": <<int>> (required, max allowed: 50),
"retryPolicy": {
"count": <<int - optional, default: 0>>,
"intervalInSeconds": <<int>>,
},
"dependsOn": [
{
"type": "TumblingWindowTriggerDependencyReference",
"size": <<timespan - optional>>,
"offset": <<timespan - optional>>,
"referenceTrigger": {
"referenceName": "MyTumblingWindowDependency1",
"type": "TriggerReference"
}
},
{
"type": "SelfDependencyTumblingWindowTriggerReference",
"size": <<timespan - optional>>,
"offset": <<timespan>>
}
]
},
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyPipelineName"
},
"parameters": {
"parameter1": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowStartTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"parameter2": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowEndTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"parameter3": "https://mydemo.chinacloudsites.cn/api/demoapi"
}
}
}
}
下表概述了与翻转窗口触发器中的循环和计划相关的主要 JSON 元素。
JSON 元素 | 说明 | 类型 | 允许的值 | 必须 |
---|---|---|---|---|
type |
触发器的类型。 type 的固定值为 TumblingWindowTrigger 。 |
String |
TumblingWindowTrigger |
是 |
runtimeState |
触发器运行时的当前状态。 此元素是 <readOnly>。 |
String |
Started ,Stopped ,Disabled |
是 |
frequency |
一个字符串,表示触发器重复出现的频率单位(分钟、小时或月)。 如果 startTime 日期值粒度比 frequency 值更细,则会在计算窗口边界时考虑 startTime 日期。 例如:如果 frequency 值为 hourly ,startTime 值为 2017-09-01T10:10:10Z,则第一个窗口为 (2017-09-01T10:10:10Z, 2017-09-01T11:10:10Z)。 |
String |
Minute ,Hour ,Month |
是 |
interval |
一个正整数,表示 frequency 值对应的时间间隔,决定了触发器的运行频率。 例如,如果 interval 为 3 且 frequency 为 hour ,则触发器每 3 小时定期触发一次。 最小窗口间隔为 5 分钟。 |
Integer |
正整数。 | 是 |
startTime |
第一个匹配项,可以是过去的时间。 第一个触发器间隔为 (startTime , startTime + interval )。 |
DateTime |
一个 DateTime 值。 |
是 |
endTime |
最后一个匹配项,可以是过去的时间。 | DateTime |
一个 DateTime 值。 |
是 |
delay |
延迟窗口数据处理开始的时间量。 管道运行在预期的执行时间加上 delay 的量之后启动。 delay 的定义是:在预期的执行时间过后,触发器在触发新的运行之前等待的时间。 延迟不会改变窗口 startTime 。 例如,值为 00:10:00 的 delay 意味着 10 分钟的延迟。 |
Timespan (hh:mm:ss) |
默认值为 00:00:00 的 timespan 值。 |
否 |
maxConcurrency |
同时针对已就绪窗口触发的触发器运行数。 例如,若要每小时回填,昨天的运行会产生 24 个窗口。 如果 maxConcurrency = 10,则仅针对前 10 个窗口 (00:00-01:00 - 09:00-10:00) 触发触发器事件。 在头 10 个触发的管道运行完成后,将针对接下来的 10 个窗口 (10:00-11:00 - 19:00-20:00) 触发触发器运行。 继续进行 maxConcurrency = 10 的此示例,如果有 10 个窗口就绪,则总共有 10 个管道运行。 如果只有一个窗口准备就绪,则只有一个管道运行。 |
Integer |
一个介于 1 到 50 之间的整数。 | 是 |
retryPolicy: Count |
将管道运行标记为 Failed 之前的重试次数。 |
Integer |
一个整数,其默认值为 0(不重试)。 | 否 |
retryPolicy: intervalInSeconds |
重试之间的延迟(以秒为单位指定)。 | Integer |
秒数,其默认值为 30。 最小值为 30 。 |
否 |
dependsOn: type |
TumblingWindowTriggerReference 的类型。 如果设置了依赖项,则为必需。 |
String |
TumblingWindowTriggerDependencyReference ,SelfDependencyTumblingWindowTriggerReference |
否 |
dependsOn: size |
依赖项翻转窗口的大小。 | Timespan (hh:mm:ss) |
一个正的 timespan 值,其中默认值为子触发器的窗口大小。 |
否 |
dependsOn: offset |
依赖项触发器的偏移量。 | Timespan (hh:mm:ss) |
在自我依赖项中必须为负的 timespan 值。 如果未指定任何值,则该窗口与触发器本身相同。 |
自我依赖项:是 其他:否 |
注意
发布翻转窗口触发器后,无法编辑 interval
和 frequency
值。
WindowStart 和 WindowEnd 系统变量
可以在管道定义中(即,作为查询的一部分),使用翻转窗口触发器的 WindowStart
和 WindowEnd
系统变量。 触发器定义中将系统变量作为参数传递给管道。 下面的示例演示如何将这些变量作为参数传递。
{
"name": "MyTriggerName",
"properties": {
"type": "TumblingWindowTrigger",
...
"pipeline": {
"pipelineReference": {
"type": "PipelineReference",
"referenceName": "MyPipelineName"
},
"parameters": {
"MyWindowStart": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowStartTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
},
"MyWindowEnd": {
"type": "Expression",
"value": "@{concat('output',formatDateTime(trigger().outputs.windowEndTime,'-dd-MM-yyyy-HH-mm-ss-ffff'))}"
}
}
}
}
}
若要在管道定义中使用 WindowStart
和 WindowEnd
系统变量值,请相应地使用你的 MyWindowStart
和 MyWindowEnd
参数。
回填方案中的窗口执行顺序
如果触发器的 startTime
为过去时间,那么根据公式 M=(CurrentTime- TriggerStartTime)/TumblingWindowSize,触发器将在执行未来运行之前生成 {M} backfill(past) 次并行运行,以保证触发器并发性。 窗口的执行顺序是确定的(从最旧到最新的时间间隔)。 当前无法修改此行为。
注意
在此方案中,选定 startTime
中的所有运行将在执行未来的运行之前运行。 如果需要回填很长一段时间,建议执行初始历史加载。
现有 TriggerResource 元素
以下几点适用于更新现有 TriggerResource
元素:
- 创建触发器后,便无法更改触发器的
frequency
元素(或窗口大小)与interval
元素的值。 这是triggerRun
重新运行和依赖项评估正常运行所必需的限制。 - 如果触发器的
endTime
元素的值发生更改(添加或更新),则已处理窗口的状态不会重置。 触发器遵循新endTime
值。 如果新的endTime
值早于已执行的窗口,则触发器会停止。 否则,触发器会在遇到新的endTime
值停止。
用户分配的管道重试次数
如果出现管道故障,翻转窗口触发器可以使用相同的输入参数自动重试引用的管道的执行,而无需用户干预。 使用触发器定义中的 retryPolicy
属性指定此操作。
翻转窗口触发器依赖项
若要确保仅在数据工厂中成功执行另一个翻转窗口触发器后才执行翻转窗口触发器,请创建翻转窗口触发器依赖项。
取消翻转窗口运行
如果特定窗口处于“正在等待”、“正在等待依赖项”或“正在运行”状态,则可以取消翻转窗口触发器的运行:
- 如果窗口处于“正在运行”状态,则取消关联的“管道运行”,然后触发器运行会被标记为“已取消”。
- 如果窗口处于“正在等待”或“正在等待依赖项”状态,则可以从“监视”中取消该窗口。
还可以重新运行已取消的窗口。 重新运行采用触发器的最新发布定义。 重新运行后,将重新评估指定窗口的依赖项。
Azure PowerShell 和 Azure CLI 示例
本部分展示了如何使用 Azure PowerShell 创建、启动和监视触发器。
注意
建议使用 Azure Az PowerShell 模块与 Azure 交互。 请参阅安装 Azure PowerShell 以开始使用。 若要了解如何迁移到 Az PowerShell 模块,请参阅 将 Azure PowerShell 从 AzureRM 迁移到 Az。
先决条件
- Azure 订阅。 如果没有 Azure 订阅,请在开始前创建一个试用帐户。
- Azure PowerShell。 请遵循使用 PowerShellGet 在 Windows 上安装 Azure PowerShell 中的说明。
- Azure 数据工厂:按照使用 PowerShell 创建 Azure 数据工厂中的说明创建数据工厂和管道。
示例代码
在 C:\ADFv2QuickStartPSH\ 文件夹中创建一个名为 MyTrigger.json 的且包含以下内容的 JSON 文件:
重要
保存 JSON 文件之前,请将
startTime
元素的值设置为当前的协调世界时 (UTC) 时间。 将endTime
元素的值设置为比当前 UTC 时间早一小时。{ "name": "PerfTWTrigger", "properties": { "type": "TumblingWindowTrigger", "typeProperties": { "frequency": "Minute", "interval": "15", "startTime": "2017-09-08T05:30:00Z", "endTime" : "2017-09-08T06:30:00Z", "delay": "00:00:01", "retryPolicy": { "count": 2, "intervalInSeconds": 30 }, "maxConcurrency": 50 }, "pipeline": { "pipelineReference": { "type": "PipelineReference", "referenceName": "DynamicsToBlobPerfPipeline" }, "parameters": { "windowStart": "@trigger().outputs.windowStartTime", "windowEnd": "@trigger().outputs.windowEndTime" } }, "runtimeState": "Started" } }
使用 Set-AzDataFactoryV2Trigger cmdlet 创建一个触发器:
Set-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger" -DefinitionFile "C:\ADFv2QuickStartPSH\MyTrigger.json"
使用 Get-AzDataFactoryV2Trigger cmdlet 确认触发器的状态为 Stopped:
Get-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
使用 Start-AzDataFactoryV2Trigger cmdlet 启动触发器:
Start-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
使用 Get-AzDataFactoryV2Trigger cmdlet 确认触发器的状态为 Started:
Get-AzDataFactoryV2Trigger -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -Name "MyTrigger"
使用 Get-AzDataFactoryV2TriggerRun cmdlet 在 Azure PowerShell 中获取触发器运行。 若要获取有关触发器运行的信息,请定期执行以下命令。 更新
TriggerRunStartedAfter
和TriggerRunStartedBefore
值可匹配触发器定义中的值:Get-AzDataFactoryV2TriggerRun -ResourceGroupName $ResourceGroupName -DataFactoryName $DataFactoryName -TriggerName "MyTrigger" -TriggerRunStartedAfter "2017-12-08T00:00:00" -TriggerRunStartedBefore "2017-12-08T01:00:00"
若要在 Azure 门户中监视触发器运行和管道运行,请参阅监视管道运行。