监视 Azure Databricks 上的结构化流式处理查询

Azure Databricks 通过“流式处理”选项卡下的 Spark UI 提供对结构化流式处理应用程序的内置监视。

区分 Spark UI 中的结构化流式处理查询

通过将 .queryName(<query-name>) 添加到 writeStream 代码为流提供唯一的查询名称,以便在 Spark UI 中轻松区分哪些指标属于哪个流。

将结构化流式处理指标推送到外部服务

可以使用 Apache Spark 的“流式处理查询侦听器”界面将流式处理指标推送到外部服务,以实现警报或仪表板用例。 在 Databricks Runtime 11.3 LTS 及更高版本中,流式处理查询侦听器在 Python 和 Scala 中可用。

重要

不能在 StreamingQueryListener 逻辑中使用 Unity Catalog 管理的凭据和对象。

注意

侦听器的处理延迟会显著影响查询处理速度。 建议限制这些侦听器中的处理逻辑,并选择写入到 Kafka 等快速响应系统以提高效率。

以下代码提供了用于实现侦听器的语法的基本示例:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    * [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    * `onQueryStart` calls on all listeners before
    * `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    * Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    * latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    * may change before or when you process the event. For example, you may find [[StreamingQuery]]
    * terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

在结构化流式处理中定义可观察指标

可观察指标是可以在查询(数据帧)中定义的命名任意聚合函数。 在数据帧的执行达到完成点(即,完成批处理查询或达到流式处理循环)后,会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。

可以通过将侦听器附加到 Spark 会话来观察这些指标。 侦听器取决于执行模式:

  • 批处理模式:使用 QueryExecutionListener

    查询完成时调用 QueryExecutionListener。 使用 QueryExecution.observedMetrics 映射访问指标。

  • 流式处理或微批:使用 StreamingQueryListener

    流式处理查询完成某个循环时调用 StreamingQueryListener。 使用 StreamingQueryProgress.observedMetrics 映射访问指标。 Azure Databricks 不支持连续执行流式处理。

例如:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener 对象指标

指标 说明
id 在重启时保留的唯一查询 ID。
runId 每次启动/重启时保留的唯一查询 ID。 请参阅 StreamingQuery.runId()
name 用户指定的查询名称。 如果未指定名称,则名称为 null。
timestamp 执行微批的时间戳。
batchId 当前正在处理的批数据的唯一 ID。 如果失败后重试,可多次执行给定批 ID。 同样,如果没有要处理的数据,则批 ID 不会递增。
numInputRows 触发器中处理的记录的(跨所有源)聚合数。
inputRowsPerSecond 到达数据的(跨所有源)聚合速率。
processedRowsPerSecond Spark 处理数据时的(跨所有源)聚合速率。

durationMs 对象

有关完成微批执行过程的各个阶段所所用的时间的信息。

跃点数 说明
durationMs.addBatch 执行微批所用的时间。 此时间不包括 Spark 规划微批所用的时间。
durationMs.getBatch 检索与源偏移量有关的元数据所用的时间。
durationMs.latestOffset 微批消耗的最新偏移量。 此进度对象是指从源检索最新偏移量所花费的时间。
durationMs.queryPlanning 生成执行计划所用的时间。
durationMs.triggerExecution 规划和执行微批所用的时间。
durationMs.walCommit 提交新的可用偏移量所用的时间。

eventTime 对象

有关在微批中处理的数据中看到的事件时间值的信息。 水印使用此数据来确定如何剪裁状态以处理结构化流式处理作业中定义的有状态聚合。

指标 说明
eventTime.avg 在该触发器中看到的平均事件时间。
eventTime.max 在该触发器中看到的最大事件时间。
eventTime.min 在该触发器中看到的最小事件时间。
eventTime.watermark 触发器中使用的水印值。

stateOperators 对象

有关结构化流式处理作业中定义的有状态操作及其生成的聚合的信息。

指标 说明
stateOperators.operatorName 与指标相关的有状态运算符的名称,例如 symmetricHashJoindedupestateStoreSave
stateOperators.numRowsTotal 有状态运算符或聚合的结果状态中的总行数。
stateOperators.numRowsUpdated 有状态运算符或聚合的结果状态中已更新的总行数。
stateOperators.allUpdatesTimeMs 此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。
stateOperators.numRowsRemoved 从有状态运算符或聚合的结果状态中删除的总行数。
stateOperators.allRemovalsTimeMs 此指标目前无法通过 Spark 进行度量,我们已计划在将来的更新中将其删除。
stateOperators.commitTimeMs 提交所有更新(放置和删除)并返回新版本所用的时间。
stateOperators.memoryUsedBytes 状态存储使用的内存。
stateOperators.numRowsDroppedByWatermark 被视为太晚而无法包含在有状态聚合中的行数。 仅流式处理聚合:聚合后删除的行数(不是原始输入行数)。 此数字并不精确,但可以表明存在被丢弃的延迟数据。
stateOperators.numShufflePartitions 此有状态运算符的随机分区数。
stateOperators.numStateStoreInstances 运算符已初始化和维护的实际状态存储实例。 对于许多有状态运算符,这与分区数相同。 但是,流到流的联接将为每个分区初始化四个状态存储实例。

stateOperators.customMetrics 对象

从 RocksDB 收集的信息,该源捕获的指标与它为结构化流式处理作业维护的状态值的性能和操作有关。 有关详细信息,请参阅在 Azure Databricks 上配置 RocksDB 状态存储

指标 说明
customMetrics.rocksdbBytesCopied RocksDB 文件管理器所跟踪的复制字节数。
customMetrics.rocksdbCommitCheckpointLatency 创建本机 RocksDB 快照并将其写入本地目录所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCompactLatency 在检查点提交期间进行压缩(可选)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitFileSyncLatencyMs 将本机 RocksDB 快照同步到外部存储(检查点位置)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitFlushLatency 将 RocksDB 内存中更改刷新到本地磁盘所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitPauseLatency 在检查点提交过程中停止后台工作线程(例如进行压缩)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbCommitWriteBatchLatency 将内存中结构中的分阶段写入 (WriteBatch) 应用于本机 RocksDB 所花费的时间(以毫秒为单位)。
customMetrics.rocksdbFilesCopied RocksDB 文件管理器所跟踪的复制文件数。
customMetrics.rocksdbFilesReused RocksDB 文件管理器所跟踪的重用文件数。
customMetrics.rocksdbGetCount 对数据库的 get 调用次数(不包括来自 WriteBatchgets:用于暂存写入的内存批处理)。
customMetrics.rocksdbGetLatency 基础本机 RocksDB::Get 调用平均花费的时间(以纳秒为单位)。
customMetrics.rocksdbReadBlockCacheHitCount RocksDB 中块缓存的缓存命中计数,这些缓存在避免本地磁盘读取方面很有用。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB 中的块缓存计数不可用于避免本地磁盘读取。
customMetrics.rocksdbSstFileSize 所有静态排序表 (SST) 文件的大小 - RocksDB 用于存储数据的表格结构。
customMetrics.rocksdbTotalBytesRead 通过 get 操作读取的未压缩字节数。
customMetrics.rocksdbTotalBytesReadByCompaction 压缩进程从磁盘读取的字节数。
customMetrics.rocksdbTotalBytesReadThroughIterator 使用迭代器读取的未压缩数据的字节总数。 某些有状态操作(例如 FlatMapGroupsWithState 中的超时处理和水印)需要通过迭代器读取 DB 中的数据。
customMetrics.rocksdbTotalBytesWritten 通过 put 操作写入的未压缩字节总数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 压缩进程写入磁盘的字节总数。
customMetrics.rocksdbTotalCompactionLatencyMs RocksDB 压缩(包括后台压缩和提交期间启动的可选压缩)所花费的时间(以毫秒为单位)。
customMetrics.rocksdbTotalFlushLatencyMs 总刷新时间,包括后台刷新时间。 刷新操作是指 MemTable 在填满后刷新到存储的进程。 MemTables 是 RocksDB 中存储数据的第一级。
customMetrics.rocksdbZipFileBytesUncompressed 文件管理器报告的未压缩 zip 文件的大小(以字节为单位)。 文件管理器用于管理物理 SST 文件磁盘空间的利用率和删除。

源对象 (Kafka)

指标 说明
sources.description Kafka 源的详细说明,指定从中读取的确切 Kafka 主题。 例如:"KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]"
(属于sources.startOffset 对象)的父级。 启动流式处理作业的 Kafka 主题内的起始偏移量。
(属于sources.endOffset 对象)的父级。 微批处理的最后偏移量。 对于正在进行的微批执行,这可能等于 latestOffset
(属于sources.latestOffset 对象)的父级。 微批计算出的最新偏移量。 发生限制时,微批处理过程可能不会处理所有偏移量,这会导致 endOffsetlatestOffset 差异。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 从此源传送处理数据的速率。
sources.processedRowsPerSecond Spark 处理来自此源的数据的速率。

sources.metrics 对象 (Kafka)

指标 说明
sources.metrics.avgOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的平均偏移量。
sources.metrics.estimatedTotalBytesBehindLatest 查询进程尚未从订阅主题使用的估计字节数。
sources.metrics.maxOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最大偏移量。
sources.metrics.minOffsetsBehindLatest 流式处理查询在所有订阅主题中最新可用偏移量之后使用的最小偏移量。

接收器对象 (Kafka)

指标 说明
sink.description 流式处理查询正在写入的 Kafka 接收器的说明,详细描述正在使用的特定 Kafka 接收器实现。 例如:"org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100"
sink.numOutputRows 作为微批的一部分写入到输出表或接收器的行数。 对于某些情况,此值可以是“-1”,并且通常可以解释为“未知”。

源对象 (Delta Lake)

指标 说明
sources.description 流式处理查询从中读取数据的源的说明。 例如:"DeltaSource[table]"
sources.[startOffset/endOffset].sourceVersion 对此偏移量进行编码的序列化版本。
sources.[startOffset/endOffset].reservoirId 正在读取的表的 ID。 此值用于在重启查询时检测错误配置。
sources.[startOffset/endOffset].reservoirVersion 当前正在处理的表的版本。
sources.[startOffset/endOffset].index 此版本中 AddFiles 序列中的索引。 此值用于将大型提交分解成多个批。 可通过对 modificationTimestamppath 进行排序来创建此索引。
sources.[startOffset/endOffset].isStartingVersion 确定当前偏移量是否标记新的流查询的开头,而不是处理初始数据后发生的更改。 启动新的查询时,首先会处理表中存在的所有数据,然后再处理任何新到达的数据。
sources.latestOffset 微批查询处理的最新偏移量。
sources.numInputRows 从此源处理的输入行数。
sources.inputRowsPerSecond 从此源传送处理数据的速率。
sources.processedRowsPerSecond Spark 处理来自此源的数据的速率。
sources.metrics.numBytesOutstanding 未完成文件(RocksDB 跟踪的文件)的总大小。 这是 Delta 和自动加载程序作为流式处理源的积压工作 (backlog) 指标。
sources.metrics.numFilesOutstanding 要处理的未完成文件数。 这是 Delta 和自动加载程序作为流式处理源的积压工作 (backlog) 指标。

接收器对象 (Delta Lake)

指标 说明
sink.description Delta 接收器的说明,详细描述正在使用的特定 Delta 接收器实现。 例如:"DeltaSink[table]"
sink.numOutputRows 行数始终为“-1”,因为 Spark 无法推理 DSv1 接收器的输出行(这是 Delta Lake 接收器的分类)。

示例

Kafka-to-Kafka StreamingQueryListener 事件示例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake-to-Delta Lake StreamingQueryListener 事件示例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis-to-Delta Lake StreamingQueryListener 事件示例

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake-to-Delta Lake StreamingQueryListener 事件示例

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener 事件的速率源示例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}