detect_anomalous_spike_fl()

适用于:✅Azure 数据资源管理器Azure MonitorMicrosoft Sentinel

检测带时间戳的数据中的数值变量是否出现异常峰值。

函数 detect_anomalous_spike_fl() 是一个 UDF(用户定义的函数),用于检测带有时间戳的数据(例如流量日志)中的数值变量是否出现异常峰值,例如外泄数据量或失败的登录尝试。 在网络安全上下文中,这样的事件可能是可疑的,并指示潜在的攻击或盗用。

异常模型基于两个分数的组合:Z 分数(高于平均值的标准偏差数)和 Q 分数(高于高分位数的四分位距数)。 Z 分数是一个简单且常见的离群值指标;Q 分数基于 Tukey 的围栏,但我们将定义扩展到任何分位数以实现更好的控制。 选择不同的分位数(默认情况下使用第 95 和第 25 分位数)可以检测更重要的离群值,从而提高精度。 该模型基于某些数值变量构建,按范围(如订阅或帐户)和实体(如用户或设备)进行计算。

在计算单变量数值数据点的分数并检查其他要求(例如,范围训练期间的活动天数高于预定义的阈值)之后,我们将检查每个分数是否高于其预定义的阈值。 如果是这样,则检测到峰值并将数据点标记为异常。 生成两个模型:一个用于实体级别(由 entityColumnName 参数定义),例如每个范围(由 scopeColumnName 参数定义,例如帐户或订阅)的用户或设备。 第二个模型是为整个范围构建的。 对每个模型执行异常情况检测逻辑,如果在其中一个模型中检测到异常,则会显示出来。 默认情况下,检测向上峰值;向下峰值 ('dips') 在某些情况下也可能很有趣,并且可以通过调整逻辑来检测。

该模型的直接输出是基于分数的异常分数。 这个评分在 [0, 1] 范围内是单调的,1 表示异常。 除异常评分外,还有一个二进制标志用于检测到的异常(由最小阈值参数控制)和其他解释字段。

请注意,该函数会忽略变量的时态结构(主要用于可伸缩性和可解释性)。 如果变量具有重要的时态组件(如趋势和季节性),我们建议考虑 series_decompose_anomalies() 函数或使用 series_decompose() 来计算残差并对其执行 detect_anomalous_spike_fl()

语法

detect_anomalous_spike_fl(numericColumnName、, entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [minTrainingDaysThresh], [lowPercentileForQscore], [highPercentileForQscore], [minSlicesPerEntity], [zScoreThreshEntity], [qScoreThreshEntity], [minNumValueThreshEntity], [minSlicesPerScope], [zScoreThreshScope], [qScoreThreshScope], [minNumValueThreshScope])

详细了解语法约定

参数

客户 类型​​ 必需 说明
numericColumnName string ✔️ 包含要计算异常模型的数值变量的输入表列的名称。
entityColumnName string ✔️ 输入表列的名称,其中包含要计算异常模型的实体的名称或 ID。
scopeColumnName string ✔️ 输入表列的名称,其中包含分区或范围,以便为每个范围生成不同的异常模型。
timeColumnName string ✔️ 输入表列的名称,其中包含时间戳,用于定义训练和检测周期。
startTraining datetime ✔️ 异常模型的训练期的开始。 它的结束由检测周期的开始来定义。
startDetection datetime ✔️ 异常情况检测的检测周期的开始。
endDetection datetime ✔️ 异常情况检测的检测周期的结束。
minTrainingDaysThresh int 训练期间,用于计算异常的范围所存在的最少天数。 如果它低于阈值,则范围被视为太新且未知,因此不会计算异常。 默认值为 14。
lowPercentileForQscore real 范围 [0.0,1.0] 内的数字,表示要计算为 Q 分数下限的百分位数。 在 Tukey 的围栏中,使用 0.25。 默认值为 0.25。 选择较低的百分位数可以提高精度,因为可以检测到更显著的异常。
highPercentileForQscore real 范围 [0.0,1.0] 内的数字,表示要计算为 Q 分数上限的百分位数。 在 Tukey 的围栏中,使用 0.75。 默认值为 0.9。 选择较高的百分位数可以提高精度,因为可以检测到更显著的异常。
minSlicesPerEntity int 在为实体生成异常模型之前实体上存在的“切片”(例如天)的最低阈值。 如果数量低于阈值,则实体被认为太新且不稳定。 默认值为 20。
zScoreThreshEntity real 被标记为异常的实体级 Z 分数(高于平均值的标准偏差数)的最低阈值。 选择较高的值时,只会检测到更显著的异常。 默认值为 3.0。
qScoreThreshEntity real 要标记为异常的实体级 Q 分数(高于高分位数的四分位距数)的最低阈值。 选择较高的值时,只会检测到更显著的异常。 默认值为 2.0。
minNumValueThreshEntity long 要标记为实体异常的数值变量的最小阈值。 这对于筛选出值在统计上异常(高 Z 分数和 Q 分数)但值本身太小而无意义的情况很有用。 默认值为 0。
minSlicesPerScope int 在为范围生成异常模型之前范围上存在的“切片”(例如天)的最低阈值。 如果数量低于阈值,则范围被认为太新且不稳定。 默认值为 20。
zScoreThreshScope real 要标记为异常的范围级 Z 分数(高于平均值的标准偏差数)的最低阈值。 选择较高的值时,只会检测到更显著的异常。 默认值为 3.0。
qScoreThreshScope real 要标记为异常的范围级 Q 分数(高于高分位数的四分位距数)的最低阈值。 选择较高的值时,只会检测到更显著的异常。 默认值为 2.0。
minNumValueThreshScope long 要标记为范围异常的数值变量的最小阈值。 这对于筛选出值在统计上异常(高 Z 分数和 Q 分数)但值本身太小而无意义的情况很有用。 默认值为 0。

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 detect_anomalous_spike_fl() 的工作示例,请参阅示例

