Applies to: ✅ Azure Data Explorer ✅ Azure Monitor ✅ Microsoft Sentinel
Calculate metrics of node centrality, such as degree and betweenness, over graph edge and node data.
The function graph_node_centrality_fl() is a UDF (user-defined function) that calculates various metrics of node centrality over graph data. Graph data consists of nodes, such as resources, applications, or users, and edges, such as existing access permissions or connections. The centrality of a node represents its importance in the graph structure and can be defined and measured in several ways. In cybersecurity, centrality represents a node's value to attackers: compromising a node with high centrality, such as a well-connected token, offers more opportunities. For defenders, high-centrality nodes are also important and should be protected accordingly. Centrality is calculated directly over edges, as well as over discovered shortest paths. Different centrality metrics can be useful in different security contexts.
The input data for this function should include a table of edges in the format SourceId, EdgeId, TargetId and a list of nodes with optional relevant node properties. Alternatively, graph input can be extracted from other types of data. For example, traffic logs with entries of type User A logged in to resource B can be modeled as edges of type (User A)-[logged in to]->(resource B), and the list of distinct users and resources can be modeled as nodes. As part of the function, shortest paths are calculated and used as input for centrality calculations.
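As a minimal sketch of this kind of modeling (the TrafficLogs table, its columns, and the generated edge IDs are hypothetical placeholders, not part of the function), edge and node tables could be derived as follows:

// Hypothetical sign-in logs with entries of type 'User A logged in to resource B'.
let TrafficLogs = datatable (User:string, Resource:string, Region:string) [
    'User A', 'resource B', 'US',
    'User A', 'resource C', 'US',
    'User D', 'resource B', 'US'
];
// Model each log entry as a directed edge (User)-[logged in to]->(Resource).
let trafficEdges = (
    TrafficLogs
    | summarize by SourceId = User, TargetId = Resource, Region
    | extend EdgeId = strcat(SourceId, ' logged in to ', TargetId)
);
// Model the distinct users and resources as nodes.
let trafficNodes = (
    (TrafficLogs | project NodeId = User, Region)
    | union (TrafficLogs | project NodeId = Resource, Region)
    | summarize by NodeId, Region
);
trafficEdges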
The following assumptions are made:
- All edges are valid for path discovery. Edges that are irrelevant should be filtered out before calculating centrality.
- Edges are unweighted, independent, and unconditional, meaning that all edges have the same probability and moving from B to C isn't dependent on a previous move from A to B.
- Centrality metrics are calculated over edges, as well as over simple directed shortest paths without cycles, of type A->B->C. More complex definitions can be made by changing the internal syntax of the graph-shortest-paths operator in the function.
These assumptions can be adapted as needed by changing the internal logic of the function.
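For instance, the first assumption can be handled with a simple pre-filter before building the graph. The following minimal sketch (the allEdges table and the 'is similar to' edge type are hypothetical) keeps only access-style edges:

// Hypothetical edges table containing both access edges and a descriptive relation ('is similar to')
// that isn't relevant for path discovery.
let allEdges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string) [
    'vm-work-1', 'e1', 'can use', 'webapp-prd',
    'vm-work-1', 'e2', 'is similar to', 'vm-custom'
];
allEdges
| where EdgeType in ('can use', 'can access') // keep only access-style edges before calculating centrality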
The function discovers all possible shortest paths between valid sources to valid targets, under optional constraints such as path length limits, maximum output size, and more. Various centrality metrics are calculated over resulting paths as well as original edges, representing different aspects of node importance. The output is a list of nodes that are flagged as relevant using the isValidConnectorColumnName column, with additional columns containing the centrality metrics for each node. The function only uses the required fields, such as node IDs and edge IDs. Other relevant fields, such as types, property lists, security-related scores, or external signals, can be added to logic and output by changing the function definition.
Syntax
graph_node_centrality_fl(edgesTableName, nodesTableName, scopeColumnName, isValidPathStartColumnName, isValidPathEndColumnName, isValidConnectorColumnName, nodeIdColumnName, edgeIdColumnName, sourceIdColumnName, targetIdColumnName, [minPathLength], [maxPathLength], [resultCountLimit])
Learn more about syntax conventions.
Parameters
Name | Type | Required | Description |
---|---|---|---|
edgesTableName | string | ✔️ | The name of the input table containing the edges of the graph. |
nodesTableName | string | ✔️ | The name of the input table containing the nodes of the graph. |
scopeColumnName | string | ✔️ | The name of the column in the nodes and edges tables containing the partition or scope (for example, subscription or account), so that centrality is calculated separately for each scope. |
isValidPathStartColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node, where True means that the node is a valid start point for a path and False means that it isn't. |
isValidPathEndColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node, where True means that the node is a valid end point for a path and False means that it isn't. |
isValidConnectorColumnName | string | ✔️ | The name of the column in the nodes table containing a Boolean flag for each node, where True means that the node is a valid connector to be included in the output and False means that it isn't. |
nodeIdColumnName | string | ✔️ | The name of the column in the nodes table containing the node ID. |
edgeIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge ID. |
sourceIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's source node ID. |
targetIdColumnName | string | ✔️ | The name of the column in the edges table containing the edge's target node ID. |
minPathLength | long | | The minimum number of steps (edges) in a path. Default value: 1. |
maxPathLength | long | | The maximum number of steps (edges) in a path. Default value: 8. |
resultCountLimit | long | | The maximum number of paths returned for output. Default value: 100000. |
Function definition
You can define the function by either embedding its code as a query-defined function, or creating it as a stored function in your database, as follows:
Define the function using the following let statement. No permissions are required.
Important
A let statement can't run on its own. It must be followed by a tabular expression statement. To run a working example of graph_node_centrality_fl(), see Example.
let graph_node_centrality_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend isNodeValidConnector = column_ifexists(isValidConnectorColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
nodes
| summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
| project scope, countPotentialPairsOnScope = countSources * countTargets
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for shortest paths between source nodes and target nodes within the predefined number of hops.
// The current configuration looks for directed paths without any cycles; this can be changed if needed.
graph-shortest-paths output = all cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Keep only paths that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, innerNodeIds = map(inner_nodes(e), nodeId)
, innerNodeConnector = map(inner_nodes(e), isNodeValidConnector)
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathEndpointsId = hash_md5(strcat(sourceId, targetId))
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
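// The pathsProcessed block below calculates, for each valid connector node, the betweenness (for every source-target pair,
// the fraction of that pair's shortest paths passing through the node, summed over all pairs) and the relative prestige
// (the share of potential source-target pairs in the scope that are connected by shortest paths through the node).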
let pathsProcessed = (
paths
| mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
| where (innerNodeConnector)
| summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
| join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
| project-away scope1, pathEndpointsId1
| extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
| summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
, countPairsConnectedByNode = dcount(pathEndpointsId)
by scope, nodeId = innerNodeId
| join kind = leftouter (potentialPairsOnScope) on scope
| extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
| project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
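// The centrality block below combines, for each valid connector node, degree metrics (inDegree, outDegree, totalDegree)
// calculated over the original edges, flow metrics (sourceOutFlow, sinkInFlow) calculated over the discovered paths,
// and the betweenness and relative prestige metrics calculated in pathsProcessed.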
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
, countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
// Write your query to use the function here.
Example
The following example uses the invoke operator to run the function.
To use a query-defined function, invoke it after the embedded function definition.
let edges = datatable (SourceNodeName:string, EdgeName:string, EdgeType:string, TargetNodeName:string, Region:string)[
'vm-work-1', 'e1', 'can use', 'webapp-prd', 'US',
'vm-custom', 'e2', 'can use', 'webapp-prd', 'US',
'webapp-prd', 'e3', 'can access', 'vm-custom', 'US',
'webapp-prd', 'e4', 'can access', 'test-machine', 'US',
'vm-custom', 'e5', 'can access', 'server-0126', 'US',
'vm-custom', 'e6', 'can access', 'hub_router', 'US',
'webapp-prd', 'e7', 'can access', 'hub_router', 'US',
'test-machine', 'e8', 'can access', 'vm-custom', 'US',
'test-machine', 'e9', 'can access', 'hub_router', 'US',
'hub_router', 'e10', 'routes traffic to', 'remote_DT', 'US',
'vm-work-1', 'e11', 'can access', 'storage_main_backup', 'US',
'hub_router', 'e12', 'routes traffic to', 'vm-work-2', 'US',
'vm-work-2', 'e13', 'can access', 'backup_prc', 'US',
'remote_DT', 'e14', 'can access', 'backup_prc', 'US',
'backup_prc', 'e15', 'moves data to', 'storage_main_backup', 'US',
'backup_prc', 'e16', 'moves data to', 'storage_DevBox', 'US',
'device_A1', 'e17', 'is connected to', 'device_B2', 'EU',
'device_B2', 'e18', 'is connected to', 'device_A1', 'EU'
];
let nodes = datatable (NodeName:string, NodeType:string, NodeEnvironment:string, Region:string) [
'vm-work-1', 'Virtual Machine', 'Production', 'US',
'vm-custom', 'Virtual Machine', 'Production', 'US',
'webapp-prd', 'Application', 'None', 'US',
'test-machine', 'Virtual Machine', 'Test', 'US',
'hub_router', 'Traffic Router', 'None', 'US',
'vm-work-2', 'Virtual Machine', 'Production', 'US',
'remote_DT', 'Virtual Machine', 'Production', 'US',
'backup_prc', 'Service', 'Production', 'US',
'server-0126', 'Server', 'Production', 'US',
'storage_main_backup', 'Cloud Storage', 'Production', 'US',
'storage_DevBox', 'Cloud Storage', 'Test', 'US',
'device_A1', 'Device', 'Backend', 'EU',
'device_B2', 'Device', 'Backend', 'EU'
];
let nodesEnriched = (
nodes
| extend IsValidStart = (NodeType in ('Virtual Machine'))
, IsValidEnd = (NodeType in ('Cloud Storage'))
| extend IsValidConnector = (NodeType in ('Application', 'Traffic Router', 'Service'))
);
let graph_node_centrality_fl = ( edgesTableName:string, nodesTableName:string, scopeColumnName:string
, isValidPathStartColumnName:string, isValidPathEndColumnName:string, isValidConnectorColumnName:string
, nodeIdColumnName:string, edgeIdColumnName:string, sourceIdColumnName:string, targetIdColumnName:string
, minPathLength:long = 1, maxPathLength:long = 8, resultCountLimit:long = 100000)
{
let edges = (
table(edgesTableName)
| extend sourceId = column_ifexists(sourceIdColumnName, '')
| extend targetId = column_ifexists(targetIdColumnName, '')
| extend edgeId = column_ifexists(edgeIdColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let nodes = (
table(nodesTableName)
| extend nodeId = column_ifexists(nodeIdColumnName, '')
| extend isValidPathStart = column_ifexists(isValidPathStartColumnName, '')
| extend isValidPathEnd = column_ifexists(isValidPathEndColumnName, '')
| extend isNodeValidConnector = column_ifexists(isValidConnectorColumnName, '')
| extend scope = column_ifexists(scopeColumnName, '')
);
let potentialPairsOnScope = (
nodes
| summarize countSources = dcountif(nodeId, (isValidPathStart)), countTargets = dcountif(nodeId, (isValidPathEnd)) by scope
| project scope, countPotentialPairsOnScope = countSources * countTargets
);
let paths = (
edges
// Build graph object partitioned by scope, so that no connections are allowed between scopes.
// In case no scopes are relevant, partitioning should be removed for better performance.
| make-graph sourceId --> targetId with nodes on nodeId partitioned-by scope (
// Look for shortest paths between source nodes and target nodes within the predefined number of hops.
// The current configuration looks for directed paths without any cycles; this can be changed if needed.
graph-shortest-paths output = all cycles = none (s)-[e*minPathLength..maxPathLength]->(t)
// Keep only paths that connect valid endpoints
where ((s.isValidPathStart) and (t.isValidPathEnd))
project sourceId = s.nodeId
, isSourceValidPathStart = s.isValidPathStart
, targetId = t.nodeId
, isTargetValidPathEnd = t.isValidPathEnd
, scope = s.scope
, edgeIds = e.edgeId
, innerNodeIds = map(inner_nodes(e), nodeId)
, innerNodeConnector = map(inner_nodes(e), isNodeValidConnector)
| limit resultCountLimit
)
| extend pathLength = array_length(edgeIds)
, pathEndpointsId = hash_md5(strcat(sourceId, targetId))
, pathId = hash_md5(strcat(sourceId, strcat(edgeIds), targetId))
);
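// The pathsProcessed block below calculates, for each valid connector node, the betweenness (for every source-target pair,
// the fraction of that pair's shortest paths passing through the node, summed over all pairs) and the relative prestige
// (the share of potential source-target pairs in the scope that are connected by shortest paths through the node).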
let pathsProcessed = (
paths
| mv-expand with_itemindex = i innerNodeId = innerNodeIds to typeof(string), innerNodeConnector to typeof(bool)
| where (innerNodeConnector)
| summarize countShortestPathsThroughNode = count(), take_any(sourceId, targetId, pathLength) by scope, innerNodeId, pathEndpointsId
| join kind = leftouter (paths | summarize countShortestPaths = count() by scope, pathEndpointsId) on scope, pathEndpointsId
| project-away scope1, pathEndpointsId1
| extend betweennessForPair = (todouble(countShortestPathsThroughNode)/countShortestPaths)
| summarize betweenness = sum(betweennessForPair), countShortestPathsThroughNode = sum(countShortestPathsThroughNode)
, countPairsConnectedByNode = dcount(pathEndpointsId)
by scope, nodeId = innerNodeId
| join kind = leftouter (potentialPairsOnScope) on scope
| extend relativePrestige = round(todouble(countPairsConnectedByNode)/countPotentialPairsOnScope, 6)
| project scope, nodeId, betweenness, relativePrestige, countShortestPathsThroughNode, countPairsConnectedByNode
);
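// The centrality block below combines, for each valid connector node, degree metrics (inDegree, outDegree, totalDegree)
// calculated over the original edges, flow metrics (sourceOutFlow, sinkInFlow) calculated over the discovered paths,
// and the betweenness and relative prestige metrics calculated in pathsProcessed.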
let centrality = (
nodes
| summarize take_any(*) by scope, nodeId
| where (isNodeValidConnector)
| join kind = leftouter (edges | summarize outDegree = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (edges | summarize inDegree = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend inDegree = coalesce(inDegree, 0), outDegree = coalesce(outDegree, 0)
| extend totalDegree = inDegree * outDegree
| join kind = leftouter (paths | summarize sourceOutFlow = dcount(targetId) by scope, sourceId) on scope, $left.nodeId == $right.sourceId
| join kind = leftouter (paths | summarize sinkInFlow = dcount(sourceId) by scope, targetId) on scope, $left.nodeId == $right.targetId
| project-away scope1, scope2, sourceId, targetId
| extend sourceOutFlow = coalesce(sourceOutFlow, 0), sinkInFlow = coalesce(sinkInFlow, 0)
| join kind = leftouter (pathsProcessed) on scope, nodeId
| project-away scope1, nodeId1
| extend betweenness = coalesce(betweenness, 0.0), relativePrestige = coalesce(relativePrestige, 0.0)
, countShortestPathsThroughNode = coalesce(countShortestPathsThroughNode, 0), countPairsConnectedByNode = coalesce(countPairsConnectedByNode, 0)
);
centrality
};
graph_node_centrality_fl(edgesTableName = 'edges'
, nodesTableName = 'nodesEnriched'
, scopeColumnName = 'Region'
, nodeIdColumnName = 'NodeName'
, edgeIdColumnName = 'EdgeName'
, sourceIdColumnName = 'SourceNodeName'
, targetIdColumnName = 'TargetNodeName'
, isValidPathStartColumnName = 'IsValidStart'
, isValidPathEndColumnName = 'IsValidEnd'
, isValidConnectorColumnName = 'IsValidConnector'
)
Output
scope | nodeId | NodeName | NodeType | NodeEnvironment | Region | IsValidStart | IsValidEnd | IsValidConnector | isValidPathStart | isValidPathEnd | isNodeValidConnector | outDegree | inDegree | totalDegree | sourceOutFlow | sinkInFlow | betweenness | relativePrestige | countShortestPathsThroughNode | countPairsConnectedByNode |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
US | backup_prc | backup_prc | Service | Production | US | False | False | True | False | False | True | 2 | 2 | 4 | 0 | 0 | 9 | 0.9 | 14 | 9 |
Running the function finds all shortest paths that connect source nodes flagged as valid start points (isSourceValidPathStart == True) to target nodes flagged as valid end points (isTargetValidPathEnd == True). Various centrality metrics are calculated on top of these paths and the original edges for all nodes flagged as valid connectors (isValidConnector == True). The output is a table in which each row corresponds to a valid connector node and contains the following fields:

- nodeId: The ID of the connector node.
- isValidConnector: A Boolean flag indicating that the node is a valid connector for which centrality is calculated; should be equal to True.
- isSourceValidPathStart: A Boolean flag indicating that the node is a valid path start.
- isTargetValidPathEnd: A Boolean flag indicating that the node is a valid path end.
- scope: The scope containing the node and the paths.
- outDegree: The out-degree of the node, that is, the number of distinct targets of outgoing edges adjacent to the node.
- inDegree: The in-degree of the node, that is, the number of distinct sources of incoming edges of the node.
- totalDegree: The inDegree multiplied by the outDegree. The value represents the potential number of paths that the node can create, since all incoming edges are connected to all outgoing ones.
- sourceOutFlow: The number of targets that can be reached via paths starting at the node, similar to Blast Radius.
- sinkInFlow: The number of sources that can reach the node via paths, similar to Exposure Perimeter.
- betweenness: The betweenness centrality, that is, the fraction of shortest paths that pass through the node out of all shortest paths.
- relativePrestige: The prestige centrality is the count of source/target pairs connected by shortest paths passing through the node. Relative prestige normalizes this count by the number of all potential source and target pairs. The calculation can be adapted to penalize the score for longer paths.
- countShortestPathsThroughNode: The number of shortest paths that pass through the node, including those with recurring source and target pairs.
- countPairsConnectedByNode: The number of distinct source and target pairs among paths that pass through the node.
The example calculated the centrality metrics for all assets that are either applications, traffic routers, or services, based on paths connecting virtual machines to storage accounts. In the first row of the output, when sorted by descending betweenness, you can see the service backup_prc. It has in and out degrees of 2, a betweenness of 9, and so on. Different centrality metrics represent different aspects of importance, so they aren't perfectly aligned. For example, the node backup_prc has high betweenness and relativePrestige but low degrees, which highlights it as a node that doesn't have many direct edges but is placed strategically and plays an important role in the global relativePrestige of its scope.
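As a quick check of the relativePrestige value in the output row, the example data for the US scope contains five valid start nodes (virtual machines) and two valid end nodes (cloud storage accounts), that is, ten potential source-target pairs, and backup_prc lies on shortest paths connecting nine of them:

// relativePrestige = countPairsConnectedByNode / countPotentialPairsOnScope = 9 / (5 * 2)
print relativePrestige = round(todouble(9) / (5 * 2), 6) // 0.9, matching the output row for backup_prc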
The function graph_node_centrality_fl() can be used in the cybersecurity domain to discover important nodes, such as well-connected tokens or users, over data modeled as a graph. The various available centrality metrics provide a better understanding of a node's posture and allow you to act accordingly, for example, by prioritizing related signals, hardening the node, or disrupting unnecessary connections.