使用 PowerShell 和 Azure Functions 或 Azure 自动化自动暂停作业

某些应用程序需要使用流处理方法(例如需要使用 Azure 流分析),但并不是严格要求连续运行。 原因包括:

  • 按计划到达(例如整点到达)的输入数据
  • 传入数据量较少(每分钟记录数较少)
  • 受益于时间窗口功能但本质上是批量运行的业务流程(例如财务或 HR)
  • 涉及长时间运行的小规模作业的演示、原型或测试


本文介绍如何设置 Azure 流分析作业的自动暂停。 请配置一个按计划自动暂停并恢复作业的任务。 术语“暂停”表示作业状态为“已停止”,让作业处于该状态可以避免任何计费。



自动暂停作业存在缺点。 主要缺点是失去低延迟/实时功能,以及在作业暂停时允许输入事件积压工作 (backlog) 不受监视地增长所带来的潜在风险。 对于大多数大规模运行的生产方案,组织不应考虑自动暂停。


在本文的示例中,你想要让作业运行 N 分钟,然后暂停 M 分钟。 暂停作业时,不会消耗输入数据,因而会在上游累积输入数据。 作业启动后会在再次关闭之前赶上积压工作 (backlog),处理传入的数据。


作业运行期间,任务不应停止作业,直到其指标恢复正常为止。 相关指标是输入积压工作 (backlog) 和水印。 你将检查两者是否都已处于基线至少 N 分钟。 此行为转换为两个操作:

  • 已停止的作业将在 M 分钟后重启。
  • 如果正在运行的作业的积压工作 (backlog) 和水印指标正常,则该作业会在 N 分钟后随时停止。


以 N = 5 分钟,M = 10 分钟为例。 使用这些设置,一个作业至少有 5 分钟的时间来处理 15 分钟内收到的所有数据。 可节省高达 66% 的成本。

若要重启作业,请使用“上次停止时”启动选项。 此选项告知流分析处理自作业停止后上游积压的所有事件。

这种情况有两个注意事项。 首先,作业的停止时间不能超过输入流的保持期。 如果每天只运行一次作业,则需确保事件的保留期超过一天。 其次,若要让“上次停止时”模式获得接受,作业必须至少已启动一次(除非它此前从未停止过)。 因此,第一次运行作业需要手动操作,否则需要扩展脚本来涵盖这种情况。

最后一个考虑因素是使这些操作成为幂等操作。 然后你就可以随意重复操作而没有任何副作用,既易于使用,又具复原能力。


API 调用


  • 获取当前作业状态(流分析资源管理):
    • 如果作业正在运行:
      • 获取自作业启动以来所经历的时间(日志)。
      • 获取当前指标值(指标)。
      • 在适用情况下停止作业(流分析资源管理)。
    • 如果作业已停止:
      • 获取自作业停止以来所经历的时间(日志)。
      • 在适用情况下启动作业(流分析资源管理)。

对于流分析资源管理,可以使用 REST API.NET SDK 或 CLI 库之一(Azure CLIPowerShell )。

对于指标和日志,Azure 中所有内容都集中在 Azure Monitor 下,具有类似的 API 图面选择。 在查询 API 时,日志和指标总是滞后 1 到 3 分钟。 因此,将 N 设置为 5 通常意味着作业会在实际场景中运行 6 到 8 分钟。

另一考虑因素是指标始终会发出来。 作业停止时,API 将返回空记录。 必须清理 API 调用的输出,以便只关注相关值。


本文在 PowerShell 中实施自动暂停。 这种选择的第一个原因是 PowerShell 现在是跨平台的。 它可以在任何操作系统上运行,从而简化部署。 其次,它采用并返回对象,而不是字符串。 对象简化了自动化任务的分析和处理过程。

在 PowerShell 中,请使用 Az PowerShell 模块(此模块可启动 Az.MonitorAz.StreamAnalytics)来满足所需的一切:


若要托管 PowerShell 任务,需要一种提供计划运行的服务。 有许多选项,但我们只使用这里的两个无服务器选项:

  • Azure Functions,一种可以运行几乎任何代码段的计算引擎。 它提供计时器触发器,该触发器最多每秒运行一次。
  • Azure 自动化,一种用于操作云工作负荷和资源的托管服务。 其用途适当,但其最小计划间隔为 1 小时(解决方法的计划间隔更短)。

如果你不介意解决方法,可以使用 Azure 自动化来部署任务,它更简单。 但在本文中,你首先编写一个本地脚本,以便进行比较。 有了正常运行的脚本后,请将其部署在 Functions 和自动化帐户中。


为简单起见,本文将在 Azure 门户中介绍该过程。

在本地写入 PowerShell 脚本

开发脚本的最佳方式是在本地开发。 由于 PowerShell 是跨平台的,因此你可以编写脚本并在任何操作系统上测试它。 在 Windows 中,你可以将 Windows 终端PowerShell 7Azure PowerShell 配合使用。

本文使用的最终脚本适用于 Azure FunctionsAzure 自动化。 它与以下脚本的不同之处在于它连接到托管环境(Functions 或自动化)。 本文稍后讨论该方面的内容。 首先,请逐步检查仅在本地运行的脚本版本。



# Setting variables
$restartThresholdMinute = 10 # This is M
$stopThresholdMinute = 5 # This is N

$maxInputBacklog = 0 # The amount of backlog you tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark you tolerate when stopping the job (in seconds, 10 is a good starting point at low Streaming Units)

$subscriptionId = "<Replace with your Subscription Id - not the name>"
$resourceGroupName = "<Replace with your Resource Group Name>"
$asaJobName = "<Replace with your Stream Analytics job name>"

$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName )/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# If not already logged, uncomment and run the two following commands:
# Connect-AzAccount -Environment AzureChinaCloud
# Set-AzContext -SubscriptionId $subscriptionId

# Check current Stream Analytics job status
$currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."

如果作业正在运行,则检查作业是否已至少运行了 N 分钟。 另请检查其积压工作 (backlog) 和水印。