let detect_anomalous_spike_fl = (T:(*), numericColumnName:string, entityColumnName:string, scopeColumnName:string
                            , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime, minTrainingDaysThresh:int = 14
                            , lowPercentileForQscore:real = 0.25, highPercentileForQscore:real = 0.9
                            , minSlicesPerEntity:int = 20, zScoreThreshEntity:real = 3.0, qScoreThreshEntity:real = 2.0, minNumValueThreshEntity:long = 0
                            , minSlicesPerScope:int = 20, zScoreThreshScope:real = 3.0, qScoreThreshScope:real = 2.0, minNumValueThreshScope:long = 0)
{
// pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend numVec     = tolong(column_ifexists(numericColumnName, 0))
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
let aggregatedCandidateScopeData = (
    processedData
    | summarize firstSeenScope = min(sliceTime), lastSeenScope = max(sliceTime) by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection
);
let entityModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesEntity = dcount(sliceTime), avgNumEntity = avg(numVec), sdNumEntity = stdev(numVec)
            , lowPrcNumEntity = percentile(numVec, lowPercentileForQscore), highPrcNumEntity = percentile(numVec, highPercentileForQscore)
            , firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime)
        by scope, entity
    | extend slicesInTrainingEntity = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
);
let scopeModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesScope = dcount(sliceTime), avgNumScope = avg(numVec), sdNumScope = stdev(numVec)
            , lowPrcNumScope = percentile(numVec, lowPercentileForQscore), highPrcNumScope = percentile(numVec, highPercentileForQscore)
        by scope
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | join kind = leftouter (entityModelData) on scope, entity 
    | join kind = leftouter (scopeModelData) on scope
    | extend zScoreEntity       = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - avgNumEntity)/(sdNumEntity + 1), 2), 0.0)
            , qScoreEntity      = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - highPrcNumEntity)/(highPrcNumEntity - lowPrcNumEntity + 1), 2), 0.0)
            , zScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - avgNumScope)/(sdNumScope + 1), 2), 0.0)
            , qScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - highPrcNumScope)/(highPrcNumScope - lowPrcNumScope + 1), 2), 0.0)
    | extend isSpikeOnEntity    = iff((slicesInTrainingEntity >= minTrainingDaysThresh and zScoreEntity > zScoreThreshEntity and qScoreEntity > qScoreThreshEntity and numVec >= minNumValueThreshEntity), 1, 0)
            , entityHighBaseline= round(max_of((avgNumEntity + sdNumEntity), highPrcNumEntity), 2)
            , isSpikeOnScope    = iff((countSlicesScope >= minTrainingDaysThresh and zScoreScope > zScoreThreshScope and qScoreScope > qScoreThreshScope and numVec >= minNumValueThreshScope), 1, 0)
            , scopeHighBaseline = round(max_of((avgNumEntity + 2 * sdNumEntity), highPrcNumScope), 2)
    | extend entitySpikeAnomalyScore = iff(isSpikeOnEntity  == 1, round(1.0 - 0.25/(max_of(zScoreEntity, qScoreEntity)),4), 0.00)
            , scopeSpikeAnomalyScore = iff(isSpikeOnScope == 1, round(1.0 - 0.25/(max_of(zScoreScope, qScoreScope)), 4), 0.00)
    | where isSpikeOnEntity == 1 or isSpikeOnScope == 1
    | extend avgNumEntity   = round(avgNumEntity, 2), sdNumEntity = round(sdNumEntity, 2)
            , avgNumScope   = round(avgNumScope, 2), sdNumScope = round(sdNumScope, 2)
   | project-away entity1, scope1, scope2, scope3
   | extend anomalyType = iff(isSpikeOnEntity == 1, strcat('spike_', entityColumnName), strcat('spike_', scopeColumnName)), anomalyScore = max_of(entitySpikeAnomalyScore, scopeSpikeAnomalyScore)
   | extend anomalyExplainability = iff(isSpikeOnEntity == 1
        , strcat('The value of numeric variable ', numericColumnName, ' for ', entityColumnName, ' ', entity, ' is ', numVec, ', which is abnormally high for this '
            , entityColumnName, ' at this ', scopeColumnName
            , '. Based on observations from last ' , slicesInTrainingEntity, ' ', timePeriodBinSize, 's, the expected baseline value is below ', entityHighBaseline, '.')
        , strcat('The value of numeric variable ', numericColumnName, ' on ', scopeColumnName, ' ', scope, ' is ', numVec, ', which is abnormally high for this '
            , scopeColumnName, '. Based on observations from last ' , slicesInTrainingScope, ' ', timePeriodBinSize, 's, the expected baseline value is below ', scopeHighBaseline, '.'))
   | extend anomalyState = iff(isSpikeOnEntity == 1
        , bag_pack('avg', avgNumEntity, 'stdev', sdNumEntity, strcat('percentile_', lowPercentileForQscore), lowPrcNumEntity, strcat('percentile_', highPercentileForQscore), highPrcNumEntity)
        , bag_pack('avg', avgNumScope, 'stdev', sdNumScope, strcat('percentile_', lowPercentileForQscore), lowPrcNumScope, strcat('percentile_', highPercentileForQscore), highPrcNumScope))
   | project-away lowPrcNumEntity, highPrcNumEntity, lowPrcNumScope, highPrcNumScope
);
resultsData
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let detect_anomalous_spike_fl = (T:(*), numericColumnName:string, entityColumnName:string, scopeColumnName:string
                            , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime, minTrainingDaysThresh:int = 14
                            , lowPercentileForQscore:real = 0.25, highPercentileForQscore:real = 0.9
                            , minSlicesPerEntity:int = 20, zScoreThreshEntity:real = 3.0, qScoreThreshEntity:real = 2.0, minNumValueThreshEntity:long = 0
                            , minSlicesPerScope:int = 20, zScoreThreshScope:real = 3.0, qScoreThreshScope:real = 2.0, minNumValueThreshScope:long = 0)
{
// pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend numVec     = tolong(column_ifexists(numericColumnName, 0))
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
let aggregatedCandidateScopeData = (
    processedData
    | summarize firstSeenScope = min(sliceTime), lastSeenScope = max(sliceTime) by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection
);
let entityModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesEntity = dcount(sliceTime), avgNumEntity = avg(numVec), sdNumEntity = stdev(numVec)
            , lowPrcNumEntity = percentile(numVec, lowPercentileForQscore), highPrcNumEntity = percentile(numVec, highPercentileForQscore)
            , firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime)
        by scope, entity
    | extend slicesInTrainingEntity = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntity)
);
let scopeModelData = (
    processedData
    | join kind = inner (aggregatedCandidateScopeData) on scope
    | where dataSet == 'trainSet'
    | summarize countSlicesScope = dcount(sliceTime), avgNumScope = avg(numVec), sdNumScope = stdev(numVec)
            , lowPrcNumScope = percentile(numVec, lowPercentileForQscore), highPrcNumScope = percentile(numVec, highPercentileForQscore)
        by scope
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | join kind = leftouter (entityModelData) on scope, entity 
    | join kind = leftouter (scopeModelData) on scope
    | extend zScoreEntity       = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - avgNumEntity)/(sdNumEntity + 1), 2), 0.0)
            , qScoreEntity      = iff(countSlicesEntity >= minSlicesPerEntity, round((toreal(numVec) - highPrcNumEntity)/(highPrcNumEntity - lowPrcNumEntity + 1), 2), 0.0)
            , zScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - avgNumScope)/(sdNumScope + 1), 2), 0.0)
            , qScoreScope       = iff(countSlicesScope >= minSlicesPerScope, round((toreal(numVec) - highPrcNumScope)/(highPrcNumScope - lowPrcNumScope + 1), 2), 0.0)
    | extend isSpikeOnEntity    = iff((slicesInTrainingEntity >= minTrainingDaysThresh and zScoreEntity > zScoreThreshEntity and qScoreEntity > qScoreThreshEntity and numVec >= minNumValueThreshEntity), 1, 0)
            , entityHighBaseline= round(max_of((avgNumEntity + sdNumEntity), highPrcNumEntity), 2)
            , isSpikeOnScope    = iff((countSlicesScope >= minTrainingDaysThresh and zScoreScope > zScoreThreshScope and qScoreScope > qScoreThreshScope and numVec >= minNumValueThreshScope), 1, 0)
            , scopeHighBaseline = round(max_of((avgNumEntity + 2 * sdNumEntity), highPrcNumScope), 2)
    | extend entitySpikeAnomalyScore = iff(isSpikeOnEntity  == 1, round(1.0 - 0.25/(max_of(zScoreEntity, qScoreEntity)),4), 0.00)
            , scopeSpikeAnomalyScore = iff(isSpikeOnScope == 1, round(1.0 - 0.25/(max_of(zScoreScope, qScoreScope)), 4), 0.00)
    | where isSpikeOnEntity == 1 or isSpikeOnScope == 1
    | extend avgNumEntity   = round(avgNumEntity, 2), sdNumEntity = round(sdNumEntity, 2)
            , avgNumScope   = round(avgNumScope, 2), sdNumScope = round(sdNumScope, 2)
   | project-away entity1, scope1, scope2, scope3
   | extend anomalyType = iff(isSpikeOnEntity == 1, strcat('spike_', entityColumnName), strcat('spike_', scopeColumnName)), anomalyScore = max_of(entitySpikeAnomalyScore, scopeSpikeAnomalyScore)
   | extend anomalyExplainability = iff(isSpikeOnEntity == 1
        , strcat('The value of numeric variable ', numericColumnName, ' for ', entityColumnName, ' ', entity, ' is ', numVec, ', which is abnormally high for this '
            , entityColumnName, ' at this ', scopeColumnName
            , '. Based on observations from last ' , slicesInTrainingEntity, ' ', timePeriodBinSize, 's, the expected baseline value is below ', entityHighBaseline, '.')
        , strcat('The value of numeric variable ', numericColumnName, ' on ', scopeColumnName, ' ', scope, ' is ', numVec, ', which is abnormally high for this '
            , scopeColumnName, '. Based on observations from last ' , slicesInTrainingScope, ' ', timePeriodBinSize, 's, the expected baseline value is below ', scopeHighBaseline, '.'))
   | extend anomalyState = iff(isSpikeOnEntity == 1
        , bag_pack('avg', avgNumEntity, 'stdev', sdNumEntity, strcat('percentile_', lowPercentileForQscore), lowPrcNumEntity, strcat('percentile_', highPercentileForQscore), highPrcNumEntity)
        , bag_pack('avg', avgNumScope, 'stdev', sdNumScope, strcat('percentile_', lowPercentileForQscore), lowPrcNumScope, strcat('percentile_', highPercentileForQscore), highPrcNumScope))
   | project-away lowPrcNumEntity, highPrcNumEntity, lowPrcNumScope, highPrcNumScope
);
resultsData
};
let detectPeriodStart   	= datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    	= datetime(2022-03-01 05:00);
let names               	= pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          	= array_length(names);
let testData            	= range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend countEvents 	= iff(timeSlice == detectPeriodStart, 3*countEvents, countEvents)
    | sort by timeSlice desc
