使用协作筛选 (CF) 模型检测异常访问,该模型标识时间戳数据中的异常访问模式。
detect_anomalous_access_cf_fl() 函数是一个用户定义的函数 (UDF),它应用协作筛选 (CF) 模型来检测实体与资源之间的异常交互,例如基于带时间戳的数据(如访问日志)检测用户主体名称 (UPN) 对存储帐户的访问。 在网络安全上下文中,此函数有助于检测异常或未经授权的访问模式。
基于 CF 的模型使用项相似性预测访问分数,利用实体和资源之间的历史访问模式和余弦相似性。 它估计某个实体在定义的检测期间、在给定范围(例如订阅或帐户)内访问资源的概率。 多个可选参数(包括最小阈值)允许自定义模型的行为。
模型输出 [0, 1] 范围内的访问异常分数,其中 0 表示合法访问的可能性很高,1 表示高度异常的访问。 除了访问异常分数之外,该函数还返回二进制异常标志(基于定义的阈值)和其他解释字段。
语法
detect_anomalous_access_cf_fl(
entityColumnName, resourceColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [anomalyScoreThresh] )
详细了解语法约定。
参数
名称 | 类型 | 必选 | 说明 |
---|---|---|---|
entityColumnName | string | ✔️ | 输入表中包含实体名称或 ID 的列的名称,CF 模型针对这些实体进行计算。 |
resourceColumnName | string | ✔️ | 输入表中包含资源名称或 ID 的列的名称,模型针对这些资源进行计算。 |
scopeColumnName | string | ✔️ | 输入表中包含分区或范围的列的名称,以便为每个范围生成不同的异常模型。 |
timeColumnName | string | ✔️ | 输入表中包含时间戳的列的名称,这些时间戳用于定义训练和检测周期。 |
startTraining | datetime | ✔️ | 异常模型的训练期的开始。 它的结束由检测周期的开始来定义。 |
startDetection | datetime | ✔️ | 异常情况检测的检测周期的开始。 |
endDetection | datetime | ✔️ | 异常情况检测的检测周期的结束。 |
anomalyScoreThresh | real | | 异常分数阈值,范围为 [0, 1];异常分数高于该值的访问将被标记为异常。 较高的值表示仅将更显著的情况视为异常情况,因此检测到的异常就更少(精度较高、召回率较低)。 默认值为 0.9。 |
函数定义
可以通过将函数代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
- 查询定义的
- 存储
使用以下 let 语句定义函数。 不需要任何权限。
// detect_anomalous_access_cf_fl
// Scores entity -> resource accesses in a detection window using an item-based
// collaborative filtering (CF) model built from the preceding training window.
// Parameters:
//   T                  - input table (any schema).
//   entityColumnName   - name of the column in T holding the entity.
//   resourceColumnName - name of the column in T holding the resource.
//   scopeColumnName    - name of the column in T holding the scope.
//   timeColumnName     - name of the column in T holding the timestamp.
//   startTraining      - inclusive start of the training window; the window ends just before startDetection.
//   startDetection     - inclusive start of the detection window.
//   endDetection       - inclusive end of the detection window.
//   anomalyScoreThresh - rows whose accessAnomalyScore is strictly greater than this value get
//                        isAnomalousAccess = 1 (default 0.9).
// Returns the detection-window rows of T extended with entity, resource, scope, dataSet,
// accessAnomalyScore and isAnomalousAccess.
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
// Pre-process the input: copy the configured columns into standard names (entity, resource,
// scope, sliceTime), drop rows where any of them is empty, and label each remaining row as
// 'trainSet' or 'detectSet'. Rows outside both windows are discarded.
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
// trainSet covers [startTraining, startDetection); detectSet covers [startDetection, endDetection].
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection , 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Build the candidate (entity, resource) pairs from the training data.
// Distinct entities per scope; temp = 1 is a constant key used for the cross join below.
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
// Distinct resources per scope, with the same constant join key.
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
// Cross join of entities and resources via the constant key.
// NOTE(review): the join key is only temp, so scope equality is not enforced here; the scope
// kept by the distinct is the left (entity) side. Confirm cross-scope pairs are intended.
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
// Mark each candidate pair with usedOperation = 1 when a matching training row exists for the
// same entity, resource and scope, otherwise 0 (the right-side resource1 is empty when the
// left outer join finds no match).
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
// NOTE(review): make_list has no explicit ordering here; the similarity below presumably relies
// on every resource's list following the same entity order - verify.
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity.
// The self join on tempKey pairs every resource with every other one; only pairs from the same
// scope are kept, and a NaN similarity is replaced with 0.
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
// resource2 is a name generated by the join's column de-duplication.
// NOTE(review): presumably it is the other resource of the similarity pair - verify.
| project entity, resource=resource2, usedOperation, similarity
// Similarity-weighted average of the observed interactions.
// NOTE(review): grouping is by entity and resource only (scope is not part of the key) - confirm.
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
// Convert the predicted access score into an anomaly score and round it to 4 decimals.
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
// Join back to recover scope and usedOperation for each pair.
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
// Pairs not seen in training keep the computed score; for pairs seen in training the score is
// replaced by usedOperation cast to double.
// NOTE(review): that makes the score 1.0 for accesses already observed in training - confirm
// this is intended, since the flag below treats high scores as anomalous.
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
// Attach the predicted score to every detection-window row and derive the binary flag.
// The left outer join keeps detection rows that have no prediction; their score is empty.
// NOTE(review): confirm how rows with an empty score should be flagged.
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
// Drop helper columns and the right-side duplicates produced by the join.
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Write your query to use the function here.
示例:
以下示例使用 invoke 运算符来运行函数。
- 查询定义的
- 存储
若要使用查询定义的函数,请在嵌入的函数定义之后调用它。
// detect_anomalous_access_cf_fl
// Scores entity -> resource accesses in a detection window using an item-based
// collaborative filtering (CF) model built from the preceding training window.
// Parameters:
//   T                  - input table (any schema).
//   entityColumnName   - name of the column in T holding the entity.
//   resourceColumnName - name of the column in T holding the resource.
//   scopeColumnName    - name of the column in T holding the scope.
//   timeColumnName     - name of the column in T holding the timestamp.
//   startTraining      - inclusive start of the training window; the window ends just before startDetection.
//   startDetection     - inclusive start of the detection window.
//   endDetection       - inclusive end of the detection window.
//   anomalyScoreThresh - rows whose accessAnomalyScore is strictly greater than this value get
//                        isAnomalousAccess = 1 (default 0.9).
// Returns the detection-window rows of T extended with entity, resource, scope, dataSet,
// accessAnomalyScore and isAnomalousAccess.
let detect_anomalous_access_cf_fl = (T:(*), entityColumnName:string, resourceColumnName:string, scopeColumnName:string
, timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
, anomalyScoreThresh:real = 0.9)
{
// Pre-process the input: copy the configured columns into standard names (entity, resource,
// scope, sliceTime), drop rows where any of them is empty, and label each remaining row as
// 'trainSet' or 'detectSet'. Rows outside both windows are discarded.
let processedData = (
T
| extend entity = column_ifexists(entityColumnName, '')
| extend resource = column_ifexists(resourceColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
| extend sliceTime = todatetime(column_ifexists(timeColumnName, ''))
| where isnotempty(scope) and isnotempty(entity) and isnotempty(resource) and isnotempty(sliceTime)
// trainSet covers [startTraining, startDetection); detectSet covers [startDetection, endDetection].
| extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
, sliceTime >= startDetection and sliceTime <= endDetection, 'detectSet'
, 'other')
| where dataSet in ('trainSet', 'detectSet')
);
// Build the candidate (entity, resource) pairs from the training data.
// Distinct entities per scope; temp = 1 is a constant key used for the cross join below.
let entities = (
processedData
| where dataSet == 'trainSet'
| summarize by entity, scope
| extend temp = 1
);
// Distinct resources per scope, with the same constant join key.
let resources = (
processedData
| where dataSet == 'trainSet'
| summarize by resource, scope
| extend temp = 1
);
// Cross join of entities and resources via the constant key.
// NOTE(review): the join key is only temp, so scope equality is not enforced here; the scope
// kept by the distinct is the left (entity) side. Confirm cross-scope pairs are intended.
let potentialAccessTrainData = (
entities
| join kind=inner resources on temp
| distinct entity, resource, scope
);
// Mark each candidate pair with usedOperation = 1 when a matching training row exists for the
// same entity, resource and scope, otherwise 0 (the right-side resource1 is empty when the
// left outer join finds no match).
let accessTrainData = (
potentialAccessTrainData
| join kind=leftouter hint.strategy=broadcast (processedData | where dataSet =='trainSet') on entity, resource, scope
| extend usedOperation = iff(isempty(resource1), 0, 1)
| distinct entity, resource, scope, usedOperation
);
// Aggregate interaction scores per item into a list to prepare for similarity calculations
// Add a temporary key for self-joining later in the process
// NOTE(review): make_list has no explicit ordering here; the similarity below presumably relies
// on every resource's list following the same entity order - verify.
let ItemUserInteractions = (
accessTrainData
| summarize interactList = make_list(usedOperation) by resource, scope
| extend tempKey=1
);
// Compute item-to-item similarity using cosine similarity.
// The self join on tempKey pairs every resource with every other one; only pairs from the same
// scope are kept, and a NaN similarity is replaced with 0.
let ItemSimilarities = (
ItemUserInteractions
| join kind=inner (ItemUserInteractions) on tempKey
| where scope == scope1
| extend similarity = series_cosine_similarity(interactList, interactList1)
| extend similarity = iff(isnan(similarity), 0.0, similarity)
| project resource, resource1, scope, similarity
);
// Predict user-item interactions based on item similarities
let Predictions = (
accessTrainData
| join kind=inner (ItemSimilarities) on scope and $left.resource == $right.resource1
// resource2 is a name generated by the join's column de-duplication.
// NOTE(review): presumably it is the other resource of the similarity pair - verify.
| project entity, resource=resource2, usedOperation, similarity
// Similarity-weighted average of the observed interactions.
// NOTE(review): grouping is by entity and resource only (scope is not part of the key) - confirm.
| summarize accessAnomalyScore = sum(usedOperation * similarity) / sum(abs(similarity)) by entity, resource
| extend accessAnomalyScore = iff(isnan(accessAnomalyScore), 0.0, accessAnomalyScore)
// Convert the predicted access score into an anomaly score and round it to 4 decimals.
| extend accessAnomalyScore = 1 - accessAnomalyScore
| extend accessAnomalyScore = round(accessAnomalyScore, 4)
// Join back to recover scope and usedOperation for each pair.
| join kind=inner accessTrainData on entity, resource
| project entity, resource, scope, usedOperation, accessAnomalyScore
// Pairs not seen in training keep the computed score; for pairs seen in training the score is
// replaced by usedOperation cast to double.
// NOTE(review): that makes the score 1.0 for accesses already observed in training - confirm
// this is intended, since the flag below treats high scores as anomalous.
| extend accessAnomalyScore = iff(usedOperation == 0.0, accessAnomalyScore, todouble(usedOperation))
| order by entity asc, resource
);
// Attach the predicted score to every detection-window row and derive the binary flag.
// The left outer join keeps detection rows that have no prediction; their score is empty.
// NOTE(review): confirm how rows with an empty score should be flagged.
let resultsData = (
processedData
| where dataSet == "detectSet"
| join kind=leftouter Predictions on entity, resource, scope
| extend isAnomalousAccess = iff(accessAnomalyScore > anomalyScoreThresh, 1, 0)
// Drop helper columns and the right-side duplicates produced by the join.
| project-away sliceTime, entity1, resource1, scope1, usedOperation
);
resultsData
};
// Synthetic access log: one event per hour, beginning at the start of the training window.
let trainPeriodStart = datetime(2022-03-01 05:00);
let detectPeriodStart = datetime(2022-04-30 05:00);
// Pool of user names that random events are drawn from.
let userPool = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let userPoolSize = array_length(userPool);
// Pool of device names ("device1" .. "device51") collected into a single dynamic array.
let devicePool = toscalar(
    range device_id from 1 to 51 step 1
    | extend device = strcat("device", tostring(device_id))
    | summarize devices_array = make_list(device)
);
// Upper bound passed to rand() when picking a device index (array length minus one).
let devicePoolSize = array_length(devicePool) - 1;
let testData =
    range t from 0 to 24*60 step 1
    | extend timeSlice = trainPeriodStart + 1h * t
    // Random user and device per event; 'testEnvironment' is only assigned before the detection start.
    | extend userName = tostring(userPool[toint(rand(userPoolSize))])
           , deviceId = tostring(devicePool[toint(rand(devicePoolSize))])
           , accountName = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    // Plant the 'H4ck3r' user: device1 at the first training timestamp and device50 at the
    // detection timestamp, both in 'prodEnvironment'.
    | extend userName = iff(timeSlice == trainPeriodStart or timeSlice == detectPeriodStart, 'H4ck3r', userName)
           , deviceId = case(timeSlice == trainPeriodStart, 'device1',
                             timeSlice == detectPeriodStart, 'device50',
                             deviceId)
           , accountName = iff(timeSlice == trainPeriodStart or timeSlice == detectPeriodStart, 'prodEnvironment', accountName)
    | sort by timeSlice desc
;
// Run the detector; the detection window starts and ends at detectPeriodStart.
testData
| invoke detect_anomalous_access_cf_fl(
      entityColumnName = 'userName',
      resourceColumnName = 'deviceId',
      scopeColumnName = 'accountName',
      timeColumnName = 'timeSlice',
      startTraining = trainPeriodStart,
      startDetection = detectPeriodStart,
      endDetection = detectPeriodStart
  )
输出
t | timeSlice | userName | deviceId | accountName | entity | resource | scope | dataSet | accessAnomalyScore | isAnomalousAccess |
---|---|---|---|---|---|---|---|---|---|---|
1440 | 2022-04-30 05:00:00.0000000 | H4ck3r | device50 | prodEnvironment | H4ck3r | device50 | prodEnvironment | detectSet | 0.982 | 1 |
运行函数的输出显示检测期间的每个实体-资源访问事件,并通过二进制标志标出预测访问异常分数(基于协作筛选)高于定义的异常阈值(默认情况下为 0.9)的事件。 为了清楚起见,添加了其他字段:
- dataSet:当前数据集(始终为 detectSet)。
- accessAnomalyScore:基于协作筛选建模的此访问的预测访问异常分数。 该值在 [0, 1] 范围内,较高的值表示异常程度较高。
- isAnomalousAccess:异常访问的二进制标志。
使用默认参数运行该函数,会将用户“H4ck3r”对“prodEnvironment”帐户中设备“device50”的访问尝试标记为异常。 预测的访问异常分数为 0.982,这非常高,表明根据基于历史模式训练的模型,此访问是意外的。
在训练期间,协作筛选模型了解了范围内用户和设备之间的访问模式。 由于历史数据中没有观察到“H4ck3r”访问“device50”,并且模型认为这种访问不太可能发生,因此它被标记为异常。
输出表将这些异常访问与预测访问分数一起呈现。 这些字段可用于进一步调查、警报或与更广泛的检测工作流集成。
在网络安全上下文中,建议的用法是监视重要实体(例如用户名或 IP)在其相应范围(例如帐户或订阅)内对重要资源(例如设备、数据库或应用程序)的访问。