本文提供示例 Kusto 查询语言 (KQL) 查询,可帮助你有效地分析流量分析数据。 流量分析处理虚拟网络(VNet)流日志和网络安全组(NSG)流日志,以便深入了解网络流量模式、安全事件和性能指标。
使用这些查询可以:
- 识别网络流量模式和通信终结点的顶端
- 监视安全事件并分析潜在威胁
- 排查网络连接问题
- 优化网络性能和资源利用率
先决条件
- 为流日志配置的流量分析。 有关详细信息,请参阅 启用或禁用流量分析。
- 访问存储流日志数据的 Log Analytics 工作区。 有关详细信息,请参阅 Log Analytics 工作区概述。
NTANetAnalytics 查询
本部分提供可用于分析虚拟网络流量分析数据的 NTANetAnalytics 表的示例查询。 NTANetAnalytics 表包含具有增强的网络分析信息的聚合流日志数据。 有关表架构和可用字段的详细信息,请参阅 NTANetAnalytics。
列出与公共 IP 交互的子网
使用以下查询列出过去 30 天内与非 Azure 公共 IP 交互的所有子网。
NTANetAnalytics
| where SubType == "FlowLog" and FlowStartTime > ago(30d) and FlowType == "ExternalPublic"
| project SrcSubnet, DestSubnet
列出相互交互的子网
使用以下查询列出过去 30 天内相互交换流量的所有子网,以及交换的总字节数。
NTANetAnalytics
| where SubType == 'FlowLog' and FaSchemaVersion == '3' and TimeGenerated > ago(30d)
| where isnotempty(SrcSubnet) and isnotempty(DestSubnet)
| summarize TotalBytes=sum(BytesSrcToDest + BytesDestToSrc) by SrcSubnet, DestSubnet,L4Protocol,DestPort
查看跨区域流量
使用以下查询查看过去 30 天内的区域内和区域间流量。
NTANetAnalytics
| where TimeGenerated > ago(30d)
| project SrcRegion, DestRegion, BytesDestToSrc, BytesSrcToDest
| where isnotempty(SrcRegion) and isnotempty(DestRegion)
| summarize TransferredBytes=sum(BytesDestToSrc+BytesSrcToDest) by SrcRegion, DestRegion
基于订阅查看流量
使用以下查询查看过去 30 天内按订阅分组的 Azure 流量。
NTANetAnalytics
| where TimeGenerated > ago(30d)
| project SrcSubscription, DestSubscription, BytesDestToSrc, BytesSrcToDest
| where isnotempty(SrcSubscription) and isnotempty(DestSubscription)
| summarize TransferredBytes=sum(BytesDestToSrc+BytesSrcToDest) by SrcSubscription, DestSubscription
列出接收大多数本地流量的虚拟机
使用以下查询检查哪些虚拟机正在接收大多数本地流量。
NTANetAnalytics
| where SubType == "FlowLog" and FlowType == "S2S"
| where <Scoping condition>
| mvexpand vm = pack_array(SrcVm, DestVm) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows + DeniedInFlows + AllowedOutFlows + DeniedOutFlows // For bytes use: | extend traffic = InboundBytes + OutboundBytes
| make-series TotalTraffic = sum(traffic) default = 0 on FlowStartTime from datetime(<time>) to datetime(<time>) step 1m by vm
| render timechart
列出接收大多数本地流量的 IP
使用以下查询检查哪些 IP 正在接收大多数本地流量。
NTANetAnalytics
| where SubType == "FlowLog" and FlowType == "S2S"
| where <Scoping condition>
| mvexpand vm = pack_array(SrcIp, DestIp) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows + DeniedInFlows + AllowedOutFlows + DeniedOutFlows // For bytes use: | extend traffic = InboundBytes + OutboundBytes
| make-series TotalTraffic = sum(traffic) default = 0 on FlowStartTime from datetime(<time>) to datetime(<time>) step 1m by vm
| render timechart
列出从虚拟机发送或接收流量的 IP
使用以下查询列出在过去 30 天内使用其 IP 地址与虚拟机交换数据的所有 IP。
NTANetAnalytics
| where TimeGenerated > ago(30d)
| where SrcIp == "10.1.1.8" and strlen(DestIp)>0
| summarize TotalBytes=sum(BytesDestToSrc+BytesSrcToDest) by SrcIp, DestIp
查看 ExpressRoute 流量
使用以下查询查看过去 30 天内通过 ExpressRoute 连接的流量。
NTANetAnalytics
| where SubType == 'FlowLog' and TimeGenerated > ago(30d)
| where isnotnull(SrcExpressRouteCircuit) or isnotnull(DestExpressRouteCircuit)
| extend TargetResourceName = tostring(split(TargetResourceId, "/")[2])
| summarize TotalBytes=sum(BytesSrcToDest + BytesDestToSrc) by TargetResourceName, bin(TimeGenerated, 1d)
| render columnchart
查看负载均衡器流量分布
使用以下查询查看应用程序前面有负载均衡器的应用程序的流量分布。
NTANetAnalytics
| where SubType == 'FlowLog' and TimeGenerated > ago(30d)
| where SrcLoadBalancer contains 'web' or DestLoadBalancer contains 'web'
| summarize TotalBytes = sum(BytesSrcToDest + BytesDestToSrc) by tostring(SrcIp)
| render piechart
检查虚拟机收到的流量中的标准偏差
使用以下查询检查虚拟机从本地计算机收到的流量的标准偏差。
NTANetAnalytics
| where SubType == "FlowLog" and FlowType == "S2S"
| where <Scoping condition>
| mvexpand vm = pack_array(SrcVm, DestVm) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows + DeniedInFlows + AllowedOutFlows + DeniedOutFlows // For bytes use: | extend traffic = InboundBytes + OutboundBytes
summarize deviation = stdev(traffic) by vm
检查 IP 收到的流量中的标准偏差
使用以下查询检查来自本地计算机的 IP 收到的流量的标准偏差。
NTANetAnalytics
| where SubType == "FlowLog" and FlowType == "S2S"
| where <Scoping condition>
| mvexpand vm = pack_array(SrcIp, DestIp) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows + DeniedInFlows + AllowedOutFlows + DeniedOutFlows // For bytes use: | extend traffic = InboundBytes + OutboundBytes
| summarize deviation = stdev(traffic) by IP
NTAIpDetails 查询
本部分提供 NTAIpDetails 表的示例查询,可用于分析流量分析数据中特定于 IP 的信息。 有关详细信息,请参阅 NTAIpDetails。
查看流类型和公共 IP 位置
使用以下查询了解流量分析数据中公共 IP 的流类型和位置。
NTAIpDetails
| distinct FlowType, PublicIpDetails, Location
查看恶意流类型
使用以下查询查看恶意流中的线程类型。
NTAIpDetails
| where TimeGenerated > ago(30d)
| where FlowType == "MaliciousFlow"
| summarize count() by ThreatType
| render piechart
AzureNetworkAnalytics_CL查询
本部分提供 AzureNetworkAnalytics_CL查询 表的示例查询,可用于分析流量分析 NSG 流日志数据。
列出与公共 IP 交互的所有子网
使用以下查询列出过去 30 天内与非 Azure 公共 IP 交互的所有子网。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and FlowStartTime_t >= ago(30d) and FlowType_s == "ExternalPublic"
| project Subnet1_s, Subnet2_s
查看与公共 IP 交互的流的 Blob 路径
使用以下查询查看上一查询中流的 Blob 路径。
let TableWithBlobId =
(AzureNetworkAnalytics_CL
| where SubType_s == "Topology" and ResourceType == "NetworkSecurityGroup" and DiscoveryRegion_s == Region_s and IsFlowEnabled_b
| extend binTime = bin(TimeProcessed_t, 6h),
nsgId = strcat(Subscription_g, "/", Name_s),
saNameSplit = split(FlowLogStorageAccount_s, "/")
| extend saName = iif(arraylength(saNameSplit) == 3, saNameSplit[2], '')
| distinct nsgId, saName, binTime)
| join kind = rightouter (
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog"
| extend binTime = bin(FlowEndTime_t, 6h)
) on binTime, $left.nsgId == $right.NSGList_s
| extend blobTime = format_datetime(todatetime(FlowIntervalStartTime_t), "yyyy MM dd hh")
| extend nsgComponents = split(toupper(NSGList_s), "/"), dateTimeComponents = split(blobTime, " ")
| extend BlobPath = strcat("https://", saName,
"@insights-logs-networksecuritygroupflowevent/resoureId=/SUBSCRIPTIONS/", nsgComponents[0],
"/RESOURCEGROUPS/", nsgComponents[1],
"/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/", nsgComponents[2],
"/y=", dateTimeComponents[0], "/m=", dateTimeComponents[1], "/d=", dateTimeComponents[2], "/h=", dateTimeComponents[3],
"/m=00/macAddress=", replace(@"-", "", MACAddress_s),
"/PT1H.json")
| project-away nsgId, saName, binTime, blobTime, nsgComponents, dateTimeComponents;
TableWithBlobId
| where SubType_s == "FlowLog" and FlowStartTime_t >= ago(30d) and FlowType_s == "ExternalPublic"
| project Subnet_s , BlobPath
前面的查询构造一个 URL 来直接访问 Blob,如下所示:
https://{storageAccountName}@insights-logs-networksecuritygroupflowevent/resoureId=/SUBSCRIPTIONS/{subscriptionId}/RESOURCEGROUPS/{resourceGroup}/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/{networkSecurityGroupName}/y={year}/m={month}/d={day}/h={hour}/m=00/macAddress={macAddress}/PT1H.json
列出接收大多数本地流量的虚拟机
使用以下查询检查哪些虚拟机正在接收大多数本地流量。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and FlowType_s == "S2S"
| where <Scoping condition>
| mvexpand vm = pack_array(VM1_s, VM2_s) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows_d + DeniedInFlows_d + AllowedOutFlows_d + DeniedOutFlows_d // For bytes use: | extend traffic = InboundBytes_d + OutboundBytes_d
| make-series TotalTraffic = sum(traffic) default = 0 on FlowStartTime_t from datetime(<time>) to datetime(<time>) step 1 m by vm
| render timechart
列出接收大多数本地流量的 IP
使用以下查询检查哪些 IP 正在接收大多数本地流量。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and FlowType_s == "S2S"
//| where <Scoping condition>
| mvexpand IP = pack_array(SrcIP_s, DestIP_s) to typeof(string)
| where isnotempty(IP)
| extend traffic = AllowedInFlows_d + DeniedInFlows_d + AllowedOutFlows_d + DeniedOutFlows_d // For bytes use: | extend traffic = InboundBytes_d + OutboundBytes_d
| make-series TotalTraffic = sum(traffic) default = 0 on FlowStartTime_t from datetime(<time>) to datetime(<time>) step 1 m by IP
| render timechart
检查虚拟机收到的流量中的标准偏差
使用以下查询检查虚拟机从本地计算机收到的流量的标准偏差。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and FlowType_s == "S2S"
//| where <Scoping condition>
| mvexpand vm = pack_array(VM1_s, VM2_s) to typeof(string)
| where isnotempty(vm)
| extend traffic = AllowedInFlows_d + DeniedInFlows_d + AllowedOutFlows_d + DeniedOutFlows_d // For bytes use: | extend traffic = InboundBytes_d + utboundBytes_d
| summarize deviation = stdev(traffic) by vm
检查 IP 收到的流量中的标准偏差
使用以下查询检查来自本地计算机的 IP 收到的流量的标准偏差。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and FlowType_s == "S2S"
//| where <Scoping condition>
| mvexpand IP = pack_array(SrcIP_s, DestIP_s) to typeof(string)
| where isnotempty(IP)
| extend traffic = AllowedInFlows_d + DeniedInFlows_d + AllowedOutFlows_d + DeniedOutFlows_d // For bytes use: | extend traffic = InboundBytes_d + OutboundBytes_d
| summarize deviation = stdev(traffic) by IP
检查与 NSG 规则的 IP 对之间可访问或阻止哪些端口
使用以下查询检查 IP 对与 NSG 规则之间的哪些端口可访问(或被阻止)。
AzureNetworkAnalytics_CL
| where SubType_s == "FlowLog" and TimeGenerated between (startTime .. endTime)
| extend sourceIPs = iif(isempty(SrcIP_s), split(SrcPublicIPs_s," "), pack_array(SrcIP_s)),
destIPs = iif(isempty(DestIP_s), split(DestPublicIPs_s," "), pack_array(DestIP_s))
| mvexpand SourceIp = sourceIPs to typeof(string)
| mvexpand DestIp = destIPs to typeof(string)
| project SourceIp = tostring(split(SourceIp, "|")[0]), DestIp = tostring(split(DestIp, "|")[0]), NSGList_s, NSGRule_s, DestPort_d, L4Protocol_s, FlowStatus_s
| summarize DestPorts= makeset(DestPort_d) by SourceIp, DestIp, NSGList_s, NSGRule_s, L4Protocol_s, FlowStatus_s
防止重复记录
如果在连接的两端启用了流日志记录,则可以在多个设备上捕获流。 因此,如果所有流日志都聚合在同一 Log Analytics 工作区中,则可能会出现重复数据。 必须包括 FlowDirection
或 MACAddress
防止重复并区分记录。
在流/连接中:
-
MacAddress
表示正在捕获流的设备的 MAC。 -
SrcIp
表示从中启动连接的设备的 IP 地址。 -
DestIp
表示连接到的设备 IP 地址。 -
FlowDirection
表示与设备相关的连接方向。 例如,当从 VM1 (IP:10.0.0.4
和 MAC:A1:B1:C1:D1:E1:F1
)到 VM2 (IP:10.0.0.5
和 MAC:A2:B2:C2:D2:E2:F2
)建立连接时,如果流在 VM1FlowDirection
Outbound
上捕获,则此流的流是;如果流在 VM2 中捕获,则FlowDirection
此流为Inbound
流。 -
BytesSrcToDest
/PacketsSrcToDest
表示从源发送到目标的字节或数据包,而不考虑捕获它们的位置。 -
BytesDestToSrc
/PacketsDestToSrc
表示从目标发送到源的字节或数据包,而不考虑这些字节或数据包的捕获位置。
例如,如果使用以下字段从 VM1 连接到 VM2 。
VM | SrcIp | DestIp | MAC | BytesSrcToDest | BytesDestToSrc | FlowDirection |
---|---|---|---|---|---|---|
VM1 | 10.0.0.4 | 10.0.0.5 | A1-B1-C1-D1-E1-F1 | 100 | 200 | 出站 |
VM2 | 10.0.0.4 | 10.0.0.5 | A2-B2-C2-D2-E2-F2 | 100 | 200 | 入站 |
对于此设备启动的连接,可以使用以下任何查询来计算具有 IP 地址和 MAC 地址10.0.0.4
A1:B1:C1:D1:E1:F1
的设备的总出站字节数。
NTANetAnalytics
| where SubType == "FlowLog"
| where SrcIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1" and FlowDirection == "Outbound"
| summarize totalIniBytes = sum(BytesSrcToDest);
NTANetAnalytics
| where SubType == "FlowLog"
| where SrcIp == "10.0.0.4" and FlowDirection == "Outbound"
| summarize totalIniBytes = sum(BytesSrcToDest);
NTANetAnalytics
| where SubType == "FlowLog"
| where SrcIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1"
| summarize totalIniBytes = sum(BytesSrcToDest);
同样,可以使用以下任何查询计算具有 IP 地址和 MAC 地址10.0.0.4
A1:B1:C1:D1:E1:F1
的设备的总出站字节数,以便其他设备启动到此设备的连接。
NTANetAnalytics
| where DestIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1" and FlowDirection == "Inbound"
| summarize totalNoniniBytes = sum(BytesDestToSrc)
NTANetAnalytics
| where DestIp == "10.0.0.4" and FlowDirection == "Inbound"
| summarize totalNoniniBytes = sum(BytesDestToSrc)
NTANetAnalytics
| where DestIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1"
| summarize totalNoniniBytes = sum(BytesDestToSrc)
可以使用以下查询计算设备的总出站字节数:
let InitiatedByVM = NTANetAnalytics
| where SubType == "FlowLog"
| where SrcIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1" and FlowDirection == "Outbound"
| summarize totalIniBytes = sum(BytesSrcToDest);
let NotInitiatedByVM = NTANetAnalytics
| where DestIp == "10.0.0.4" and MacAddress == "A1:B1:C1:D1:E1:F1" and FlowDirection == "Inbound"
| summarize totalNoniniBytes = sum(BytesDestToSrc);
InitiatedByVM
| join kind=fullouter NotInitiatedByVM on FlowEndTime
| extend Time = iff(isnotnull(FlowEndTime), FlowEndTime, FlowEndTime1)
| summarize totalMB = (sum(totalIniBytes) + sum(totalNoniniBytes)) / 1024.0 /1024.0;