detect_anomalous_access_cf_fl()

适用于:✅Azure 数据资源管理器Azure MonitorMicrosoft Sentinel

使用协作筛选 (CF) 模型检测异常访问,该模型标识时间戳数据中的异常访问模式。

detect_anomalous_access_cf_fl() 函数是 一个用户定义的函数(UDF), 它应用协作筛选(CF)模型来检测异常交互,例如实体资源。 例如,基于时间戳数据(例如访问日志)访问存储帐户的用户主体名称(UPN)。 在网络安全上下文中,此函数有助于检测异常或未经授权的访问模式。

基于 CF 的模型使用项相似性预测访问分数,利用实体和资源之间的历史访问模式和余弦相似性。 它估计某个实体在给定范围内定义的检测期间访问资源的概率,例如订阅或帐户。 多个可选参数(包括最小阈值)允许自定义模型的行为。

模型输出 [0, 1] 范围内的访问异常分数,其中 0 表示合法访问的可能性很高,1 表示高度异常的访问。 除了访问异常分数之外,该函数还返回二进制异常标志(基于定义的阈值)和其他解释字段。

语法

detect_anomalous_access_cf_fl( entityColumnName, resourceColumnNamescopeColumnNametimeColumnNamestartTrainingstartDetection, endDetection, [anomalyScoreThresh] )

详细了解语法约定

参数

名称 类型 必选 DESCRIPTION
entityColumnName string ✔️ 包含计算 cf 模型的实体名称或 ID 的输入表列的名称。
resourceColumnName string ✔️ 包含计算模型的资源名称或 ID 的输入表列的名称。
scopeColumnName string ✔️ 输入表列的名称,其中包含分区或范围,以便为每个范围生成不同的异常模型。
timeColumnName string ✔️ 输入表列的名称,其中包含用于定义训练和检测周期的时间戳。
startTraining datetime ✔️ 异常模型的训练期的开始。 它的结束由检测周期的开始来定义。
startDetection datetime ✔️ 异常情况检测的检测周期的开始。
endDetection datetime ✔️ 异常情况检测的检测周期的结束。
anomalyScoreThresh real 检测到异常的异常分数的最大值,范围为 [0, 1]。 较高的值表示仅将更重要的情况视为异常情况,因此检测到的异常就更少(精度较高、召回率较低)。 默认值为 0.9。

函数定义

可以通过将函数代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句 不能自行运行。 它必须后跟 表格表达式语句。 若要运行 detect_anomalous_access_cf_fl()的工作示例,请参阅 示例

