读取结构化流式处理状态信息

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 14.3 LTS 及更高版本中,可以使用 DataFrame 操作或 SQL 表值函数来查询结构化流式处理状态数据和元数据。 可以使用这些函数来观察结构化流式处理有状态查询的状态信息,这对于监视和调试非常有用。

必须对流式处理查询的检查点路径具有读取访问权限才能查询状态数据或元数据。 本文中所述的函数提供对状态数据和元数据的只读访问权限。 只能使用批量读取语义来查询状态信息。

注意

不能查询增量实时表管道、流式处理表或具体化视图的状态信息。

读取结构化流式处理状态存储

可以读取在任何受支持的 Databricks Runtime 中执行的结构化流式处理查询的状态存储信息。 使用以下语法:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

支持以下可选配置:

选项 类型 默认值
batchId Long 最新批处理 ID
operatorId Long 0
storeName 字符串 “DEFAULT”
joinSide 字符串(“left”或“right”) 表示要从中读取的目标端。 当用户想要从流间联接读取状态时,使用此选项。

返回的数据具有以下架构:

类型​​ 描述
key 结构体(从状态键派生出的后续类型) 状态检查点中有状态运算符记录的键。
value 结构体(从状态值派生出的后续类型) 状态检查点中有状态运算符记录的值。
partition_id Integer 包含有状态运算符记录的状态检查点的分区。

读取结构化流式处理状态元数据

重要

必须在 Databricks Runtime 14.2 或更高版本上运行流式处理查询才能记录状态元数据。 状态元数据文件不会破坏向后兼容性。 如果选择在 Databricks Runtime 14.1 或更低版本上运行流式处理查询,则会忽略现有状态元数据文件,并且不会写入新的状态元数据文件。

可以读取 Databricks Runtime 14.2 或更高版本上运行的结构化流式处理查询的状态元数据信息。 使用以下语法:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

返回的数据具有以下架构:

类型​​ 说明
operatorId Integer 有状态流式处理运算符的整数 ID。
operatorName Integer 有状态流式处理运算符的名称。
stateStoreName 字符串 运算符的状态存储的名称。
numPartitions Integer 状态存储的分区数。
minBatchId Long 可用于查询状态的最小批 ID。
maxBatchId Long 可用于查询状态的最大批 ID。

注意

minBatchIdmaxBatchId 提供的批处理 ID 值在写入检查点时反映状态。 系统会使用微批处理执行自动清理旧的批处理,因此不能保证此处提供的值仍可用。