Azure Databricks 中适用于作业、Lakeflow 声明式管道和 Lakeflow Connect 的可观测性

监视流式处理应用程序的性能、成本和运行状况对于构建可靠高效的 ETL 管道至关重要。 Azure Databricks 跨作业、Lakeflow 声明性管道和 Lakeflow Connect 提供了一组丰富的可观测性功能,以帮助诊断瓶颈、优化性能以及管理资源使用情况和成本。

本文概述了以下方面的最佳做法:

  • 关键流式处理性能指标

  • 事件日志架构和示例查询

  • 流式查询监控

  • 将日志和指标导出到外部工具

流媒体可观测性的关键指标

运行流式处理管道时,监视以下关键指标:

指标 目的
反压 监视文件数量及偏移量大小。 帮助识别瓶颈并确保系统可以在不落后的情况下处理传入的数据。
吞吐量 跟踪每个微批处理的消息数。 评估管道效率并检查它是否与数据引入保持同步。
时长 测量微批处理的平均持续时间。 指示处理速度并帮助调整批处理间隔。
延迟 指示一段时间内处理了多少条记录/消息。 帮助了解端到端管道延迟,并针对较低的延迟进行优化。
群集利用率 反映 CPU 和内存使用情况(%)。 确保高效的资源使用,并帮助缩放群集以满足处理需求。
网络 测量传输和接收的数据。 用于识别网络瓶颈和提高数据传输性能。
检查点 识别已处理的数据和偏移量。 确保一致性并在存在故障时也能容错。
成本 显示流媒体应用的每小时、每日和每月成本。 有助于预算和资源优化。
血统 显示在流式处理应用程序中生成的数据集和图层。 促进数据转换、跟踪、质量保证和调试。

群集日志和指标

Azure Databricks 群集日志和指标提供有关群集性能和利用率的详细见解。 这些日志和指标包括有关 CPU、内存、磁盘 I/O、网络流量和其他系统指标的信息。 监视这些指标对于优化群集性能、高效管理资源以及排查问题至关重要。

Azure Databricks 群集日志和指标提供有关群集性能和资源利用率的详细见解。 其中包括 CPU 和内存使用情况、磁盘 I/O 和网络流量。 监视这些指标对于以下方面至关重要:

  • 优化群集性能。
  • 高效管理资源。
  • 排查操作问题。

可以通过 Databricks UI 利用指标,也可以导出到个人监视工具。 请参阅 Notebook 示例:Datadog 指标

Spark用户界面

Spark UI 显示有关作业和阶段进度的详细信息,包括已完成、挂起和失败的任务数。 这有助于了解执行流并确定瓶颈。

对于流式处理应用程序, “流式处理”选项卡 显示输入速率、处理速率和批处理持续时间等指标。 它有助于监视流式处理作业的性能,并确定任何数据引入或处理问题。

有关详细信息 ,请参阅使用 Apache Spark UI 进行调试

计算指标

计算指标将帮助你了解群集利用率。 运行作业时,可以看到它如何扩展以及资源受到的具体影响。 你将能够找到可能导致 OOM 故障的内存压力或可能导致长时间延迟的 CPU 压力。 下面是你将看到的特定指标:

  • 服务器负载分发:过去一分钟内每个节点的 CPU 使用率。
  • CPU 使用率:CPU 在各种模式下(例如用户、系统、空闲和等待I/O操作)花费的时间百分比。
  • 内存利用率:每个模式的总内存使用量(例如,已用、可用、缓冲区和缓存)。
  • 内存交换利用率:内存交换总使用量。
  • 可用文件系统空间:每个装入点的文件系统总使用量。
  • 网络吞吐量:每个设备通过网络接收和传输的字节数。
  • 活动节点数:给定计算的每个时间戳的活动节点数。

有关详细信息,请参阅“监视性能和硬件指标”图表

Lakeflow 声明性管道

Lakeflow 声明性管道事件日志捕获所有管道事件的综合记录,包括:

  • 审核日志。
  • 数据质量检查。
  • 管道进度。
  • 数据世系。

为所有 Lakeflow 声明性管道自动启用事件日志,可通过以下方式访问:

  • 管道 UI:直接查看日志。
  • DLT API:编程访问。
  • 直接查询:查询事件日志表。

有关详细信息,请参阅 Lakeflow 声明性管道的事件日志架构

示例查询

这些示例查询通过提供关键指标(如批处理持续时间、吞吐量、回压和资源利用率)来帮助监视管道的性能和运行状况。

平均批处理持续时间

此查询计算管道处理的批处理的平均持续时间。

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

平均吞吐量

此查询以每秒处理的行数计算管道的平均吞吐量。

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

反压力

此查询通过检查数据积压来度量管道的回压。

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

群集和槽利用率

此查询深入了解管道使用的群集或槽的利用率。

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

职位

可以通过流式处理查询侦听器监视作业中的流式处理查询。

将侦听器附加到 Spark 会话,以在 Azure Databricks 中启用流式查询侦听器。 此监听程序将跟踪您的流查询的进度和指标。 它可用于将指标推送到外部监视工具或记录指标以供进一步分析。

示例:将指标导出到外部监视工具

:::注意

这适用于 Python 和 Scala 的 Databricks Runtime 11.3 LTS 及更高版本。

:::

可以使用 StreamingQueryListener 接口将流式处理指标导出到外部服务,以进行报警或仪表板展示。

下面是有关如何实现侦听器的基本示例:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

示例:在 Azure Databricks 中使用查询侦听器

下面是 Kafka 到 Delta Lake 流式处理查询的 StreamingQueryListener 事件日志示例:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

有关更多示例,请参阅: 示例

查询进度指标

查询进度指标对于监视流式处理查询的性能和运行状况至关重要。 这些指标包括输入行数、处理速率以及与查询执行相关的各种持续时间。 可以通过将 StreamingQueryListener 附加到 Spark 会话来观察这些指标。 侦听器将在每个流式处理时期结束时发出包含这些指标的事件。

例如,可以使用侦听器StreamingQueryProgress.observedMetrics映射访问onQueryProgress方法中的指标。 这样,就可以实时跟踪和分析流式处理查询的性能。

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)