使用 PowerShell 和 Azure Functions 或 Azure 自动化自动暂停作业
某些应用程序需要使用流处理方法(例如需要使用 Azure 流分析),但并不是严格要求连续运行。 原因包括:
- 按计划到达(例如整点到达)的输入数据
- 传入数据量较少(每分钟记录数较少)
- 受益于时间窗口功能但本质上是批量运行的业务流程(例如财务或 HR)
- 涉及长时间运行的小规模作业的演示、原型或测试
不连续运行这些作业的好处是节省成本,因为流分析作业会随着时间的推移按流单元计费。
本文介绍如何设置 Azure 流分析作业的自动暂停。 请配置一个按计划自动暂停并恢复作业的任务。 术语“暂停”表示作业状态为“已停止”,让作业处于该状态可以避免任何计费。
本文讨论总体设计、所需组件和一些实现细节。
注意
自动暂停作业存在缺点。 主要缺点是失去低延迟/实时功能,以及在作业暂停时允许输入事件积压工作 (backlog) 不受监视地增长所带来的潜在风险。 对于大多数大规模运行的生产方案,组织不应考虑自动暂停。
设计
在本文的示例中,你想要让作业运行 N 分钟,然后暂停 M 分钟。 暂停作业时,不会消耗输入数据,因而会在上游累积输入数据。 作业启动后会在再次关闭之前赶上积压工作 (backlog),处理传入的数据。
作业运行期间,任务不应停止作业,直到其指标恢复正常为止。 相关指标是输入积压工作 (backlog) 和水印。 你将检查两者是否都已处于基线至少 N 分钟。 此行为转换为两个操作:
- 已停止的作业将在 M 分钟后重启。
- 如果正在运行的作业的积压工作 (backlog) 和水印指标正常,则该作业会在 N 分钟后随时停止。
以 N = 5 分钟,M = 10 分钟为例。 使用这些设置,一个作业至少有 5 分钟的时间来处理 15 分钟内收到的所有数据。 可节省高达 66% 的成本。
若要重启作业,请使用“上次停止时”启动选项。 此选项告知流分析处理自作业停止后上游积压的所有事件。
这种情况有两个注意事项。 首先,作业的停止时间不能超过输入流的保持期。 如果每天只运行一次作业,则需确保事件的保留期超过一天。 其次,若要让“上次停止时”模式获得接受,作业必须至少已启动一次(除非它此前从未停止过)。 因此,第一次运行作业需要手动操作,否则需要扩展脚本来涵盖这种情况。
最后一个考虑因素是使这些操作成为幂等操作。 然后你就可以随意重复操作而没有任何副作用,既易于使用,又具复原能力。
组件
API 调用
本文预计需要与流分析在以下方面进行交互:
- 获取当前作业状态(流分析资源管理):
- 如果作业正在运行:
- 获取自作业启动以来所经历的时间(日志)。
- 获取当前指标值(指标)。
- 在适用情况下停止作业(流分析资源管理)。
- 如果作业已停止:
- 获取自作业停止以来所经历的时间(日志)。
- 在适用情况下启动作业(流分析资源管理)。
- 如果作业正在运行:
对于流分析资源管理,可以使用 REST API、.NET SDK 或 CLI 库之一(Azure CLI 或 PowerShell )。
对于指标和日志,Azure 中所有内容都集中在 Azure Monitor 下,具有类似的 API 图面选择。 在查询 API 时,日志和指标总是滞后 1 到 3 分钟。 因此,将 N 设置为 5 通常意味着作业会在实际场景中运行 6 到 8 分钟。
另一考虑因素是指标始终会发出来。 作业停止时,API 将返回空记录。 必须清理 API 调用的输出,以便只关注相关值。
脚本语言
本文在 PowerShell 中实施自动暂停。 这种选择的第一个原因是 PowerShell 现在是跨平台的。 它可以在任何操作系统上运行,从而简化部署。 其次,它采用并返回对象,而不是字符串。 对象简化了自动化任务的分析和处理过程。
在 PowerShell 中,请使用 Az PowerShell 模块(此模块可启动 Az.Monitor 和 Az.StreamAnalytics)来满足所需的一切:
- 当前作业状态的 Get-AzStreamAnalyticsJob
- Start-AzStreamAnalyticsJob 或 Stop-AzStreamAnalyticsJob
- Get-AzMetric,带有
InputEventsSourcesBacklogged
(来自流分析指标) - 以
Stop Job
开头的事件名称的 Get-AzActivityLog
托管服务
若要托管 PowerShell 任务,需要一种提供计划运行的服务。 有许多选项,但我们只使用这里的两个无服务器选项:
- Azure Functions,一种可以运行几乎任何代码段的计算引擎。 它提供计时器触发器,该触发器最多每秒运行一次。
- Azure 自动化,一种用于操作云工作负荷和资源的托管服务。 其用途适当,但其最小计划间隔为 1 小时(解决方法的计划间隔更短)。
如果你不介意解决方法,可以使用 Azure 自动化来部署任务,它更简单。 但在本文中,你首先编写一个本地脚本,以便进行比较。 有了正常运行的脚本后,请将其部署在 Functions 和自动化帐户中。
开发人员工具
为简单起见,本文将在 Azure 门户中介绍该过程。
在本地写入 PowerShell 脚本
开发脚本的最佳方式是在本地开发。 由于 PowerShell 是跨平台的,因此你可以编写脚本并在任何操作系统上测试它。 在 Windows 中,你可以将 Windows 终端与 PowerShell 7 和 Azure PowerShell 配合使用。
本文使用的最终脚本适用于 Azure Functions 和 Azure 自动化。 它与以下脚本的不同之处在于它连接到托管环境(Functions 或自动化)。 本文稍后讨论该方面的内容。 首先,请逐步检查仅在本地运行的脚本版本。
该脚本特意以简单的形式编写,以便每个人都能理解。
在顶部设置所需的参数,并检查初始作业状态:
# Setting variables
$restartThresholdMinute = 10 # This is M
$stopThresholdMinute = 5 # This is N
$maxInputBacklog = 0 # The amount of backlog you tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark you tolerate when stopping the job (in seconds, 10 is a good starting point at low Streaming Units)
$subscriptionId = "<Replace with your Subscription Id - not the name>"
$resourceGroupName = "<Replace with your Resource Group Name>"
$asaJobName = "<Replace with your Stream Analytics job name>"
$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName )/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# If not already logged, uncomment and run the two following commands:
# Connect-AzAccount -Environment AzureChinaCloud
# Set-AzContext -SubscriptionId $subscriptionId
# Check current Stream Analytics job status
$currentJobState = Get-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."
如果作业正在运行,则检查作业是否已至少运行了 N 分钟。 另请检查其积压工作 (backlog) 和水印。
# Switch state
if ($currentJobState -eq "Running")
{
# First, look up the job start time with Get-AzActivityLog
## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
$startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
$startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}
# Then gather the current metric values
## Get-AzMetric issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
$currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
$currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore
# Metrics are always lagging 1-3 minutes behind, so grabbing the last N minutes actually means checking N+3. This might be overly safe and can be fine-tuned down per job.
$Backlog = $currentBacklog.Data |
Where-Object {$_.Maximum -ge 0} | # Remove the empty records (when the job is stopped or starting)
Sort-Object -Property Timestamp -Descending |
Where-Object {$_.Timestamp -ge $startTimeStamp} | # Keep only the records of the latest run
Select-Object -First $stopThresholdMinute | # Take the last N records
Measure-Object -Sum Maximum # Sum over those N records
$BacklogSum = $Backlog.Sum
$Watermark = $currentWatermark.Data |
Where-Object {$_.Maximum -ge 0} |
Sort-Object -Property Timestamp -Descending |
Where-Object {$_.Timestamp -ge $startTimeStamp} |
Select-Object -First $stopThresholdMinute |
Measure-Object -Average Maximum # Here you average
$WatermarkAvg = [int]$Watermark.Average # Rounding the decimal value and casting it to integer
# Because you called Get-AzMetric with a TimeGrain of a minute, counting the number of records gives you the duration in minutes
Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."
# -le for lesser or equal, -ge for greater or equal
if (
($BacklogSum -ge 0) -and ($BacklogSum -le $maxInputBacklog) -and ` # is not null and is under the threshold
($WatermarkAvg -ge 0) -and ($WatermarkAvg -le $maxWatermark) -and ` # is not null and is under the threshold
($Watermark.Count -ge $stopThresholdMinute) # at least N values
)
{
Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
}
else {
Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet, it needs to have less than $($maxInputBacklog) backlogged events and under $($maxWatermark) sec watermark for at least $($stopThresholdMinute) minutes."
}
}
如果作业已停止,请检查日志,找出最后一个“Stop Job
”操作发生的时间:
elseif ($currentJobState -eq "Stopped")
{
# First, look up the job start time with Get-AzActivityLog
## Get-AzActivityLog issues warnings about deprecation coming in future releases. Here you ignore them via -WarningAction Ignore.
## You check in 1,000 records of history, to make sure you're not missing what you're looking for. It might need adjustment for a job that has a lot of logging happening.
## There's a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). So you move it down.
$stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
$stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}
# Get-Date returns a local time. You project it to the same time zone (universal) as the result of Get-AzActivityLog that you extracted earlier.
$minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes
# -ge for greater or equal
if ($minutesSinceStopped -ge $restartThresholdMinute)
{
Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
}
else{
Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
}
}
else {
Write-Output "asaRobotPause - Job $($jobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping is that doesn't go away!"
}
最后记录作业完成情况:
# Final Stream Analytics job status check
$newJobState = Get-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
选项 1:在 Azure Functions 中托管任务
Azure Functions 团队提供了详尽的 PowerShell 开发人员指南以供参考。
首先,你需要一个新的函数应用。 函数应用与可托管多个函数的解决方案类似。
你可以获取完整过程,但重点是进入 Azure 门户,并使用以下选项创建新函数应用:
- 发布:代码
- 运行时:PowerShell Core
- 版本:7+
预配函数应用后,即可从其整体配置开始。
Azure Functions 的托管标识
函数需要权限才能启动和停止流分析作业。 请通过托管标识分配这些权限。
第一步是通过执行此过程,为函数启用“系统分配的托管标识”。
现在,你可以在要自动暂停的流分析作业上向该标识授予适当的权限。 对于该任务,请进入流分析作业的门户区域(非函数区域),然后在“访问控制(IAM)”中将一项角色分配添加到“托管标识”类型的成员的“参与者”角色。 选择之前的函数的名称。
在 PowerShell 脚本中,你可以添加一项检查来确保托管标识已正确设置。 (GitHub 上提供了最终脚本。)
# Check if a managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see /app-service/overview-managed-identity for additional details.")
}
添加一些日志记录信息,以确保函数正在启动:
$currentUTCtime = (Get-Date).ToUniversalTime()
# Write an information log with the current time.
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"
Azure Functions 参数
将参数传递给 Functions 中脚本的最佳方法是使用函数应用的应用程序设置作为环境变量。
第一步是按照过程将参数定义为函数应用页面上的“应用设置”。 您需要:
名称 | 值 |
---|---|
maxInputBacklog |
停止作业时允许的积压工作 (backlog) 数量。 在事件计数中,0 是一个很好的起点。 |
maxWatermark |
停止作业时允许的水印数量。 10 (以秒为单位)在流单元数较小时是一个很好的起点。 |
restartThresholdMinute |
M:重启已停止作业之前等待的时间(以分钟为单位)。 |
stopThresholdMinute |
N:停止正在运行的作业之前的冷却时间(以分钟为单位)。 在此期间,输入积压工作 (backlog) 需要保持在 0 。 |
subscriptionId |
要自动暂停的流分析作业的订阅 ID(不是名称)。 |
resourceGroupName |
要自动暂停的流分析作业的资源组名称。 |
asaJobName |
要自动暂停的流分析作业的名称。 |
然后更新 PowerShell 脚本,以相应地加载变量:
$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark
$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute
$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName
PowerShell 模块要求
与必须在本地安装 Az PowerShell 才能使用流分析命令(如 Start-AzStreamAnalyticsJob
)一样,你需要将其添加到函数应用主机:
- 在函数应用页面的“函数”下,选择“应用文件”,然后选择“requirements.psd1”。
- 取消注释
'Az' = '6.*'
行。 - 若要使该更改生效,请重启应用。
创建函数
完成配置后,可以在函数应用内创建特定函数来运行脚本。
在门户中,开发一个由计时器触发的函数。 使用 0 */1 * * * *
来确保每分钟都触发该函数,并确保它显示为“每分钟的第 0 秒”。
必要时可以通过更新计划来更改“集成”中的计时器值。
然后,我们可以在“代码 + 测试”中复制 run.ps1 中的脚本并对其进行测试。 也可从 GitHub 复制完整脚本。 业务逻辑已移至 try/catch 语句中,以在处理过程失败时生成适当的错误消息。
可以通过在“代码 + 测试”窗格中选择“测试/运行”来检查一切是否正常运行。 还可以检查“监视器”窗格,但它总是在几个执行之后出现。
设置有关函数执行的警报
最后,如果函数未成功运行,你希望通过警报收到通知。 警报成本较低,但可能会阻止费用更高昂的情况。
在函数应用页面的“日志”下,运行以下查询。 它返回过去 5 分钟内所有不成功的运行。
requests
| where success == false
| where timestamp > ago(5min)
| summarize failedCount=sum(itemCount) by operation_Name
| order by failedCount desc
在查询编辑器中,选择“新建预警规则”。 在打开的窗格中,将“度量”定义为:
- 度量值:failedCount
- 聚合类型:Total
- 聚合粒度:5 分钟
接下来设置“警报逻辑”,如下所示:
- 运算符:大于
- 阈值:0
- 评估频率:5 分钟
接下来重用或新建操作组。 然后完成配置。
若要检查警报设置是否正确,可以在 PowerShell 脚本中的任何位置添加 throw "Testing the alert"
,然后等待 5 分钟以接收电子邮件。
选项 2:在 Azure 自动化中托管任务
首先需要新的自动化帐户。 自动化帐户类似于可托管多个 runbook 的解决方案。
有关过程,请参阅快速入门:使用 Azure 门户创建自动化帐户。 可以选择直接在“高级”选项卡中使用系统分配的托管标识。
自动化团队提供了一个有关 PowerShell runbook 入门的教程作为参考。
Azure 自动化参数
通过 runbook,你可以使用 PowerShell 的经典形参语法传递实参:
Param(
[string]$subscriptionId,
[string]$resourceGroupName,
[string]$asaJobName,
[int]$restartThresholdMinute,
[int]$stopThresholdMinute,
[int]$maxInputBacklog,
[int]$maxWatermark
)
Azure 自动化的托管标识
自动化帐户应已在预配期间收到托管标识。 但如有必要,你可以使用此过程启用托管标识。
正如你为函数所做的那样,你需要在要自动暂停的流分析作业上授予适当的权限。
若要授予权限,请进入流分析作业的门户区域(非自动化页面),然后在“访问控制(IAM)”中将一项角色分配添加到“托管标识”类型的成员的“参与者”角色。 选择之前的自动化帐户的名称。
在 PowerShell 脚本中,你可以添加一项检查来确保托管标识已正确设置。 (GitHub 上提供了最终脚本。)
# Ensure that you don't inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect by using a managed service identity
try {
$AzureContext = (Connect-AzAccount -Identity).context
}
catch{
Write-Output "There is no system-assigned user identity. Aborting.";
exit
}
创建 Runbook
完成配置后,可以在自动化帐户中创建特定 runbook 来运行脚本。 这里不需要添加 Azure PowerShell 作为要求。 它已内置。
在门户中的“过程自动化”下,选择“Runbook”。 然后选择“创建 runbook”,选择“PowerShell”作为 runbook 类型,并选择 7 以上的任何版本(目前为“7.1 (预览版)”)作为版本。
现在可以粘贴脚本并测试它。 可以从 GitHub 复制完整脚本。 业务逻辑已移至 try/catch 语句中,以在处理过程失败时生成适当的错误消息。
可以在“测试”窗格中检查所有项是否都正确连接。
之后,你需要通过选择“发布”来发布作业,这样就可以将 runbook 链接到计划。 创建和链接计划是一个简单的过程。 现在,请记住,我们有解决方法来帮助实现 1 小时内的计划间隔。
最后,你可以设置警报。 第一步是通过自动化帐户的诊断设置启用日志。 第二步是通过查询捕获错误,就像你针对 Functions 所做的那样。
成果
在流分析作业中,你可以在两个位置验证一切是否按预期运行。
下面是活动日志:
下面是指标:
理解脚本后,可以直接重写它以扩展其范围。 可以轻松地更新脚本,将一系列作业而不是单个作业作为目标。 可以通过标记、资源组甚至整个订阅来定义和处理更大的范围。
获取支持
如需进一步的帮助,请尝试 Azure 流分析的 Microsoft Q&A 页。
后续步骤
你已了解使用 PowerShell 来自动管理 Azure 流分析作业的基础知识。 若要了解详细信息,请参阅以下文章: