graph_node_centrality_fl()

Applies to: ✅ Azure Data Explorer, Azure Monitor, Microsoft Sentinel

Calculate metrics of node centrality (such as degree and betweenness) over graph edge and node data.

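The function graph_node_centrality_fl() is a UDF (user-defined function) that calculates various metrics of node centrality over graph data. Graph data consists of nodes (such as resources, applications, or users) and edges (such as existing access permissions or connections). The centrality of a node represents its importance in the graph structure and can be defined and measured in several ways. In cybersecurity, centrality represents a node's value to an attacker: compromising a node with high centrality (such as a well-connected token) offers more opportunities. For defenders, high-centrality nodes are equally important and should be protected accordingly. Centrality is calculated directly over edges, as well as over discovered shortest paths. Different centrality metrics are useful in different security contexts.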

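The input data for this function should include a table of edges in the format SourceId, EdgeId, TargetId, and a list of nodes with optional relevant node properties. Alternatively, graph input can be extracted from other types of data. For example, traffic logs with entries of type "user A logged in to resource B" can be modeled as edges of type (user A)-[logged in to]->(resource B). The lists of distinct users and resources can be modeled as nodes. As part of the function, shortest paths are calculated and used as input for the centrality calculations.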
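The log-to-graph modeling described above could look roughly like the following sketch. The signInLogs table, its columns, and the Tenant scope column are hypothetical names invented for illustration; they are not part of the function.

```kusto
// Hypothetical sign-in log; table and column names are illustrative assumptions.
let signInLogs = datatable (UserName:string, ResourceName:string, Tenant:string) [
    'user-a', 'resource-b', 'T1',
    'user-a', 'resource-c', 'T1',
    'user-d', 'resource-b', 'T1',
    'user-a', 'resource-b', 'T1'   // repeated sign-in, collapses into a single edge
];
// One edge per distinct (user, resource) pair, in SourceId, EdgeId, TargetId form.
let edges = (
    signInLogs
    | distinct Tenant, UserName, ResourceName
    | project SourceId = UserName
            , EdgeId   = strcat(UserName, '-[logged in to]->', ResourceName)
            , TargetId = ResourceName
            , Tenant
);
// Nodes are the distinct users and resources seen in the log.
let nodes = (
    union
        (signInLogs | project NodeId = UserName, NodeType = 'User', Tenant),
        (signInLogs | project NodeId = ResourceName, NodeType = 'Resource', Tenant)
    | distinct NodeId, NodeType, Tenant
);
// Return the edge table; replace with `nodes` to inspect the node list instead.
edges
```

Collapsing repeated log entries into one edge matches the assumption that edges are unweighted; if frequency matters in your scenario, you would need to carry it as an extra column and adapt the function's logic.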

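The following assumptions are made:

  • All edges are valid for path discovery. Irrelevant edges should be filtered out before calculating centrality.
  • Edges are unweighted, independent, and unconditional. That is, all edges have the same probability, and moving from B to C doesn't depend on a previous move from A to B.
  • Centrality metrics are calculated over edges and over simple directed shortest paths without cycles, of type A->B->C. More complex definitions can be implemented by changing the pattern of the graph-shortest-paths operator inside the function.

These assumptions can be adapted as needed by changing the internal logic of the function.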
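As a small illustration of the first assumption, irrelevant edges can be removed before the edge table is handed to the function. The sketch below uses made-up data, and which edge types count as traversable is purely an assumption for this example.

```kusto
// Hypothetical edge list; 'is monitored by' is assumed not to grant any access.
let allEdges = datatable (SourceId:string, EdgeId:string, EdgeType:string, TargetId:string) [
    'vm-1',  'e1', 'can access',      'app-1',
    'app-1', 'e2', 'can access',      'storage-1',
    'vm-1',  'e3', 'is monitored by', 'monitor-1'
];
// Keep only the edge types considered relevant for path discovery (an assumption of this sketch).
let edges = (
    allEdges
    | where EdgeType in ('can access', 'can use')
);
edges
```

The filtered table, rather than the raw one, would then be referenced by name in the edgesTableName argument.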

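The function discovers all possible shortest paths between valid sources and valid targets, subject to optional constraints such as path length limits and maximum output size. Various centrality metrics are calculated over the resulting paths and the original edges, representing different aspects of node importance. The output is a list of nodes flagged as relevant using the isValidConnectorColumnName column, with additional columns containing the centrality metrics for each node. The function uses only the required fields, such as node IDs and edge IDs. Other relevant fields, such as types, property lists, security-related scores, or external signals, can be added to the logic and output by changing the function definition.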
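To make the edge-based part of the calculation more concrete, here is a minimal sketch that derives in-degree, out-degree, and their product from a toy edge list. The data and node names are invented for illustration, and scopes are omitted for brevity; the function applies the same dcount-based idea per scope.

```kusto
// Toy edge list (hypothetical): two nodes point at 'hub', and 'hub' points at three nodes.
let edges = datatable (sourceId:string, targetId:string) [
    'a',   'hub',
    'b',   'hub',
    'hub', 'x',
    'hub', 'y',
    'hub', 'z'
];
// Out-degree: distinct targets reachable over a node's outgoing edges.
let outDeg = (edges | summarize outDegree = dcount(targetId) by nodeId = sourceId);
// In-degree: distinct sources over a node's incoming edges.
let inDeg = (edges | summarize inDegree = dcount(sourceId) by nodeId = targetId);
outDeg
| join kind = fullouter (inDeg) on nodeId
// After a full outer join, the node ID may be present on only one side.
| extend nodeId = coalesce(nodeId, nodeId1)
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| project nodeId, inDegree, outDegree, totalDegree = inDegree * outDegree
| sort by nodeId asc
```

In this toy graph, 'hub' should come out with an in-degree of 2, an out-degree of 3, and a totalDegree of 6, while every other node has a totalDegree of 0 because it lacks either incoming or outgoing edges.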

Syntax

graph_node_centrality_fl(edgesTableName, nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, isValidConnectorColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])

Learn more about syntax conventions.

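Parameters

Name | Type | Required | Description
edgesTableName | string | ✔️ | The name of the input table containing the edges of the graph.
nodesTableName | string | ✔️ | The name of the input table containing the nodes of the graph.
scopeColumnName | string | ✔️ | The name of the column in the nodes and edges tables containing the partition or scope (for example, subscription or account), so that centrality is calculated separately for each scope.
isValidPathStartColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node. True means the node is a valid start point for a path; False means it isn't.
isValidPathEndColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node. True means the node is a valid end point for a path; False means it isn't.
isValidConnectorColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node. True means the node is a valid connector to be included in the output; False means it isn't.
nodeIdColumnName | string | ✔️ | The name of the column in the nodes table containing the node ID.
edgeIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge ID.
sourceIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's source node ID.
targetIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's target node ID.
minPathLength | long | | The minimum number of steps (edges) in a path. Default value: 1.
maxPathLength | long | | The maximum number of steps (edges) in a path. Default value: 8.
resultCountLimit | long | | The maximum number of paths returned for output. Default value: 100000.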

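Function definition

You can define the function either by embedding its code as a query-defined function or by creating it as a stored function in your database, as follows:

Define the function using the following let statement. No permissions are required.

Important

A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of graph_node_centrality_fl(), see Example.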

let graph_node_centrality_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId               = column_ifexists(sourceIdColumnName, '')
    | extend targetId               = column_ifexists(targetIdColumnName, '')
    | extend edgeId                 = column_ifexists(edgeIdColumnName, '')
    | extend scope                  = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId                 = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart       = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd         = column_ifexists(isValidPathEndColumnName, '')
    | extend isNodeValidConnector   = column_ifexists(isValidConnectorColumnName, '')
    | extend scope                  = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
    nodes
    | summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
    | project scope, countPotentialPairsOnScope = countSources * countTargets
    );
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for shortest paths between source nodes and target nodes whose number of hops is within the configured limits.
    // The current configuration looks for directed paths without any cycles; this can be changed if needed.
      graph-shortest-paths output = all  cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Keep only paths that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , innerNodeIds              = map(inner_nodes(e), nodeId)
                , innerNodeConnector        = map(inner_nodes(e), isNodeValidConnector)
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathEndpointsId               = hash_md5(strcat(sourceId, targetId))
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
let pathsProcessed = (
    paths
    | mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
    | where (innerNodeConnector)
    | summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
    | join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
    | project-away scope1, pathEndpointsId1
    | extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
    | summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
            , countPairsConnectedByNode = dcount(pathEndpointsId)
        by scope, nodeId = innerNodeId
    | join kind = leftouter (potentialPairsOnScope) on scope
    | extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
    | project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
    , countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
// Write your query to use the function here.

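Example

The following example runs the function on sample edge and node tables.

To use a query-defined function, call it after the embedded function definition.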

let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[						
    'vm-work-1',            'e1',           'can use',	            'webapp-prd', 	          'US',
    'vm-custom',        	'e2',           'can use',	            'webapp-prd', 	          'US',
    'webapp-prd',           'e3',           'can access',	        'vm-custom', 	          'US',
    'webapp-prd',       	'e4',           'can access',	        'test-machine', 	      'US',
    'vm-custom',        	'e5',           'can access',	        'server-0126', 	          'US',
    'vm-custom',        	'e6',	        'can access',	        'hub_router', 	          'US',
    'webapp-prd',       	'e7',	        'can access',	        'hub_router', 	          'US',
    'test-machine',       	'e8',	        'can access',	        'vm-custom',              'US',
    'test-machine',        	'e9',	        'can access',	        'hub_router', 	          'US',
    'hub_router',           'e10',	        'routes traffic to',	'remote_DT', 	          'US',
    'vm-work-1',            'e11',	        'can access',	        'storage_main_backup', 	  'US',
    'hub_router',           'e12',	        'routes traffic to',	'vm-work-2', 	          'US',
    'vm-work-2',        	'e13',          'can access',	        'backup_prc', 	          'US',
    'remote_DT',            'e14',	        'can access',	        'backup_prc', 	          'US',
    'backup_prc',           'e15',	        'moves data to',        'storage_main_backup', 	  'US',
    'backup_prc',           'e16',	        'moves data to',        'storage_DevBox', 	      'US',
    'device_A1',            'e17',	        'is connected to',      'device_B2', 	          'EU',
    'device_B2',            'e18',	        'is connected to',      'device_A1', 	          'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
        'vm-work-1',                'Virtual Machine',      'Production',       'US',
        'vm-custom',                'Virtual Machine',      'Production',       'US',
        'webapp-prd',               'Application',          'None',             'US',
        'test-machine',             'Virtual Machine',      'Test',             'US',
        'hub_router',               'Traffic Router',       'None',             'US',
        'vm-work-2',                'Virtual Machine',      'Production',       'US',
        'remote_DT',                'Virtual Machine',      'Production',       'US',
        'backup_prc',               'Service',              'Production',       'US',
        'server-0126',              'Server',               'Production',       'US',
        'storage_main_backup',      'Cloud Storage',        'Production',       'US',
        'storage_DevBox',           'Cloud Storage',        'Test',             'US',
        'device_A1',                'Device',               'Backend',          'EU',
        'device_B2',                'Device',               'Backend',          'EU'
];
let nodesEnriched = (
    nodes
    | extend  IsValidStart      = (NodeType in ('Virtual Machine'))
            , IsValidEnd        = (NodeType in ('Cloud Storage'))
    | extend  IsValidConnector  = (NodeType in ('Application', 'Traffic Router', 'Service'))
);
let graph_node_centrality_fl = (   edgesTableName:string, nodesTableName:string, scopeColumnName:string
								, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
								, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
								, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000) 
{
let edges = (
    table(edgesTableName)
    | extend sourceId               = column_ifexists(sourceIdColumnName, '')
    | extend targetId               = column_ifexists(targetIdColumnName, '')
    | extend edgeId                 = column_ifexists(edgeIdColumnName, '')
    | extend scope                  = column_ifexists(scopeColumnName, '')
    );
let nodes = (
    table(nodesTableName)
    | extend nodeId                 = column_ifexists(nodeIdColumnName, '')
    | extend isValidPathStart       = column_ifexists(isValidPathStartColumnName, '')
    | extend isValidPathEnd         = column_ifexists(isValidPathEndColumnName, '')
    | extend isNodeValidConnector   = column_ifexists(isValidConnectorColumnName, '')
    | extend scope                  = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
    nodes
    | summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
    | project scope, countPotentialPairsOnScope = countSources * countTargets
    );
let paths = (
    edges
    // Build graph object partitioned by scope, so that no connections are allowed between scopes.
    // In case no scopes are relevant, partitioning should be removed for better performance.
    | make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
    // Look for shortest paths between source nodes and target nodes whose number of hops is within the configured limits.
    // The current configuration looks for directed paths without any cycles; this can be changed if needed.
      graph-shortest-paths output = all  cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
        // Keep only paths that connect valid endpoints
        where ((s.isValidPathStart) and (t.isValidPathEnd))
        project   sourceId                  = s.nodeId
                , isSourceValidPathStart    = s.isValidPathStart
                , targetId                  = t.nodeId
                , isTargetValidPathEnd      = t.isValidPathEnd
                , scope                     = s.scope
                , edgeIds                   = e.edgeId
                , innerNodeIds              = map(inner_nodes(e), nodeId)
                , innerNodeConnector        = map(inner_nodes(e), isNodeValidConnector)
    | limit resultCountLimit
    )
    | extend  pathLength                    = array_length(edgeIds)
            , pathEndpointsId               = hash_md5(strcat(sourceId, targetId))
            , pathId                        = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
let pathsProcessed = (
    paths
    | mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
    | where (innerNodeConnector)
    | summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
    | join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
    | project-away scope1, pathEndpointsId1
    | extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
    | summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
            , countPairsConnectedByNode = dcount(pathEndpointsId)
        by scope, nodeId = innerNodeId
    | join kind = leftouter (potentialPairsOnScope) on scope
    | extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
    | project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
    , countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
graph_node_centrality_fl(edgesTableName         = 'edges'
                , nodesTableName                = 'nodesEnriched'
                , scopeColumnName               = 'Region'
                , nodeIdColumnName              = 'NodeName'
                , edgeIdColumnName              = 'EdgeName'
                , sourceIdColumnName            = 'SourceNodeName'
                , targetIdColumnName            = 'TargetNodeName'
                , isValidPathStartColumnName    = 'IsValidStart'
                , isValidPathEndColumnName      = 'IsValidEnd'
                , isValidConnectorColumnName    = 'IsValidConnector'
)

Output

scope | nodeId | NodeName | NodeType | NodeEnvironment | Region | IsValidStart | IsValidEnd | IsValidConnector | isValidPathStart | isValidPathEnd | isNodeValidConnector | outDegree | inDegree | totalDegree | sourceOutFlow | sinkInFlow | betweenness | relativePrestige | countShortestPathsThroughNode | countPairsConnectedByNode
US | backup_prc | backup_prc | Service | Production | US | False | False | True | False | False | True | 2 | 2 | 4 | 0 | 0 | 9 | 0.9 | 14 | 9

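Running the function finds all shortest paths that connect source nodes flagged as valid start points (isSourceValidPathStart == True) to target nodes flagged as valid end points (isTargetValidPathEnd == True). Various centrality metrics are calculated on top of these paths and the original edges for all nodes flagged as valid connectors (isNodeValidConnector == True). The output is a table in which each row corresponds to a valid connector node. Each row contains the following fields:

  • nodeId: The ID of the connector node.
  • isNodeValidConnector: A Boolean flag indicating that the node is a valid connector for which centrality is calculated; should be equal to True.
  • isValidPathStart: A Boolean flag indicating whether the node is a valid path start.
  • isValidPathEnd: A Boolean flag indicating whether the node is a valid path end.
  • scope: The scope containing the node and the paths.
  • outDegree: The out-degree of the node, which is the number of distinct targets on the node's outgoing edges.
  • inDegree: The in-degree of the node, which is the number of distinct sources on the node's incoming edges.
  • totalDegree: The inDegree multiplied by the outDegree. The value represents the potential number of paths the node can create, because every incoming edge can be combined with every outgoing edge.
  • sourceOutFlow: The number of targets that can be reached via paths starting at the node, similar to BlastRadius.
  • sinkInFlow: The number of sources that can reach the node via paths, similar to ExposurePerimeter.
  • betweenness: The betweenness centrality. For each source/target pair, the fraction of that pair's shortest paths that pass through the node is calculated, and these fractions are summed over all pairs.
  • relativePrestige: The prestige centrality is the count of source/target pairs connected by shortest paths passing through the node. Relative prestige normalizes this count by the number of all potential source and target pairs. The calculation can be adapted to penalize the score for longer paths.
  • countShortestPathsThroughNode: The number of shortest paths passing through the node, including paths with recurring source and target pairs.
  • countPairsConnectedByNode: The number of distinct source and target pairs among the paths passing through the node.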
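The path-based metrics can be illustrated with a small worked sketch. The paths table below is hypothetical: it stands in for the shortest paths the function discovers, with one row per path, a pair identifier, and the list of inner nodes. The column names are assumptions made for this example only.

```kusto
// Hypothetical shortest paths: the pair s1->t1 has two equally short paths, the pair s2->t1 has one.
let paths = datatable (pair:string, innerNodeIds:dynamic) [
    's1->t1', dynamic(['m1']),
    's1->t1', dynamic(['m2']),
    's2->t1', dynamic(['m1'])
];
// Total number of shortest paths per source/target pair.
let pathsPerPair = (paths | summarize countShortestPaths = count() by pair);
paths
// One row per (path, inner node).
| mv-expand innerNodeId = innerNodeIds to typeof(string)
| summarize countShortestPathsThroughNode = count() by pair, innerNodeId
| join kind = inner (pathsPerPair) on pair
// Share of the pair's shortest paths that pass through the node.
| extend betweennessForPair = todouble(countShortestPathsThroughNode) / countShortestPaths
| summarize betweenness = sum(betweennessForPair)
          , countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
          , countPairsConnectedByNode = dcount(pair)
    by nodeId = innerNodeId
| sort by betweenness desc
```

Here m1 carries half of the paths for s1->t1 and the only path for s2->t1, so its betweenness should be 0.5 + 1 = 1.5 across 2 pairs, while m2 should get 0.5 across 1 pair. Dividing countPairsConnectedByNode by the number of potential source and target pairs would then give the relative prestige.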

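The example calculates centrality metrics for all assets that are applications, traffic routers, or services, based on paths connecting virtual machines to storage accounts. When the output is sorted by betweenness in descending order, the first row is the service backup_prc. It has an in-degree and out-degree of 2, a betweenness of 9, and so on. Different centrality metrics represent different aspects of importance, so they aren't perfectly aligned. For example, the node backup_prc has high betweenness and relativePrestige but low degrees, which highlights it as a node that doesn't have many direct edges but is placed strategically and plays an important role in the overall connectivity of its scope.

The function graph_node_centrality_fl() can be used in the cybersecurity domain to discover important nodes, such as well-connected tokens or users, over data modeled as a graph. The available centrality metrics provide a better understanding of a node's posture and allow you to act accordingly, for example by prioritizing related signals, hardening the node, or disrupting unnecessary connections.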