Azure Monitor 日志查询示例Azure Monitor log query examples

本文包含使用 Kusto 查询语言从 Azure Monitor 中检索不同类型的日志数据的各种查询示例。This article includes various examples of queries using the Kusto query language to retrieve different types of log data from Azure Monitor. 其中使用了不同的方法来合并和分析数据,因此,你可以使用这些示例来识别符合自身要求的不同策略。Different methods are used to consolidate and analyze data, so you can use these samples to identify different strategies that you might use for your own requirements.

有关这些示例中使用的不同关键字的详细信息,请参阅 Kusto 语言参考See the Kusto language reference for details on the different keywords used in these samples. 如果你是初次接触 Azure Monitor,请仔细阅读有关创建查询的课程Go through a lesson on creating queries if you're new to Azure Monitor.

事件Events

搜索描述为“加密”的应用程序级事件Search application-level events described as "Cryptographic"

此示例在 Events 表中搜索 EventLogApplication 并且 RenderedDescription 包含 cryptographic 的记录。This example searches the Events table for records in which EventLog is Application and RenderedDescription contains cryptographic. 包括过去 24 小时的记录。Includes records from the last 24 hours.

Event
| where EventLog == "Application" 
| where TimeGenerated > ago(24h) 
| where RenderedDescription contains "cryptographic"

在 Event 和 SecurityEvents 表中搜索提到了 unmashaling 的记录 。Search tables Event and SecurityEvents for records that mention unmarshaling.

search in (Event, SecurityEvent) "unmarshaling"

检测信号Heartbeat

绘制包含发送数据的计算机数的每周视图Chart a week-over-week view of the number of computers sending data

以下示例绘制每周发送检测信号的非重复计算机数的图表。The following example charts the number of distinct computers that sent heartbeats, each week.

Heartbeat
| where TimeGenerated >= startofweek(ago(21d))
| summarize dcount(Computer) by endofweek(TimeGenerated) | render barchart kind=default

查找陈旧的计算机Find stale computers

以下示例查找在前一日处于活动状态,但过去一小时未发送检测信号的计算机。The following example finds computers that were active in the last day but did not send heartbeats in the last hour.

Heartbeat
| where TimeGenerated > ago(1d)
| summarize LastHeartbeat = max(TimeGenerated) by Computer
| where isnotempty(Computer)
| where LastHeartbeat < ago(1h)

获取每个计算机 IP 的最新检测信号记录Get the latest heartbeat record per computer IP

此示例返回每个计算机 IP 的最后一条检测信号记录。This example returns the last heartbeat record for each computer IP.

Heartbeat
| summarize arg_max(TimeGenerated, *) by ComputerIP

将受保护状态记录与检测信号记录相匹配Match protected status records with heartbeat records

此示例查找按“计算机”和时间匹配的相关保护状态记录和检测信号记录。This example finds related protection status records and heartbeat records, matched on both Computer and time. 请注意,时间字段将舍入为最接近的分钟。Note the time field is rounded to the nearest minute. 我们已使用运行时 bin 计算来实现此目的:round_time=bin(TimeGenerated, 1m)We used runtime bin calculation to do that: round_time=bin(TimeGenerated, 1m).

let protection_data = ProtectionStatus
    | project Computer, DetectionId, round_time=bin(TimeGenerated, 1m);
let heartbeat_data = Heartbeat
    | project Computer, Category, round_time=bin(TimeGenerated, 1m);
protection_data | join (heartbeat_data) on Computer, round_time

服务器可用率Server availability rate

基于检测信号记录计算服务器可用率。Calculate server availability rate based on heartbeat records. 可用性定义为“每小时至少发送了 1 个检测信号”。Availability is defined as "at least 1 heartbeat per hour". 因此,如果某台服务器在 100 个小时中有 98 个小时保持可用状态,则可用率为 98%。So, if a server was available 98 of 100 hours, the availability rate is 98%.

let start_time=startofday(datetime("2018-03-01"));
let end_time=now();
Heartbeat
| where TimeGenerated > start_time and TimeGenerated < end_time
| summarize heartbeat_per_hour=count() by bin_at(TimeGenerated, 1h, start_time), Computer
| extend available_per_hour=iff(heartbeat_per_hour>0, true, false)
| summarize total_available_hours=countif(available_per_hour==true) by Computer 
| extend total_number_of_buckets=round((end_time-start_time)/1h)+1
| extend availability_rate=total_available_hours*100/total_number_of_buckets

多个数据类型Multiple data types

绘制每个表的记录计数图表Chart the record-count per table

以下示例收集过去五个小时内所有表的所有记录,并统计每个表中的记录数。The following example collects all records of all tables from the last five hours and counts how many records were in each table. 结果将在时间表中显示。The results are shown in a timechart.

union withsource=sourceTable *
| where TimeGenerated > ago(5h) 
| summarize count() by bin(TimeGenerated,10m), sourceTable
| render timechart

按类型统计过去一小时收集的所有日志Count all logs collected over the last hour by type

以下示例搜索过去一小时内报告的任何内容,并按“类型”统计每个表的记录数。 The following example searches everything reported in the last hour and counts the records of each table by Type. 结果将在条形图中显示。The results are displayed in a bar chart.

search *
| where TimeGenerated > ago(1h) 
| summarize CountOfRecords = count() by Type
| render barchart

AzureDiagnosticsAzureDiagnostics

按类别统计 Azure 诊断记录数Count Azure diagnostics records per category

此示例统计每个唯一类别的所有 Azure 诊断记录数。This example counts all Azure diagnostics records for each unique category.

AzureDiagnostics 
| where TimeGenerated > ago(1d)
| summarize count() by Category

获取每个唯一类别的随机记录Get a random record for each unique category

此示例获取每个唯一类别的单个随机 Azure 诊断记录。This example gets a single random Azure diagnostics record for each unique category.

AzureDiagnostics
| where TimeGenerated > ago(1d) 
| summarize any(*) by Category

获取每个类别的最新记录Get the latest record per category

此示例获取每个唯一类别中的最新 Azure 诊断记录。This example gets the latest Azure diagnostics record in each unique category.

AzureDiagnostics
| where TimeGenerated > ago(1d) 
| summarize arg_max(TimeGenerated, *) by Category

网络监视Network monitoring

延迟不正常的计算机Computers with unhealthy latency

此示例创建延迟不正常的非重复计算机列表。This example creates a list of distinct computers with unhealthy latency.

NetworkMonitoring 
| where LatencyHealthState <> "Healthy" 
| where Computer != "" 
| distinct Computer

性能Performance

联接计算机 perf 记录以关联内存和 CPUJoin computer perf records to correlate memory and CPU

此示例关联特定计算机的 perf 记录,并创建包含平均 CPU 和最大内存的两个时间图表。This example correlates a particular computer's perf records and creates two time charts, the average CPU and maximum memory.

let StartTime = now()-5d;
let EndTime = now()-4d;
Perf
| where CounterName == "% Processor Time"  
| where TimeGenerated > StartTime and TimeGenerated < EndTime
| project TimeGenerated, Computer, cpu=CounterValue 
| join kind= inner (
   Perf
    | where CounterName == "% Used Memory"  
    | where TimeGenerated > StartTime and TimeGenerated < EndTime
    | project TimeGenerated , Computer, mem=CounterValue 
) on TimeGenerated, Computer
| summarize avgCpu=avg(cpu), maxMem=max(mem) by TimeGenerated bin=30m  
| render timechart

每台计算机的 Perf CPU 利用率图形Perf CPU Utilization graph per computer

此示例计算以 Contoso 开头的计算机的 CPU 利用率并绘制图表。This example calculates and charts the CPU utilization of computers that start with Contoso.

Perf
| where TimeGenerated > ago(4h)
| where Computer startswith "Contoso" 
| where CounterName == @"% Processor Time"
| summarize avg(CounterValue) by Computer, bin(TimeGenerated, 15m) 
| render timechart

保护状态Protection status

处于“未报告”保护状态的计算机以及此状态的持续时间Computers with non-reporting protection status duration

此示例列出保护状态为 Not Reporting 的计算机,及其处于此状态的持续时间。This example lists computers that had a protection status of Not Reporting and the duration they were in this status.

ProtectionStatus
| where ProtectionStatus == "Not Reporting"
| summarize count(), startNotReporting = min(TimeGenerated), endNotReporting = max(TimeGenerated) by Computer, ProtectionStatusDetails
| join ProtectionStatus on Computer
| summarize lastReporting = max(TimeGenerated), startNotReporting = any(startNotReporting), endNotReporting = any(endNotReporting) by Computer
| extend durationNotReporting = endNotReporting - startNotReporting

将受保护状态记录与检测信号记录相匹配Match protected status records with heartbeat records

此示例查找按“计算机”和时间匹配的相关保护状态记录和检测信号记录。This example finds related protection status records and heartbeat records matched on both Computer and time. 时间字段将使用 bin 舍入为最接近的分钟。The time field is rounded to the nearest minute using bin.

let protection_data = ProtectionStatus
    | project Computer, DetectionId, round_time=bin(TimeGenerated, 1m);
let heartbeat_data = Heartbeat
    | project Computer, Category, round_time=bin(TimeGenerated, 1m);
protection_data | join (heartbeat_data) on Computer, round_time

安全记录Security records

按活动 ID 统计安全事件数Count security events by activity ID

此示例依赖于 Activity 列的固定结构:<ID>-<Name>。This example relies on the fixed structure of the Activity column: <ID>-<Name>. 它将 Activity 值分析为两个新列,并统计每个 activityID 的出现次数。It parses the Activity value into two new columns, and counts the occurrence of each activityID.

SecurityEvent
| where TimeGenerated > ago(30m) 
| project Activity 
| parse Activity with activityID " - " activityDesc
| summarize count() by activityID

此示例中显示 securityEvent 记录的数目,这些记录中的 Activity 列中包含整个单词 PermissionsThis example shows the number of securityEvent records, in which the Activity column contains the whole term Permissions. 查询将应用到在过去 30 分钟内创建的记录。The query applies to records created over the last 30 minutes.

SecurityEvent
| where TimeGenerated > ago(30m)
| summarize EventCount = countif(Activity has "Permissions")

查找无法从启用了安全检测的计算机登录的帐户Find accounts that failed to log in from computers with a security detection

此示例查找无法从启用了安全检测的计算机登录的帐户,并统计其数目。This example finds and counts accounts that failed to log in from computers on which we identify a security detection.

let detections = toscalar(SecurityDetection
| summarize makeset(Computer));
SecurityEvent
| where Computer in (detections) and EventID == 4624
| summarize count() by Account

我的安全数据是否可用?Is my security data available?

数据探索通常是从数据可用性检查开始的。Starting data exploration often starts with data availability check. 此示例显示过去 30 分钟内的 SecurityEvent 记录数。This example shows the number of SecurityEvent records in the last 30 minutes.

SecurityEvent 
| where TimeGenerated  > ago(30m) 
| count

分析活动名称和 IDParse activity name and ID

以下两个示例依赖于 Activity 列的固定结构:<ID>-<Name>。The two examples below rely on the fixed structure of the Activity column: <ID>-<Name>. 第一个示例使用 parse 运算符将值分配给两个新列:activityIDactivityDescThe first example uses the parse operator to assign values to two new columns: activityID and activityDesc.

SecurityEvent
| take 100
| project Activity 
| parse Activity with activityID " - " activityDesc

此示例使用 split 运算符创建不同值的数组This example uses the split operator to create an array of separate values

SecurityEvent
| take 100
| project Activity 
| extend activityArr=split(Activity, " - ") 
| project Activity , activityArr, activityId=activityArr[0]

显式凭据进程Explicit credentials processes

以下示例显示过去一周内使用了显式凭据的进程的饼图The following example shows a pie chart of processes that used explicit credentials in the last week

SecurityEvent
| where TimeGenerated > ago(7d)
// filter by id 4648 ("A logon was attempted using explicit credentials")
| where EventID == 4648
| summarize count() by Process
| render piechart 

最常见的运行中进程Top running processes

以下示例显示过去三天内五个最常见进程的活动时间线。The following example shows a time line of activity for the five most common processes, over the last three days.

// Find all processes that started in the last three days. ID 4688: A new process has been created.
let RunProcesses = 
    SecurityEvent
    | where TimeGenerated > ago(3d)
    | where EventID == "4688";
// Find the 5 processes that were run the most
let Top5Processes =
RunProcesses
| summarize count() by Process
| top 5 by count_;
// Create a time chart of these 5 processes - hour by hour
RunProcesses 
| where Process in (Top5Processes) 
| summarize count() by bin (TimeGenerated, 1h), Process
| render timechart

查找同一帐户从不同的 IP 重复登录但失败的尝试Find repeating failed login attempts by the same account from different IPs

以下示例查找过去六个小时内同一帐户从不同的 IP 重复登录但失败的尝试。The following example finds failed login attempts by the same account from more than five different IPs in the last six hours.

SecurityEvent 
| where AccountType == "User" and EventID == 4625 and TimeGenerated > ago(6h) 
| summarize IPCount = dcount(IpAddress), makeset(IpAddress)  by Account
| where IPCount > 5
| sort by IPCount desc

查找无法登录的用户帐户Find user accounts that failed to log in

以下示例识别过去一天登录失败超过五次的用户帐户,及其上次尝试登录的时间。The following example identifies user accounts that failed to log in more than five times in the last day, and when they last attempted to log in.

let timeframe = 1d;
SecurityEvent
| where TimeGenerated > ago(1d)
| where AccountType == 'User' and EventID == 4625 // 4625 - failed log in
| summarize failed_login_attempts=count(), latest_failed_login=arg_max(TimeGenerated, Account) by Account 
| where failed_login_attempts > 5
| project-away Account1

使用 joinlet 语句可以检查相同的可疑帐户后来是否能够成功登录。Using join, and let statements we can check if the same suspicious accounts were later able to log in successfully.

let timeframe = 1d;
let suspicious_users = 
    SecurityEvent
    | where TimeGenerated > ago(timeframe)
    | where AccountType == 'User' and EventID == 4625 // 4625 - failed login
    | summarize failed_login_attempts=count(), latest_failed_login=arg_max(TimeGenerated, Account) by Account 
    | where failed_login_attempts > 5
    | project-away Account1;
let suspicious_users_that_later_logged_in = 
    suspicious_users 
    | join kind=innerunique (
        SecurityEvent
        | where TimeGenerated > ago(timeframe)
        | where AccountType == 'User' and EventID == 4624 // 4624 - successful login,
        | summarize latest_successful_login=arg_max(TimeGenerated, Account) by Account
    ) on Account
    | extend was_login_after_failures = iif(latest_successful_login>latest_failed_login, 1, 0)
    | where was_login_after_failures == 1
;
suspicious_users_that_later_logged_in

使用情况Usage

Usage 数据类型可用于按解决方案或数据类型跟踪引入数据量。The Usage data type can be used to track the ingested data volume by solution or data type. 还可以使用其他方法来研究按计算机Azure 订阅、资源组或资源引入的数据量。There are other techniques to study ingested data volumes by computer or Azure subscription, resource group or resource.

按解决方案统计的数据量Data volume by solution

用于按解决方案查看上个月(不包括最后不完整的一天)的计费数据量的查询是:The query used to view the billable data volume by solution over the last month (excluding the last partial day) is:

Usage 
| where TimeGenerated > ago(32d)
| where StartTime >= startofday(ago(31d)) and EndTime < startofday(now())
| where IsBillable == true
| summarize BillableDataGB = sum(Quantity) / 1000. by bin(StartTime, 1d), Solution | render barchart

请注意,子句 where IsBillable = true 从某些解决方案中筛选掉没有引入费用的数据类型。Note that the clause where IsBillable = true filters out data types from certain solutions for which there is no ingestion charge. 另外,带有 TimeGenerated 的子句仅用于确保 Azure 门户中的查询体验的回溯范围会超出默认的 24 小时。Also the clause with TimeGenerated is only to ensure that the query experience in the Azure portal will look back beyond the default 24 hours. 使用“使用情况”数据类型时,StartTimeEndTime 表示显示结果的时间存储桶。When using the Usage data type, StartTime and EndTime represent the time buckets for which results are presented.

按类型的数据量Data volume by type

可以进一步钻取,按数据类型查看数据趋势:You can drill in further to see data trends for by data type:

Usage 
| where TimeGenerated > ago(32d)
| where StartTime >= startofday(ago(31d)) and EndTime < startofday(now())
| where IsBillable == true
| summarize BillableDataGB = sum(Quantity) / 1000. by bin(StartTime, 1d), DataType | render barchart

或者按解决方案和类型查看上个月的表,Or to see a table by solution and type for the last month,

Usage 
| where TimeGenerated > ago(32d)
| where StartTime >= startofday(ago(31d)) and EndTime < startofday(now())
| where IsBillable == true
| summarize BillableDataGB = sum(Quantity) / 1000. by Solution, DataType
| sort by Solution asc, DataType asc

备注

使用情况数据类型的某些字段虽然仍在架构中,但已弃用,其值将不再填充。Some of the fields of the Usage data type, while still in the schema, have been deprecated and will their values are no longer populated. 这些是计算机以及与引入相关的字段(TotalBatchesBatchesWithinSlaBatchesOutsideSlaBatchesCappedAverageProcessingTimeMs)。These are Computer as well as fields related to ingestion (TotalBatches, BatchesWithinSla, BatchesOutsideSla, BatchesCapped and AverageProcessingTimeMs.

更新Updates

仍缺少更新的计算机Computers Still Missing Updates

此示例显示几天前缺少一个或多个关键更新并且当前仍然缺少更新的计算机列表。This example shows a list of computers that were missing one or more critical updates a few days ago and are still missing updates.

let ComputersMissingUpdates3DaysAgo = Update
| where TimeGenerated between (ago(30d)..ago(1h))
| where Classification !has "Critical" and UpdateState =~ "Needed"
| summarize makeset(Computer);
Update
| where TimeGenerated > ago(1d)
| where Classification has "Critical" and UpdateState =~ "Needed"
| where Computer in (ComputersMissingUpdates3DaysAgo)
| summarize UniqueUpdatesCount = dcount(Product) by Computer, OSType

后续步骤Next steps