Anomaly detection in Azure Stream Analytics

Available in both the cloud and Azure IoT Edge, Azure Stream Analytics offers built-in machine learning based anomaly detection capabilities that can be used to monitor the two most commonly occurring anomalies: temporary and persistent. With the AnomalyDetection_SpikeAndDip and AnomalyDetection_ChangePoint functions, you can perform anomaly detection directly in your Stream Analytics job.

The machine learning models assume a uniformly sampled time series. If the time series is not uniform, you may insert an aggregation step with a tumbling window prior to calling anomaly detection.
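
For example, a minimal sketch of such an aggregation step, assuming a hypothetical input named input with a temperature field, averages the raw readings into uniform one-second buckets before any anomaly detection function is applied:

WITH UniformInput AS
(
    -- Average raw readings into uniform one-second buckets
    SELECT
        System.Timestamp() AS time,
        AVG(CAST(temperature AS float)) AS temp
    FROM input
    GROUP BY TumblingWindow(second, 1)
)
SELECT time, temp INTO output FROM UniformInput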

The machine learning operations do not support seasonality trends or multi-variate correlations at this time.

Model behavior

Generally, the model's accuracy improves with more data in the sliding window. The data in the specified sliding window is treated as part of its normal range of values for that time frame. The model only considers event history over the sliding window to check if the current event is anomalous. As the sliding window moves, old values are evicted from the model's training.

The functions establish a baseline of normal behavior based on the values they have seen so far. Outliers are identified by comparing against that established normal, within the confidence level. The window size should be based on the minimum number of events required to train a model of normal behavior, so that when an anomaly occurs, the model is able to recognize it.

The model's response time increases with history size, because it needs to compare the current event against a larger number of past events. For better performance, include only the necessary number of events.

Gaps in the time series can result from the model not receiving events at certain points in time. Stream Analytics handles this situation with imputation logic. The history size and the duration of the same sliding window are used to calculate the average rate at which events are expected to arrive.

An anomaly generator available here can be used to feed an IoT hub with data exhibiting different anomaly patterns. An ASA job can be set up with these anomaly detection functions to read from this IoT hub and detect anomalies.

Spike and dip

Temporary anomalies in a time series event stream are known as spikes and dips. Spikes and dips can be monitored using the machine learning based operator, AnomalyDetection_SpikeAndDip.

Example of spike and dip anomalies

In the same sliding window, if a second spike is smaller than the first one, the computed score for the smaller spike is probably not significant enough compared to the score for the first spike within the confidence level specified. You can try decreasing the model's confidence level to detect such anomalies. However, if you start to get too many alerts, you can use a higher confidence interval.
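
For instance, the confidence level is simply the second argument of the function call. A minimal variant of the query below with the level lowered from 95 to 80 (an illustrative value, not a recommendation) might look like this:

SELECT
    EVENTENQUEUEDUTCTIME AS time,
    -- Same function call as in the example below, with only the confidence level changed to 80
    AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 80, 120, 'spikesanddips')
        OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input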

The following example query assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS SpikeAndDipScore,
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep

Change point

Persistent anomalies in a time series event stream are changes in the distribution of values in the event stream, like level changes and trends. In Stream Analytics, such anomalies are detected using the machine learning based AnomalyDetection_ChangePoint operator.

Persistent changes last much longer than spikes and dips and could indicate catastrophic events. Persistent changes are not usually visible to the naked eye, but can be detected with the AnomalyDetection_ChangePoint operator.

The following image is an example of a level change:

Example of a level change anomaly

The following image is an example of a trend change:

Example of a trend change anomaly

The following example query assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1,200 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.

WITH AnomalyDetectionStep AS
(
    SELECT
        EVENTENQUEUEDUTCTIME AS time,
        CAST(temperature AS float) AS temp,
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200) 
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
    FROM input
)
SELECT
    time,
    temp,
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS ChangePointScore,
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep

Performance characteristics

The performance of these models depends on the history size, window duration, event load, and whether function level partitioning is used. This section discusses these configurations and provides samples for how to sustain ingestion rates of 1K, 5K, and 10K events per second.

  • History size - These models perform linearly with history size. The longer the history size, the longer the models take to score a new event. This is because the models compare the new event with each of the past events in the history buffer.
  • Window duration - The window duration should reflect how long it takes to receive as many events as specified by the history size. Without that many events in the window, Azure Stream Analytics would impute missing values. Hence, CPU consumption is a function of the history size.
  • Event load - The greater the event load, the more work that is performed by the models, which impacts CPU consumption. The job can be scaled out by making it embarrassingly parallel, assuming it makes sense for business logic to use more input partitions.
  • Function level partitioning - Function level partitioning is done by using PARTITION BY within the anomaly detection function call; an example call is sketched in the Relationship section below. This type of partitioning adds overhead, as state needs to be maintained for multiple models at the same time. Function level partitioning is used in scenarios like device level partitioning.

Relationship

The history size, window duration, and total event load are related in the following way:

windowDuration (in ms) = 1000 * historySize / (Total Input Events Per Sec / Input Partition Count)
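
For example, with a history size of 60 events and a total input rate of 2,200 events per second spread across 2 input partitions (the setup used in the samples below), the formula gives windowDuration = 1000 * 60 / (2,200 / 2) ≈ 55 ms, which matches the first row of the non-partitioned table below.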

When partitioning the function by deviceId, add "PARTITION BY deviceId" to the anomaly detection function call.
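
As a sketch, applying this to the spike-and-dip query from earlier (assuming each input event carries a deviceId field) gives:

SELECT
    EVENTENQUEUEDUTCTIME AS time,
    deviceId,
    -- PARTITION BY maintains a separate model, and separate state, per device
    AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
        OVER(PARTITION BY deviceId LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input

Each device is then trained and scored independently, which is why state overhead grows with the device count shown in the partitioned table below.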

Observations

The following table includes the throughput observations for a single node (6 SU) for the non-partitioned case:

| History size (events) | Window duration (ms) | Total input events per sec |
| --- | --- | --- |
| 60 | 55 | 2,200 |
| 600 | 728 | 1,650 |
| 6,000 | 10,910 | 1,100 |

The following table includes the throughput observations for a single node (6 SU) for the partitioned case:

| History size (events) | Window duration (ms) | Total input events per sec | Device count |
| --- | --- | --- | --- |
| 60 | 1,091 | 1,100 | 10 |
| 600 | 10,910 | 1,100 | 10 |
| 6,000 | 218,182 | <550 | 10 |
| 60 | 21,819 | 550 | 100 |
| 600 | 218,182 | 550 | 100 |
| 6,000 | 2,181,819 | <550 | 100 |

Sample code to run the non-partitioned configurations above is located in the Streaming At Scale repo of Azure Samples. The code creates a Stream Analytics job with no function level partitioning, which uses Event Hubs as input and output. The input load is generated using test clients. Each input event is a 1KB JSON document. Events simulate an IoT device sending JSON data (for up to 1K devices). The history size, window duration, and total event load are varied over 2 input partitions.

Note

For a more accurate estimate, customize the samples to fit your scenario.

Identifying bottlenecks

Use the Metrics pane in your Azure Stream Analytics job to identify bottlenecks in your pipeline. Review Input/Output Events for throughput, and Watermark Delay or Backlogged Events to see whether the job is keeping up with the input rate. For Event Hubs metrics, look for Throttled Requests and adjust the Threshold Units accordingly. For Azure Cosmos DB metrics, review Max consumed RU/s per partition key range under Throughput to ensure your partition key ranges are uniformly consumed. For Azure SQL Database, monitor Log IO and CPU.

Next steps