let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
                                          , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                          , anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
    T
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend resource   = column_ifexists(resourceColumnName, '')
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection ,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by entity, scope
    | extend temp = 1
);
let resources = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by resource, scope
    | extend temp = 1
);
let potentialAccessTrainData = (
    entities
    | join kind=inner resources on temp
    | distinct  entity, resource, scope
);
let accessTrainData = (
    potentialAccessTrainData
    | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
    | extend usedOperation = iff(isempty(resource1), 0, 1)
    | distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
    accessTrainData
    | summarize interactList = make_list(usedOperation) by resource, scope
    | extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
    ItemUserInteractions
    | join kind=inner (ItemUserInteractions) on tempKey
    | where scope == scope1
    | extend similarity = series_cosine_similarity(interactList, interactList1)
    | extend similarity = iff(isnan(similarity), 0.0, similarity)
    | project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
    accessTrainData
    | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
    | project entity, resource=resource2, usedOperation, similarity
    | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
    | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
    | extend accessAnomalyScore = 1 - accessAnomalyScore
    | extend accessAnomalyScore = round(accessAnomalyScore, 4)
    | join kind=inner accessTrainData on entity, resource
    | project entity, resource, scope, usedOperation, accessAnomalyScore
    | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
    | order by entity asc, resource
);
let resultsData = (
    processedData
    | where dataSet == "detectSet"
    | join kind=leftouter Predictions on entity, resource, scope
    | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
    | project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Write your query to use the function here.

示例:

以下示例使用 调用运算符 来运行函数。

若要使用查询定义的函数,请调用嵌入的函数定义之后。

let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
                                                , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                                , anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let processedData = (
    T
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend resource = column_ifexists(resourceColumnName, '')
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// Create all possible pairs (entity, resource) with the same scope
let entities = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by entity, scope
    | extend temp = 1
);
let resources = (
    processedData
    | where dataSet == 'trainSet'
    | summarize by resource, scope
    | extend temp = 1
);
let potentialAccessTrainData = (
    entities
    | join kind=inner resources on temp
    | distinct  entity, resource, scope
);
let accessTrainData = (
    potentialAccessTrainData
    | join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
    | extend usedOperation = iff(isempty(resource1), 0, 1)
    | distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
let ItemUserInteractions = (
    accessTrainData
    | summarize interactList = make_list(usedOperation) by resource, scope
    | extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity
let ItemSimilarities = (
    ItemUserInteractions
    | join kind=inner (ItemUserInteractions) on tempKey
    | where scope == scope1
    | extend similarity = series_cosine_similarity(interactList, interactList1)
    | extend similarity = iff(isnan(similarity), 0.0, similarity)
    | project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
    accessTrainData
    | join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
    | project entity, resource=resource2, usedOperation, similarity
    | summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
    | extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
    | extend accessAnomalyScore = 1 - accessAnomalyScore
    | extend accessAnomalyScore = round(accessAnomalyScore, 4)
    | join kind=inner accessTrainData on entity, resource
    | project entity, resource, scope, usedOperation, accessAnomalyScore
    | extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
    | order by entity asc, resource
);
let resultsData = (
    processedData
    | where dataSet == "detectSet"
    | join kind=leftouter Predictions on entity, resource, scope
    | extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
    | project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let devices             = toscalar(range device_id from 1 to 51 step 1 | extend device = strcat("device", tostring(device_id)) | summarize devices_array = make_list(device));
let countDevices          = array_length(devices)-1;
let testData            = range t from 0 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = tostring(devices[toint(rand(countDevices))])
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == trainPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == trainPeriodStart, 'device1', deviceId)
    | extend accountName    = iff(timeSlice == trainPeriodStart, 'prodEnvironment', accountName)
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'device50', deviceId)
    | extend accountName    = iff(timeSlice == detectPeriodStart, 'prodEnvironment', accountName)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_access_cf_fl(entityColumnName    = 'userName'
                                      , resourceColumnName = 'deviceId'
                                      , scopeColumnName    = 'accountName'
                                      , timeColumnName     = 'timeSlice'
                                      , startTraining      = trainPeriodStart
                                      , startDetection     = detectPeriodStart
                                      , endDetection       = detectPeriodStart
                                  )

输出

t timeSlice 用户名 设备ID 账户名称 实体 资源 范围 数据 accessAnomalyScore isAnomalousAccess
1440 2022-04-30 05:00:00.0000000 H4ck3r device50 prodEnvironment H4ck3r device50 prodEnvironment detectSet 0.982 1

运行函数的输出显示检测期间每个异常实体资源访问事件,筛选出预测访问概率(基于协作筛选)高于定义的异常阈值(默认情况下为 0.9)。 为了清楚起见,添加了其他字段:

  • dataSet:当前数据集(始终为 detectSet)。
  • accessAnomalyScore:基于协作筛选建模的此访问的预测访问异常分数。 该值在 [0, 1] 范围内,较高的值表示异常程度较高。
  • isAnomalousAccess:异常访问的二进制标志

运行具有默认参数的函数会将用户“H4ck3r”访问尝试标记为“prodEnvironment”帐户中的设备“device50”。 预测的访问异常分数为 0.982,这非常高,表明根据历史模式训练的模型,此访问是意外的。

在训练期间,协作筛选模型了解了范围内用户和设备之间的访问模式。 由于没有观察到访问“device50”的“H4ck3r”,并且被认为在历史数据中不太可能,因此它被标记为异常。

输出表将这些异常访问与预测访问分数一起呈现。 这些字段可用于进一步调查、警报或与更广泛的检测工作流集成。

网络安全上下文中建议的用法是监视重要实体,例如用户名或 IP,访问其相应范围内的重要资源,例如设备、数据库或应用程序(例如帐户或订阅)。