通过图形数据(边缘和节点)发现相关终结点(源和目标)之间的有效路径。
函数 graph_path_discovery_fl() 是 UDF(用户定义的函数),可用于通过图形数据发现相关终结点之间的有效路径。 图形数据由节点(例如资源、应用程序或用户)和边缘(例如,现有访问权限)组成。 在网络安全上下文中,此类路径可能表示潜在攻击者可能会利用的横向移动路径。 我们对连接终结点的路径感兴趣,这些路径定义为与某些条件相关,例如,与关键目标相连的暴露源。 根据函数的配置,可以发现适用于其他安全应用场景的其他类型的路径。
此函数的输入数据应包含格式为“SourceId、EdgeId、TargetId”的边缘表和具有可选节点属性的节点列表,这些属性可用于定义有效路径。 或者,可以从其他类型的数据中提取图形输入。 例如,具有“用户 A 登录到资源 B”类型的条目的流量日志可以建模为类型为“(用户 A)-[已登录]->(资源 B)”的边缘。 不同的用户和资源列表可以建模为节点。
我们进行了以下几个假设:
- 所有边缘都对路径发现有效。 在运行路径发现之前,应筛选出无关的边缘。
- 边缘不加权、独立且无条件,这意味着所有边缘的概率相同,从 B 移动到 C 并不依赖于以前的从 A 移动到 B。
- 我们想要发现的路径是简单的方向路径,没有循环,类型为 A->B->C。 可以通过更改函数中图形匹配运算符的内部语法,来实现更复杂的定义。
可以通过更改函数的内部逻辑,来根据需要调整这些假设。
该函数根据可选约束(如路径长度限制、最大输出大小等)发现有效源到有效目标之间的所有可能路径。输出是包含源 ID 和目标 ID 的已发现路径列表,以及连接边缘和节点的列表。 该函数仅使用必填字段,例如节点 ID 和边缘 ID。 如果其他相关字段(例如类型、属性列表、安全相关评分或外部信号)可用于输入数据,则可以通过更改函数定义将其添加到逻辑和输出中。
语法
graph_path_discovery_fl(
edgesTableName、nodesTableName、scopeColumnName、isValidPathStartColumnName、isValidPathEndColumnName、nodeIdColumnName、edgeIdColumnName、sourceIdColumnName、targetIdColumnName、[minPathLength]、[maxPathLength]、[resultCountLimit])
详细了解语法约定。
参数
| 客户 | 类型 | 必需 | 说明 |
|---|---|---|---|
| edgesTableName | string |
✔️ | 包含图形边缘的输入表的名称。 |
| nodesTableName | string |
✔️ | 包含图形节点的输入表的名称。 |
| scopeColumnName | string |
✔️ | 节点和边缘表中包含分区或范围(例如订阅或帐户)中的列的名称,以便为每个范围生成不同的异常模型。 |
| isValidPathStartColumnName | string |
✔️ | 节点表中包含节点布尔标志的列的名称,True 表示节点是路径的有效起点,False 表示不是有效的起点。 |
| isValidPathEndColumnName | string |
✔️ | 节点表中包含节点布尔标志的列的名称,True 表示节点是路径的有效终点,False 表示不是有效的终点。 |
| nodeIdColumnName | string |
✔️ | 包含节点 ID 的节点表中的列的名称。 |
| edgeIdColumnName | string |
✔️ | 边缘表中包含边缘 ID 的列的名称。 |
| sourceIdColumnName | string |
✔️ | 边缘表中包含边缘的源节点 ID 的列的名称。 |
| targetIdColumnName | string |
✔️ | 边缘表中包含边缘的目标节点 ID 的列的名称。 |
| minPathLength | long |
路径中的最小步长(边缘)数。 默认值是 1秒。 | |
| maxPathLength | long |
路径中的最大步长(边缘)数。 默认值为 8。 | |
| resultCountLimit | long |
为输出返回的最大路径数。 默认值为 100000。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[
'vm-work-1', 'e1', 'can use', 'webapp-prd', 'US',
'vm-custom', 'e2', 'can use', 'webapp-prd', 'US',
'webapp-prd', 'e3', 'can access', 'vm-custom', 'US',
'webapp-prd', 'e4', 'can access', 'test-machine', 'US',
'vm-custom', 'e5', 'can access', 'server-0126', 'US',
'vm-custom', 'e6', 'can access', 'hub_router', 'US',
'webapp-prd', 'e7', 'can access', 'hub_router', 'US',
'test-machine', 'e8', 'can access', 'vm-custom', 'US',
'test-machine', 'e9', 'can access', 'hub_router', 'US',
'hub_router', 'e10', 'routes traffic to', 'remote_DT', 'US',
'vm-work-1', 'e11', 'can access', 'storage_main_backup', 'US',
'hub_router', 'e12', 'routes traffic to', 'vm-work-2', 'US',
'vm-work-2', 'e13', 'can access', 'backup_prc', 'US',
'remote_DT', 'e14', 'can access', 'backup_prc', 'US',
'backup_prc', 'e15', 'moves data to', 'storage_main_backup', 'US',
'backup_prc', 'e16', 'moves data to', 'storage_DevBox', 'US',
'device_A1', 'e17', 'is connected to', 'sevice_B2', 'EU',
'sevice_B2', 'e18', 'is connected to', 'device_A1', 'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
'vm-work-1', 'Virtual Machine', 'Production', 'US',
'vm-custom', 'Virtual Machine', 'Production', 'US',
'webapp-prd', 'Application', 'None', 'US',
'test-machine', 'Virtual Machine', 'Test', 'US',
'hub_router', 'Traffic Router', 'None', 'US',
'vm-work-2', 'Virtual Machine', 'Production', 'US',
'remote_DT', 'Virtual Machine', 'Production', 'US',
'backup_prc', 'Service', 'Production', 'US',
'server-0126', 'Server', 'Production', 'US',
'storage_main_backup', 'Cloud Storage', 'Production', 'US',
'storage_DevBox', 'Cloud Storage', 'Test', 'US',
'device_A1', 'Device', 'Backend', 'EU',
'device_B2', 'Device', 'Backend', 'EU'
];
let nodesEnriched = (
nodes
| extend IsValidStart = (NodeType == 'Virtual Machine'), IsValidEnd = (NodeType == 'Cloud Storage') // option 1
//| extend IsValidStart = (NodeName in('vm-work-1', 'vm-work-2')), IsValidEnd = (NodeName in('storage_main_backup')) // option 2
//| extend IsValidStart = (NodeEnvironment == 'Test'), IsValidEnd = (NodeEnvironment == 'Production') // option 3
);
let graph_path_discovery_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for existing paths between source nodes and target nodes with less than predefined number of hops
// Current configurations looks for directed paths without any cycles; this can be changed if needed
graph-match cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Filter only by paths with that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, edgeAllTargetIds = e.targetId
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
, pathAllNodeIds = array_concat(pack_array(sourceId), edgeAllTargetIds)
| project-away edgeAllTargetIds
| mv-apply with_itemindex = SortIndex nodesInPath = pathAllNodeIds to typeof(string), edgesInPath = edgeIds to typeof(string) on (
extend step = strcat(
iff(isnotempty(nodesInPath), strcat('(', nodesInPath, ')'), '')
, iff(isnotempty(edgesInPath), strcat('-[', edgesInPath, ']->'), ''))
| summarize fullPath = array_strcat(make_list(step), '')
)
);
paths
};
graph_path_discovery_fl(edgesTableName = 'edges'
, nodesTableName = 'nodesEnriched'
, scopeColumnName = 'Region'
, nodeIdColumnName = 'NodeName'
, edgeIdColumnName = 'EdgeName'
, sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, isValidPathStartColumnName = 'IsValidStart'
, isValidPathEndColumnName = 'IsValidEnd'
)
输出
| sourceId | isSourceValidPathStart | targetId | isTargetValidPathEnd | 范围 | edgeIds | pathLength | pathId | pathAllNodeIds | fullPath |
|---|---|---|---|---|---|---|---|---|---|
| test-machine | 真 实 | storage_DevBox | 真 实 | 美国 | [“e9”,“e10”,“e14”,“e16”] | 4 | 00605d35b6e1d28024fd846f217b43ac | [“test-machine”,“hub_router”,“remote_DT”,“backup_prc”,“storage_DevBox”] | (test-machine)-[e9]->(hub_router)-[e10]->(remote_DT)-[e14]->(backup_prc)-[e16]->(storage_DevBox) |
运行该函数使用连接标记为有效起点 (isSourceValidPathStart == True) 的源节点与标记为有效终点 (isTargetValidPathEnd == True) 的所有目标之间的输入边缘查找所有路径。 输出是一个表,其中每一行描述单个路径(仅限 resultCountLimit 参数定义的最大行数)。 每一行包含以下字段:
-
sourceId:源的 nodeId - 路径中的第一个节点。 -
isSourceValidPathStart:源节点作为有效路径起点的布尔标志;应等于 True。 -
targetId:目标的 nodeId - 路径中的最后一个节点。 -
isTargetValidPathEnd:目标节点作为有效路径终点的布尔标志;应始终等于 True。 -
scope:包含路径的范围。 -
edgeIds:路径中边缘的已排序列表。 -
pathLength:路径中的边缘数(跃点)。 -
pathId:路径终结点和步长的哈希可用作路径的唯一标识符。 -
pathAllNodeIds:路径中节点的已排序列表。 -
fullPath:表示完整路径的字符串,格式为 (source node)-[edge 1]->(node2)-.....->(target node)。
在前面的示例中,我们预处理节点表,并添加多个可能的终结点定义选项。 通过注释/取消注释不同的选项,可以发现多个应用场景:
- 选项 1:查找虚拟机到云存储资源之间的路径。 在探索节点类型之间的连接模式时非常有用。
- 选项 2:查找任何特定节点(vm-work-1、vm-work-2)到某个特定节点(storage_main_backup)之间的路径。 可用于调查已知情况,例如从已知泄露资产到已知关键资产的路径。
- 选项 3:查找节点组之间的路径,例如不同环境中的节点。 可用于监视不安全的路径,例如测试和生产环境之间的路径。
在上面的示例中,我们使用第一个选项来查找虚拟机到云存储资源之间的所有路径,这些路径可能会被想要访问存储数据的潜在攻击者使用。 通过将更多筛选条件添加到有效终结点(例如,将已知有漏洞的虚拟机连接到包含敏感数据的存储帐户)可以增强此应用场景。
函数 graph_path_discovery_fl() 可用于网络安全域中,通过建模为图形的数据发现有趣的路径,例如横向移动路径。