根据路径或边缘数据计算源节点的爆炸半径(列表和分数)。
函数 graph_blast_radius_fl() 是 UDF(用户定义的函数),可用于根据路径或边缘数据计算每个源节点的爆破半径。 输入数据的每一行都包含一个源节点和一个目标节点,可以表示节点和目标之间的直接连接(边缘),或者两者之间更长的多跃点路径。 如果路径不可用,我们首先可以使用 图形匹配 运算符或 graph_path_discovery_fl() 函数发现它们。 然后可以在路径发现输出的基础上执行 graph_blast_radius_fl()。
爆炸半径表示特定源节点与相关目标的连接。 源能够访问的目标越多,遭到攻击者入侵后造成的影响就越大,爆炸半径因此得名。 具有高爆炸半径的节点在网络安全域中很重要,因为它们可能造成的潜在损害和攻击者高度重视。 因此,在强化和优先处理警报等安全信号方面,应相应地保护具有高爆炸半径的节点。
该函数输出每个源的连接目标列表以及代表目标数量的分数。 或者,如果每个目标都有一个有意义的“权重”(例如关键性或成本),则可以将加权分数计算为目标权重的总和。 此外,为了更好地进行控制,显示源的最大总数和每个列表中目标最大数量的限制将作为可选参数公开。
语法
graph_blast_radius_fl(sourceIdColumnName、targetIdColumnName、[targetWeightColumnName]、[resultCountLimit]、[listedIdsLimit])
详细了解语法约定。
参数
| 客户 | 类型 | 必需 | 说明 |
|---|---|---|---|
| sourceIdColumnName | string |
✔️ | 包含源节点 ID 的列的名称(边缘或路径)。 |
| targetIdColumnName | string |
✔️ | 包含目标节点 ID 的列的名称(边缘或路径)。 |
| targetWeightColumnName | string |
包含目标节点权重(例如关键性)的列名称。 如果不存在相关权重,则加权分数为 0。 默认列名为 noWeightsColumn。 | |
| resultCountLimit | long |
返回的最大行数(按分数降序排序)。 默认值为 100000。 | |
| listedIdsLimit | long |
为每个源列出的最大目标数。 默认值为 50。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let graph_blast_radius_fl = (T:(*), sourceIdColumnName:string, targetIdColumnName:string, targetWeightColumnName:string = 'noWeightsColumn'
, resultCountLimit:long = 100000, listedIdsLimit:long = 50)
{
let paths = (
T
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend targetWeight = tolong(column_ifexists(targetWeightColumnName, 0))
);
let aggregatedPaths = (
paths
| sort by sourceId, targetWeight desc
| summarize blastRadiusList = array_slice(make_set_if(targetId, isnotempty(targetId)), 0, (listedIdsLimit - 1))
, blastRadiusScore = dcountif(targetId, isnotempty(targetId))
, blastRadiusScoreWeighted = sum(targetWeight)
by sourceId
| extend isBlastRadiusListCapped = (blastRadiusScore > listedIdsLimit)
);
aggregatedPaths
| top resultCountLimit by blastRadiusScore desc
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let connections = datatable (SourceNodeName:string, TargetNodeName:string, TargetNodeCriticality:int)[
'vm-work-1', 'webapp-prd', 3,
'vm-custom', 'webapp-prd', 3,
'webapp-prd', 'vm-custom', 1,
'webapp-prd', 'test-machine', 1,
'vm-custom', 'server-0126', 1,
'vm-custom', 'hub_router', 2,
'webapp-prd', 'hub_router', 2,
'test-machine', 'vm-custom', 1,
'test-machine', 'hub_router', 2,
'hub_router', 'remote_DT', 1,
'vm-work-1', 'storage_main_backup', 5,
'hub_router', 'vm-work-2', 1,
'vm-work-2', 'backup_prc', 3,
'remote_DT', 'backup_prc', 3,
'backup_prc', 'storage_main_backup', 5,
'backup_prc', 'storage_DevBox', 1,
'device_A1', 'sevice_B2', 2,
'sevice_B2', 'device_A1', 2
];
let graph_blast_radius_fl = (T:(*), sourceIdColumnName:string, targetIdColumnName:string, targetWeightColumnName:string = 'noWeightsColumn'
, resultCountLimit:long = 100000, listedIdsLimit:long = 50)
{
let paths = (
T
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend targetWeight = tolong(column_ifexists(targetWeightColumnName, 0))
);
let aggregatedPaths = (
paths
| sort by sourceId, targetWeight desc
| summarize blastRadiusList = array_slice(make_set_if(targetId, isnotempty(targetId)), 0, (listedIdsLimit - 1))
, blastRadiusScore = dcountif(targetId, isnotempty(targetId))
, blastRadiusScoreWeighted = sum(targetWeight)
by sourceId
| extend isBlastRadiusListCapped = (blastRadiusScore > listedIdsLimit)
);
aggregatedPaths
| top resultCountLimit by blastRadiusScore desc
};
connections
| invoke graph_blast_radius_fl(sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, targetWeightColumnName = 'TargetNodeCriticality'
)
输出
| sourceId | blastRadiusList | blastRadiusScore | blastRadiusScoreWeighted | isBlastRadiusListCapped |
|---|---|---|---|---|
| webapp-prd | [“vm-custom”,“test-machine”,“hub_router”] | 3 | 4 | 假 |
| vm-custom | [“webapp-prd”,“server-0126”,“hub_router”] | 3 | 6 | 假 |
| test-machine | [“vm-custom”,“hub_router”] | 2 | 3 | 假 |
| vm-work-1 | [“webapp-prd”,“storage_main_backup”] | 2 | 8 | 假 |
| backup_prc | [“storage_main_backup”,“storage_DevBox”] | 2 | 6 | 假 |
| hub_router | [“remote_DT”,“vm-work-2”] | 2 | 2 | 假 |
| vm-work-2 | [“backup_prc”] | 1 | 3 | 假 |
| device_A1 | [“sevice_B2”] | 1 | 2 | 假 |
| remote_DT | [“backup_prc”] | 1 | 3 | 假 |
| sevice_B2 | [“device_A1”] | 1 | 2 | 假 |
运行函数按源聚合源与目标之间的连接或路径。 对于每个源,爆炸半径将连接目标表示为分数(常规分数和加权分数)和列表。
输出的每一行包含以下字段:
-
sourceId:从相关列获取的源节点 ID。 -
blastRadiusList:源节点连接到的目标节点 ID 列表(取自相关列)。 列表的最大长度限制 listedIdsLimit 参数。 -
blastRadiusScore:分数是源连接到的目标节点计数。 高爆炸半径分数表示源节点可能访问大量目标,应予以相应处理。 -
blastRadiusScoreWeighted:加权分数是可选目标节点权重列的总和,表示其值(例如关键性或成本)。 如果存在此类权重,则由于可能会访问高值目标,加权爆炸半径分数可能是源节点值的更准确的指标。 -
isBlastRadiusListCapped:布尔标志,表示目标列表是否受 listedIdsLimit 参数限制。 如果受此参数限制,则除了列出的目标之外,还可以从源访问其他目标(最多可达 blastRadiusScore 的数量)。
在上面的示例中,我们在源和目标之间的连接基础上运行 graph_blast_radius_fl() 函数。 在输出的第一行中,可以看到源节点“webapp-prd”已连接到三个目标(“vm-custom”、“test-machine”、“hub_router”)。 我们将输入数据 TargetNodeCriticality 列用作目标权重,并得到累积权重为 4。 此外,由于目标数量为 3 且默认列表限制为 50,将显示所有目标,所以 isBlastRadiusListCapped 列的值为 FALSE。
如果多跃点路径不可用,可以在源和目标之间构建多跃点路径(例如,运行 “graph_path_discovery_fl()”),并在结果的基础上运行“graph_blast_radius_fl()”。
输出看起来类似,但表示通过多跃点路径计算的爆破半径,从而更好地指示源节点与相关目标的真实连接。 为了查找源和目标场景(例如中断)之间的完整路径,graph_path_discovery_fl() 函数可与相关源和目标节点上的筛选器一起使用。
函数 graph_blast_radius_fl() 可用于根据直接边缘或较长的路径计算源节点的爆炸半径。 在网络安全域中,它可以提供多个见解。 爆炸半径分数(常规和加权)表示源节点从防御者和攻击者的角度的重要性。 应相应地保护具有高爆炸半径的节点,例如,在访问强化和漏洞管理方面。 应确定此类节点上的警报等安全信号的优先级。 应监视爆炸半径列表,以查找源和目标之间的意外连接,并在中断方案中使用。 例如,如果源遭到入侵,则它与重要目标之间的连接应断开。