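Calculate metrics of node centrality (such as degree and betweenness) over graph edge and node data.
The function graph_node_centrality_fl() is a UDF (user-defined function) that calculates various metrics of node centrality over graph data. Graph data consists of nodes (for example, resources, applications, or users) and edges, such as existing access permissions or connections. The centrality of a node represents its importance in the graph structure and can be defined and measured in several ways. In cybersecurity, centrality represents a node's value to attackers; compromising a node with high centrality (such as a well-connected token) offers more opportunities. For defenders, high-centrality nodes are just as important and should be protected accordingly. Centrality is calculated directly over edges, as well as over discovered shortest paths. Different centrality metrics are useful in different security contexts.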
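The input data for this function should include a table of edges in the format SourceId, EdgeId, TargetId, and a list of nodes with optional relevant node properties. Alternatively, graph input can be extracted from other types of data. For example, traffic logs with entries of the type "User A logged in to resource B" can be modeled as edges of the type (User A)-[logged in to]->(resource B). The lists of distinct users and resources can be modeled as nodes. As part of the function, shortest paths are calculated and used as input for the centrality calculations.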
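The log-to-graph modeling described above can be sketched as follows. This is a minimal illustration, not part of the function: the signInLogs table, its columns, and the generated EdgeId format are hypothetical names chosen for the example.

```kusto
// Hypothetical sign-in log; the table and column names are illustrative assumptions.
let signInLogs = datatable (UserName:string, ResourceName:string, Timestamp:datetime) [
    'user-a', 'resource-b', datetime(2024-01-01 08:00:00),
    'user-a', 'resource-b', datetime(2024-01-02 09:30:00),
    'user-c', 'resource-b', datetime(2024-01-02 10:15:00)
];
// One edge per distinct (user, resource) pair, in SourceId, EdgeId, TargetId form.
let edges = (
    signInLogs
    | summarize by SourceId = UserName, TargetId = ResourceName
    | extend EdgeId = strcat(SourceId, '-[logged in to]->', TargetId)
    | project SourceId, EdgeId, TargetId
);
// One node per distinct user or resource.
let nodes = (
    union
        (signInLogs | summarize by NodeId = UserName | extend NodeType = 'User'),
        (signInLogs | summarize by NodeId = ResourceName | extend NodeType = 'Resource')
);
// Return the edges; replace this line with `nodes` to inspect the node list.
edges
```

Repeated sign-ins by the same user to the same resource collapse into a single edge here, which matches the assumption that edges are unweighted.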
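The following assumptions are made:
- All edges are valid for path discovery. Irrelevant edges should be filtered out before centrality is calculated.
- Edges are unweighted, independent, and unconditional. This means that all edges have the same probability, and moving from B to C doesn't depend on a previous move from A to B.
- Centrality metrics are calculated over edges and over simple, directed shortest paths without cycles, of the type A->B->C. More complex definitions can be implemented by changing the internal syntax of the graph pattern used in the function.
These assumptions can be adjusted as needed by changing the internal logic of the function.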
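The first assumption implies a filtering step before the function is called. A minimal sketch of such a step is shown below; the edge table, the EdgeType values, and the relevantEdgeTypes list are hypothetical and stand in for whatever criteria apply to your data.

```kusto
// Hypothetical edge table; the EdgeType values are illustrative assumptions.
let rawEdges = datatable (SourceId:string, EdgeId:string, EdgeType:string, TargetId:string) [
    'user-a',     'e1', 'logged in to', 'resource-b',
    'user-a',     'e2', 'failed login', 'resource-c',
    'resource-b', 'e3', 'can access',   'resource-c'
];
// Edge types considered valid for path discovery (an assumption for this sketch).
let relevantEdgeTypes = dynamic(['logged in to', 'can access']);
// Keep only the relevant edges; the filtered table is what would be passed to the function.
rawEdges
| where EdgeType in (relevantEdgeTypes)
```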
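The function discovers all possible shortest paths between valid sources and valid targets, subject to optional constraints such as path length limits and maximum output size. Various centrality metrics are calculated from the resulting paths and the original edges, each representing a different aspect of node importance. The output is a list of the nodes flagged as relevant by the isValidConnectorColumnName column, with additional columns containing the centrality metrics for each node. The function uses only the required fields, such as node IDs and edge IDs. Other relevant fields, such as types, property lists, security-related scores, or external signals, can be added to the logic and the output by changing the function definition.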
Syntax
graph_node_centrality_fl(edgesTableName, nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, isValidConnectorColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])
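Parameters
Name | Type | Required | Description |
---|---|---|---|
edgesTableName | string | ✔️ | The name of the input table containing the edges of the graph. |
nodesTableName | string | ✔️ | The name of the input table containing the nodes of the graph. |
scopeColumnName | string | ✔️ | The name of the column in the nodes and edges tables containing the partition or scope (for example, subscription or account), so that the graph is built and centrality is calculated separately for each scope. |
isValidPathStartColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for a node. True means that the node is a valid start point for a path, and False means that it isn't. |
isValidPathEndColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for a node. True means that the node is a valid endpoint for a path, and False means that it isn't. |
isValidConnectorColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for a node. True means that the node is a valid connector to be included in the output, and False means that it isn't. |
nodeIdColumnName | string | ✔️ | The name of the column in the nodes table containing the node ID. |
edgeIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge ID. |
sourceIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's source node ID. |
targetIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's target node ID. |
minPathLength | long | | The minimum number of steps (edges) in a path. Default value: 1. |
maxPathLength | long | | The maximum number of steps (edges) in a path. Default value: 8. |
resultCountLimit | long | | The maximum number of paths returned for the output. Default value: 100000. |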
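Function definition
You can define the function either by embedding its code as a query-defined function or by creating it as a stored function in your database, as follows:
Define the function using the following let statement. No permissions are required.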
let graph_node_centrality_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend isNodeValidConnector = column_ifexists(isValidConnectorColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
nodes
| summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
| project scope, countPotentialPairsOnScope = countSources * countTargets
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing shortest paths between source nodes and target nodes with less than predefined number of hops.
// The current configuration looks for directed paths without any cycles; this can be changed if needed.
graph-shortest-paths output = all cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Keep only paths that connect valid endpoints.
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, innerNodeIds = map(inner_nodes(e), nodeId)
, innerNodeConnector = map(inner_nodes(e), isNodeValidConnector)
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathEndpointsId = hash_md5(strcat(sourceId, targetId))
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
let pathsProcessed = (
paths
| mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
| where (innerNodeConnector)
| summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
| join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
| project-away scope1, pathEndpointsId1
| extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
| summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
, countPairsConnectedByNode = dcount(pathEndpointsId)
by scope, nodeId = innerNodeId
| join kind = leftouter (potentialPairsOnScope) on scope
| extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
| project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
, countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
// Write your query to use the function here.
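Example
The following example calls the function directly, passing the table and column names as string arguments.
To use a query-defined function, call it after the embedded function definition.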
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[
'vm-work-1', 'e1', 'can use', 'webapp-prd', 'US',
'vm-custom', 'e2', 'can use', 'webapp-prd', 'US',
'webapp-prd', 'e3', 'can access', 'vm-custom', 'US',
'webapp-prd', 'e4', 'can access', 'test-machine', 'US',
'vm-custom', 'e5', 'can access', 'server-0126', 'US',
'vm-custom', 'e6', 'can access', 'hub_router', 'US',
'webapp-prd', 'e7', 'can access', 'hub_router', 'US',
'test-machine', 'e8', 'can access', 'vm-custom', 'US',
'test-machine', 'e9', 'can access', 'hub_router', 'US',
'hub_router', 'e10', 'routes traffic to', 'remote_DT', 'US',
'vm-work-1', 'e11', 'can access', 'storage_main_backup', 'US',
'hub_router', 'e12', 'routes traffic to', 'vm-work-2', 'US',
'vm-work-2', 'e13', 'can access', 'backup_prc', 'US',
'remote_DT', 'e14', 'can access', 'backup_prc', 'US',
'backup_prc', 'e15', 'moves data to', 'storage_main_backup', 'US',
'backup_prc', 'e16', 'moves data to', 'storage_DevBox', 'US',
'device_A1', 'e17', 'is connected to', 'device_B2', 'EU',
'device_B2', 'e18', 'is connected to', 'device_A1', 'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
'vm-work-1', 'Virtual Machine', 'Production', 'US',
'vm-custom', 'Virtual Machine', 'Production', 'US',
'webapp-prd', 'Application', 'None', 'US',
'test-machine', 'Virtual Machine', 'Test', 'US',
'hub_router', 'Traffic Router', 'None', 'US',
'vm-work-2', 'Virtual Machine', 'Production', 'US',
'remote_DT', 'Virtual Machine', 'Production', 'US',
'backup_prc', 'Service', 'Production', 'US',
'server-0126', 'Server', 'Production', 'US',
'storage_main_backup', 'Cloud Storage', 'Production', 'US',
'storage_DevBox', 'Cloud Storage', 'Test', 'US',
'device_A1', 'Device', 'Backend', 'EU',
'device_B2', 'Device', 'Backend', 'EU'
];
let nodesEnriched = (
nodes
| extend IsValidStart = (NodeType in ('Virtual Machine'))
, IsValidEnd = (NodeType in ('Cloud Storage'))
| extend IsValidConnector = (NodeType in ('Application', 'Traffic Router', 'Service'))
);
let graph_node_centrality_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend isNodeValidConnector = column_ifexists(isValidConnectorColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
nodes
| summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
| project scope, countPotentialPairsOnScope = countSources * countTargets
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing shortest paths between source nodes and target nodes with less than predefined number of hops.
// The current configuration looks for directed paths without any cycles; this can be changed if needed.
graph-shortest-paths output = all cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Keep only paths that connect valid endpoints.
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, innerNodeIds = map(inner_nodes(e), nodeId)
, innerNodeConnector = map(inner_nodes(e), isNodeValidConnector)
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathEndpointsId = hash_md5(strcat(sourceId, targetId))
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
let pathsProcessed = (
paths
| mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
| where (innerNodeConnector)
| summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
| join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
| project-away scope1, pathEndpointsId1
| extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
| summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
, countPairsConnectedByNode = dcount(pathEndpointsId)
by scope, nodeId = innerNodeId
| join kind = leftouter (potentialPairsOnScope) on scope
| extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
| project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
, countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
graph_node_centrality_fl(edgesTableName = 'edges'
, nodesTableName = 'nodesEnriched'
, scopeColumnName = 'Region'
, nodeIdColumnName = 'NodeName'
, edgeIdColumnName = 'EdgeName'
, sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, isValidPathStartColumnName = 'IsValidStart'
, isValidPathEndColumnName = 'IsValidEnd'
, isValidConnectorColumnName = 'IsValidConnector'
)
Output
scope | nodeId | NodeName | NodeType | NodeEnvironment | Region | IsValidStart | IsValidEnd | IsValidConnector | isValidPathStart | isValidPathEnd | isNodeValidConnector | outDegree | inDegree | totalDegree | sourceOutFlow | sinkInFlow | betweenness | relativePrestige | countShortestPathsThroughNode | countPairsConnectedByNode |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
US | backup_prc | backup_prc | Service | Production | US | False | False | True | False | False | True | 2 | 2 | 4 | 0 | 0 | 9 | 0.9 | 14 | 9 |
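Running the function finds all shortest paths that connect source nodes flagged as valid start points (isSourceValidPathStart == True) to target nodes flagged as valid endpoints (isTargetValidPathEnd == True). Various centrality metrics are then calculated from these paths and from the original edges, for all nodes flagged as valid connectors (isValidConnector == True). The output is a table in which each row corresponds to a valid connector node. Each row contains the following fields: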
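- nodeId: The ID of the connector node.
- isNodeValidConnector: A Boolean flag indicating that the node is a valid connector for which centrality is calculated; it should be equal to True.
- isValidPathStart: A Boolean flag indicating whether the node is a valid start point for a path.
- isValidPathEnd: A Boolean flag indicating whether the node is a valid endpoint for a path.
- scope: The scope containing the node and the paths.
- outDegree: The out-degree of the node. This is the number of distinct targets on the node's outgoing edges.
- inDegree: The in-degree of the node. This is the number of distinct sources on the node's incoming edges.
- totalDegree: inDegree multiplied by outDegree. The value represents the number of potential paths that the node can create, since every incoming edge can be connected to every outgoing edge.
- sourceOutFlow: The number of targets that can be reached through paths starting from the node, similar to BlastRadius.
- sinkInFlow: The number of sources that can reach the node through paths, similar to ExposurePerimeter.
- betweenness: Betweenness centrality. For each source and target pair, this is the fraction of the pair's shortest paths that pass through the node; the fractions are summed over all pairs.
- relativePrestige: Prestige centrality is the count of source and target pairs connected by shortest paths passing through the node. Relative prestige normalizes this count by the number of all potential source and target pairs. The calculation can be adjusted to penalize the score for longer paths.
- countShortestPathsThroughNode: The number of shortest paths passing through the node, including paths with repeating source and target pairs.
- countPairsConnectedByNode: The number of distinct source and target pairs among the paths passing through the node.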
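The betweenness and relativePrestige fields can be written out as formulas. The notation below is introduced only for this illustration and doesn't appear in the function: S and T are the sets of valid start and end nodes in a scope, sigma_st is the number of shortest paths found from s to t (within the path length and result limits), and sigma_st(v) is the number of those paths that have v as an inner node.

```latex
\mathrm{betweenness}(v) = \sum_{\substack{s \in S,\; t \in T \\ \sigma_{st} > 0}} \frac{\sigma_{st}(v)}{\sigma_{st}},
\qquad
\mathrm{relativePrestige}(v) = \frac{\left|\{(s,t) : \sigma_{st}(v) > 0\}\right|}{|S| \cdot |T|}

% Worked check for backup_prc in the example (scope US):
% |S| = 5 virtual machines, |T| = 2 cloud storage nodes, so |S| \cdot |T| = 10.
% backup_prc lies on every shortest path of 9 pairs, so each of those terms equals 1:
\mathrm{betweenness}(\text{backup\_prc}) = 9 \cdot 1 = 9,
\qquad
\mathrm{relativePrestige}(\text{backup\_prc}) = \frac{9}{10} = 0.9
```

The tenth pair, vm-work-1 to storage_main_backup, is connected by a direct edge, so its shortest path has no inner nodes and contributes nothing to backup_prc.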
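The example calculates centrality metrics for all assets that are applications, traffic routers, or services, based on paths connecting virtual machines to storage accounts. The first row of the output, if sorted in descending order, shows the service backup_prc. It has an in-degree of 2, an out-degree of 2, a betweenness of 9, and so on. Different centrality metrics represent different aspects of importance, so they aren't perfectly aligned. For example, the node backup_prc has high betweenness and relativePrestige but low degrees. This highlights it as a node that doesn't have many direct edges but is strategically placed and plays an important role in the overall connectivity of its scope.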
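The degree values for backup_prc can be reproduced with a short standalone query. This is a sketch that mirrors the degree logic of the function on a subset of the example's edges (only the four edges that touch backup_prc); it isn't a replacement for the function.

```kusto
// Subset of the example's edge table: only the edges adjacent to backup_prc.
let edges = datatable (SourceNodeName:string, EdgeName:string, TargetNodeName:string, Region:string) [
    'vm-work-2',  'e13', 'backup_prc',          'US',
    'remote_DT',  'e14', 'backup_prc',          'US',
    'backup_prc', 'e15', 'storage_main_backup', 'US',
    'backup_prc', 'e16', 'storage_DevBox',      'US'
];
// Out-degree: distinct targets on a node's outgoing edges.
let outDegrees = (edges | summarize outDegree = dcount(TargetNodeName) by Region, nodeId = SourceNodeName);
// In-degree: distinct sources on a node's incoming edges.
let inDegrees = (edges | summarize inDegree = dcount(SourceNodeName) by Region, nodeId = TargetNodeName);
// The inner join keeps only nodes that have both incoming and outgoing edges.
outDegrees
| join kind = inner (inDegrees) on Region, nodeId
| project Region, nodeId, inDegree, outDegree, totalDegree = inDegree * outDegree
```

For these four edges, the only node with both incoming and outgoing edges is backup_prc, with inDegree 2, outDegree 2, and totalDegree 4, matching the output row above.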
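The function graph_node_centrality_fl() can be used in the cybersecurity domain to discover important nodes, such as well-connected tokens or users, over data modeled as a graph. The various centrality metrics available provide a better understanding of a node's posture and allow you to act accordingly, for example by prioritizing related signals, hardening the node, or disrupting unnecessary connections.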