# Switch state
if ($currentJobState -eq "Running")
    # First, look up the job start time with Get-AzActivityLog
    ## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
    ## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
    ## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
    $startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
    $startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

    # Then gather the current metric values
    ## Get-AzMetric issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
    $currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
    $currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore

    # Metrics are always lagging 1-3 minutes behind, so grabbing the last N minutes actually means checking N+3. This might be overly safe and can be fine-tuned down per job.
    $Backlog =  $currentBacklog.Data |
                    Where-Object {$_.Maximum -ge 0} | # Remove the empty records (when the job is stopped or starting)
                    Sort-Object -Property Timestamp -Descending |
                    Where-Object {$_.Timestamp -ge $startTimeStamp} | # Keep only the records of the latest run
                    Select-Object -First $stopThresholdMinute | # Take the last N records
                    Measure-Object -Sum Maximum # Sum over those N records
    $BacklogSum = $Backlog.Sum

    $Watermark = $currentWatermark.Data |
                    Where-Object {$_.Maximum -ge 0} |
                    Sort-Object -Property Timestamp -Descending |
                    Where-Object {$_.Timestamp -ge $startTimeStamp} |
                    Select-Object -First $stopThresholdMinute |
                    Measure-Object -Average Maximum # Here you average
    $WatermarkAvg = [int]$Watermark.Average # Rounding the decimal value and casting it to integer

    # Because you called Get-AzMetric with a TimeGrain of a minute, counting the number of records gives you the duration in minutes
    Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."

    # -le for lesser or equal, -ge for greater or equal
    if (
        ($BacklogSum -ge 0) -and ($BacklogSum -le $maxInputBacklog) -and ` # is not null and is under the threshold
        ($WatermarkAvg -ge 0) -and ($WatermarkAvg -le $maxWatermark) -and ` # is not null and is under the threshold
        ($Watermark.Count -ge $stopThresholdMinute) # at least N values
        Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
    else {
        Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet, it needs to have less than $($maxInputBacklog) backlogged events and under $($maxWatermark) sec watermark for at least $($stopThresholdMinute) minutes."

如果作业已停止,请检查日志,找出最后一个“Stop Job”操作发生的时间:

elseif ($currentJobState -eq "Stopped")
    # First, look up the job start time with Get-AzActivityLog
    ## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
    ## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
    ## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
    $stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
    $stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

    # Get-Date returns a local time. You project it to the same time zone (universal) as the result of Get-AzActivityLog that you extracted earlier.
    $minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes

    # -ge for greater or equal
    if ($minutesSinceStopped -ge $restartThresholdMinute)
        Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
else {
    Write-Output "asaRobotPause - Job $($jobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping is that doesn't go away!"


# Final Stream Analytics job status check
$newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."

选项 1:在 Azure Functions 中托管任务

Azure Functions 团队提供了详尽的 PowerShell 开发人员指南以供参考。

首先,你需要一个新的函数应用。 函数应用与可托管多个函数的解决方案类似。

你可以获取完整过程,但重点是进入 Azure 门户,并使用以下选项创建新函数应用:

  • 发布:代码
  • 运行时:PowerShell Core
  • 版本:7+


Azure Functions 的托管标识

函数需要权限才能启动和停止流分析作业。 请通过托管标识分配这些权限。


现在,你可以在要自动暂停的流分析作业上向该标识授予适当的权限。 对于该任务,请进入流分析作业的门户区域(非函数区域),然后在“访问控制(IAM)”中将一项角色分配添加到“托管标识”类型的成员的“参与者”角色。 选择之前的函数的名称。


在 PowerShell 脚本中,你可以添加一项检查来确保托管标识已正确设置。 (GitHub 上提供了最终脚本。)

# Check if a managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see /app-service/overview-managed-identity for additional details.")


$currentUTCtime = (Get-Date).ToUniversalTime()

# Write an information log with the current time.
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"

Azure Functions 参数

将参数传递给 Functions 中脚本的最佳方法是使用函数应用的应用程序设置作为环境变量

第一步是按照过程将参数定义为函数应用页面上的“应用设置”。 您需要:

maxInputBacklog 停止作业时允许的积压工作 (backlog) 数量。 在事件计数中,0 是一个很好的起点。
maxWatermark 停止作业时允许的水印数量。 10(以秒为单位)在流单元数较小时是一个很好的起点。
restartThresholdMinute M:重启已停止作业之前等待的时间(以分钟为单位)。
stopThresholdMinute N:停止正在运行的作业之前的冷却时间(以分钟为单位)。 在此期间,输入积压工作 (backlog) 需要保持在 0
subscriptionId 要自动暂停的流分析作业的订阅 ID(不是名称)。
resourceGroupName 要自动暂停的流分析作业的资源组名称。
asaJobName 要自动暂停的流分析作业的名称。

然后更新 PowerShell 脚本,以相应地加载变量:

$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark

$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute

$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

PowerShell 模块要求

与必须在本地安装 Az PowerShell 才能使用流分析命令(如 Start-AzStreamAnalyticsJob)一样,你需要将其添加到函数应用主机

  1. 在函数应用页面的“函数”下,选择“应用文件”,然后选择“requirements.psd1”
  2. 取消注释 'Az' = '6.*' 行。
  3. 若要使该更改生效,请重启应用。




在门户中,开发一个由计时器触发的函数。 使用 0 */1 * * * * 来确保每分钟都触发该函数,并确保它显示为“每分钟的第 0 秒”。




然后,我们可以在“代码 + 测试”中复制 run.ps1 中的脚本并对其进行测试。 也可从 GitHub 复制完整脚本。 业务逻辑已移至 try/catch 语句中,以在处理过程失败时生成适当的错误消息。

函数的“代码 + 测试”窗格的屏幕截图。

可以通过在“代码 + 测试”窗格中选择“测试/运行”来检查一切是否正常运行。 还可以检查“监视器”窗格,但它总是在几个执行之后出现。



最后,如果函数未成功运行,你希望通过警报收到通知。 警报成本较低,但可能会阻止费用更高昂的情况。

在函数应用页面的“日志”下,运行以下查询。 它返回过去 5 分钟内所有不成功的运行。

| where success == false
| where timestamp > ago(5min)
| summarize failedCount=sum(itemCount) by operation_Name
| order by failedCount desc

在查询编辑器中,选择“新建预警规则”。 在打开的窗格中,将“度量”定义为:

  • 度量值:failedCount
  • 聚合类型:Total
  • 聚合粒度:5 分钟


  • 运算符:大于
  • 阈值:0
  • 评估频率:5 分钟

接下来重用或新建操作组。 然后完成配置。

若要检查警报设置是否正确,可以在 PowerShell 脚本中的任何位置添加 throw "Testing the alert",然后等待 5 分钟以接收电子邮件。

选项 2:在 Azure 自动化中托管任务

首先需要新的自动化帐户。 自动化帐户类似于可托管多个 runbook 的解决方案。

有关过程,请参阅快速入门:使用 Azure 门户创建自动化帐户。 可以选择直接在“高级”选项卡中使用系统分配的托管标识。

自动化团队提供了一个有关 PowerShell runbook 入门的教程作为参考。

Azure 自动化参数

通过 runbook,你可以使用 PowerShell 的经典形参语法传递实参:




Azure 自动化的托管标识

自动化帐户应已在预配期间收到托管标识。 但如有必要,你可以使用此过程启用托管标识。


若要授予权限,请进入流分析作业的门户区域(非自动化页面),然后在“访问控制(IAM)”中将一项角色分配添加到“托管标识”类型的成员的“参与者”角色。 选择之前的自动化帐户的名称。


在 PowerShell 脚本中,你可以添加一项检查来确保托管标识已正确设置。 (GitHub 上提供了最终脚本。)

# Ensure that you don't inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null

# Connect by using a managed service identity
try {
        $AzureContext = (Connect-AzAccount -Identity).context
        Write-Output "There is no system-assigned user identity. Aborting.";

创建 Runbook

完成配置后,可以在自动化帐户中创建特定 runbook 来运行脚本。 这里不需要添加 Azure PowerShell 作为要求。 它已内置。

在门户中的“过程自动化”下,选择“Runbook”。 然后选择“创建 runbook”,选择“PowerShell”作为 runbook 类型,并选择 7 以上的任何版本(目前为“7.1 (预览版)”)作为版本。

现在可以粘贴脚本并测试它。 可以从 GitHub 复制完整脚本。 业务逻辑已移至 try/catch 语句中,以在处理过程失败时生成适当的错误消息。

Azure 自动化中的 runbook 脚本编辑器的屏幕截图。


之后,你需要通过选择“发布”来发布作业,这样就可以将 runbook 链接到计划。 创建和链接计划是一个简单的过程。 现在,请记住,我们有解决方法来帮助实现 1 小时内的计划间隔。

最后,你可以设置警报。 第一步是通过自动化帐户的诊断设置启用日志。 第二步是通过查询捕获错误,就像你针对 Functions 所做的那样。







理解脚本后,可以直接重写它以扩展其范围。 可以轻松地更新脚本,将一系列作业而不是单个作业作为目标。 可以通过标记、资源组甚至整个订阅来定义和处理更大的范围。


如需进一步的帮助,请尝试 Azure 流分析的 Microsoft Q&A 页


你已了解使用 PowerShell 来自动管理 Azure 流分析作业的基础知识。 若要了解详细信息,请参阅以下文章: