监视流式处理应用程序的性能、成本和运行状况对于构建可靠高效的 ETL 管道至关重要。 Azure Databricks 跨作业、Lakeflow 声明性管道和 Lakeflow Connect 提供了一组丰富的可观测性功能,以帮助诊断瓶颈、优化性能以及管理资源使用情况和成本。
本文概述了以下方面的最佳做法:
关键流式处理性能指标
事件日志架构和示例查询
流式查询监控
将日志和指标导出到外部工具
流媒体可观测性的关键指标
运行流式处理管道时,监视以下关键指标:
指标 | 目的 |
---|---|
反压 | 监视文件数量及偏移量大小。 帮助识别瓶颈并确保系统可以在不落后的情况下处理传入的数据。 |
吞吐量 | 跟踪每个微批处理的消息数。 评估管道效率并检查它是否与数据引入保持同步。 |
时长 | 测量微批处理的平均持续时间。 指示处理速度并帮助调整批处理间隔。 |
延迟 | 指示一段时间内处理了多少条记录/消息。 帮助了解端到端管道延迟,并针对较低的延迟进行优化。 |
群集利用率 | 反映 CPU 和内存使用情况(%)。 确保高效的资源使用,并帮助缩放群集以满足处理需求。 |
网络 | 测量传输和接收的数据。 用于识别网络瓶颈和提高数据传输性能。 |
检查点 | 识别已处理的数据和偏移量。 确保一致性并在存在故障时也能容错。 |
成本 | 显示流媒体应用的每小时、每日和每月成本。 有助于预算和资源优化。 |
血统 | 显示在流式处理应用程序中生成的数据集和图层。 促进数据转换、跟踪、质量保证和调试。 |
群集日志和指标
Azure Databricks 群集日志和指标提供有关群集性能和利用率的详细见解。 这些日志和指标包括有关 CPU、内存、磁盘 I/O、网络流量和其他系统指标的信息。 监视这些指标对于优化群集性能、高效管理资源以及排查问题至关重要。
Azure Databricks 群集日志和指标提供有关群集性能和资源利用率的详细见解。 其中包括 CPU 和内存使用情况、磁盘 I/O 和网络流量。 监视这些指标对于以下方面至关重要:
- 优化群集性能。
- 高效管理资源。
- 排查操作问题。
可以通过 Databricks UI 利用指标,也可以导出到个人监视工具。 请参阅 Notebook 示例:Datadog 指标。
Spark用户界面
Spark UI 显示有关作业和阶段进度的详细信息,包括已完成、挂起和失败的任务数。 这有助于了解执行流并确定瓶颈。
对于流式处理应用程序, “流式处理”选项卡 显示输入速率、处理速率和批处理持续时间等指标。 它有助于监视流式处理作业的性能,并确定任何数据引入或处理问题。
有关详细信息 ,请参阅使用 Apache Spark UI 进行调试 。
计算指标
计算指标将帮助你了解群集利用率。 运行作业时,可以看到它如何扩展以及资源受到的具体影响。 你将能够找到可能导致 OOM 故障的内存压力或可能导致长时间延迟的 CPU 压力。 下面是你将看到的特定指标:
- 服务器负载分发:过去一分钟内每个节点的 CPU 使用率。
- CPU 使用率:CPU 在各种模式下(例如用户、系统、空闲和等待I/O操作)花费的时间百分比。
- 内存利用率:每个模式的总内存使用量(例如,已用、可用、缓冲区和缓存)。
- 内存交换利用率:内存交换总使用量。
- 可用文件系统空间:每个装入点的文件系统总使用量。
- 网络吞吐量:每个设备通过网络接收和传输的字节数。
- 活动节点数:给定计算的每个时间戳的活动节点数。
Lakeflow 声明性管道
Lakeflow 声明性管道事件日志捕获所有管道事件的综合记录,包括:
- 审核日志。
- 数据质量检查。
- 管道进度。
- 数据世系。
为所有 Lakeflow 声明性管道自动启用事件日志,可通过以下方式访问:
- 管道 UI:直接查看日志。
- DLT API:编程访问。
- 直接查询:查询事件日志表。
有关详细信息,请参阅 Lakeflow 声明性管道的事件日志架构。
示例查询
这些示例查询通过提供关键指标(如批处理持续时间、吞吐量、回压和资源利用率)来帮助监视管道的性能和运行状况。
平均批处理持续时间
此查询计算管道处理的批处理的平均持续时间。
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
平均吞吐量
此查询以每秒处理的行数计算管道的平均吞吐量。
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
反压力
此查询通过检查数据积压来度量管道的回压。
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
群集和槽利用率
此查询深入了解管道使用的群集或槽的利用率。
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
职位
可以通过流式处理查询侦听器监视作业中的流式处理查询。
将侦听器附加到 Spark 会话,以在 Azure Databricks 中启用流式查询侦听器。 此监听程序将跟踪您的流查询的进度和指标。 它可用于将指标推送到外部监视工具或记录指标以供进一步分析。
示例:将指标导出到外部监视工具
:::注意
这适用于 Python 和 Scala 的 Databricks Runtime 11.3 LTS 及更高版本。
:::
可以使用 StreamingQueryListener
接口将流式处理指标导出到外部服务,以进行报警或仪表板展示。
下面是有关如何实现侦听器的基本示例:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
示例:在 Azure Databricks 中使用查询侦听器
下面是 Kafka 到 Delta Lake 流式处理查询的 StreamingQueryListener 事件日志示例:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
有关更多示例,请参阅: 示例。
查询进度指标
查询进度指标对于监视流式处理查询的性能和运行状况至关重要。 这些指标包括输入行数、处理速率以及与查询执行相关的各种持续时间。 可以通过将 StreamingQueryListener
附加到 Spark 会话来观察这些指标。 侦听器将在每个流式处理时期结束时发出包含这些指标的事件。
例如,可以使用侦听器StreamingQueryProgress.observedMetrics
映射访问onQueryProgress
方法中的指标。 这样,就可以实时跟踪和分析流式处理查询的性能。
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)