;    
testData
| invoke detect_anomalous_spike_fl(numericColumnName        = 'countEvents'
                                , entityColumnName          = 'userName'
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

输出

t timeSlice countEvents userName deviceId accountName scope 实体 numVec sliceTime 数据集 firstSeenScope lastSeenScope slicesInTrainingScope countSlicesEntity avgNumEntity sdNumEntity firstSeenEntity lastSeenEntity slicesInTrainingEntity countSlicesScope avgNumScope sdNumScope zScoreEntity qScoreEntity zScoreScope qScoreScope isSpikeOnEntity entityHighBaseline isSpikeOnScope scopeHighBaseline entitySpikeAnomalyScore scopeSpikeAnomalyScore anomalyType anomalyScore anomalyExplainability anomalyState
1440 2022-04-30 05:00:00.0000000 5079 H4ck3r 9e8e151aced5a64938b93ee0c13fe940 prodEnvironment prodEnvironment H4ck3r 5079 2022-04-30 05:00:00.0000000 detectSet 2022-03-01 08:00:00.0000000 2022-04-30 05:00:00.0000000 60 1155 1363.22 267.51 0 0 13.84 185.46 0 1 628 0 0.9987 spike_accountName 0.9987 accountName prodEnvironment 上的数值变量 countEvents 的值为 5079,对于此 accountName 来说,此值异常高。 根据过去 60 天的观察,预期基线值低于 628.0。 {"avg": 1363.22,"stdev": 267.51,"percentile_0.25": 605,"percentile_0.9": 628}

运行该函数的输出是检测数据集中在范围或实体级别被标记为异常峰值的行。 为了清楚起见,添加了一些其他字段:

  • dataSet:当前数据集(始终为 detectSet)。
  • firstSeenScope:首次看到范围的时间戳。
  • lastSeenScope:上次看到范围的时间戳。
  • slicesInTrainingScope:范围在训练数据集中存在的切片数(例如天数)。
  • countSlicesEntity:实体在范围内存在的切片数(例如天数)。
  • avgNumEntity:范围内每个实体的训练集中数值变量的平均值。
  • sdNumEntity:范围内每个实体的训练集中数值变量的标准偏差。
  • firstSeenEntity:在范围内首次看到实体的时间戳。
  • lastSeenEntity:在范围内上次看到实体的时间戳。
  • slicesInTrainingEntity:实体在训练数据集范围内存在的切片数(例如天数)。
  • countSlicesScope:范围存在的切片数(例如天数)。
  • avgNumScope:每个范围的训练集中数值变量的平均值。
  • sdNumScope:每个范围的训练集中数值变量的标准偏差。
  • zScoreEntity:基于实体模型的数值变量的当前值的 Z 分数。
  • qScoreEntity:基于实体模型的数值变量的当前值的 Q 分数。
  • zScoreScope:基于范围模型的数值变量的当前值的 Z 分数。
  • qScoreScope:基于范围模型的数值变量的当前值的 Q 分数。
  • isSpikeOnEntity:基于实体模型的异常峰值的二进制标志。
  • entityHighBaseline:基于实体模型的数值变量值的预期高基线。
  • isSpikeOnScope:基于范围模型的异常峰值的二进制标志。
  • scopeHighBaseline:基于范围模型的数值变量值的预期高基线。
  • entitySpikeAnomalyScore:基于实体模型的峰值异常分数;范围为 [0,1] 内的数字,值越高表示异常越多。
  • scopeSpikeAnomalyScore:基于范围模型的峰值异常分数;范围为 [0,1] 内的数字,值越高表示异常越多。
  • anomalyType:显示异常类型(在同时运行多个异常情况检测逻辑时很有用)。
  • anomalyScore:基于所选模型的峰值的异常分数。
  • anomalyExplainability:所生成异常的文本包装及其解释。
  • anomalyState:来自所选模型的描述模型的指标包(平均值、标准偏差和百分位数)。

上面的示例中,使用用户作为实体、帐户作为范围并使用默认参数在 countEvents 变量上运行此函数会检测到范围级别的峰值。 由于用户“H4ck3r”在训练期间没有足够的数据,因此不会计算实体级别的异常,并且所有相关字段都是空的。 范围级异常的异常分数为 0.998,这意味着此峰值对于范围来说是异常的。

如果我们将任何最低阈值提高到足够高,则不会检测到任何异常,因为要求太高。

输出以标准格式显示具有异常峰值的行以及说明字段。 这些字段可用于调查异常情况以及对多个数值变量运行异常情况峰值检测或同时运行其他算法。

网络安全上下文中的建议用法是根据有意义的范围(例如帐户上的订阅)和实体(例如用户或设备)对有意义的数值变量(下载的数据量、上传的文件数或登录尝试失败次数)运行函数。 检测到的异常峰值意味着数值高于该范围或实体的预期值,并且可能可疑。