Read Structured Streaming state information

You can use DataFrame operations or SQL table-valued functions to query Structured Streaming state data and metadata. Use these functions to observe state information for stateful Structured Streaming queries, which can be useful for monitoring and debugging.

You must have read access to a streaming query's checkpoint path to query its state data or metadata. The functions described in this article provide read-only access to state data and metadata, and you can only query state information using batch read semantics.

Note

You cannot query state information for DLT pipelines, streaming tables, or materialized views. You also cannot query state information using serverless compute. For access mode and Databricks Runtime requirements, see Requirements.

Requirements

  • Use one of the following compute configurations:
    • Databricks Runtime 16.3 and above on compute configured with standard access mode.
    • Databricks Runtime 14.3 LTS and above on compute configured with dedicated or no isolation access mode.
  • Read access to the checkpoint path used by the streaming query.

Read Structured Streaming state store

You can read state store information for Structured Streaming queries executed in any supported Databricks Runtime. Use the following syntax:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

State reader API parameters

The state reader API supports the following optional configurations:

| Option | Type | Default value | Description |
| --- | --- | --- | --- |
| batchId | Long | latest batch ID | The target batch to read from. Specify this option to query state information for an earlier state of the query. The batch must be committed but not yet cleaned up. |
| operatorId | Long | 0 | The target operator to read from. Use this option when the query uses multiple stateful operators. |
| storeName | String | "DEFAULT" | The target state store name to read from. Use this option when the stateful operator uses multiple state store instances. For a stream-stream join, you must specify either storeName or joinSide, but not both. |
| joinSide | String ("left" or "right") | None | The target join side to read from. Use this option to read state from a stream-stream join. |
| stateVarName | String | None | The state variable name to read as part of this query. The state variable name is the unique name given to each variable within the init function of a StatefulProcessor used by the transformWithState operator. Required when using the transformWithState operator; ignored for other operators. Available in Databricks Runtime 16.2 and above. |
| readRegisteredTimers | Boolean | false | Set to true to read registered timers used within the transformWithState operator. Only applies to the transformWithState operator; ignored for other operators. Available in Databricks Runtime 16.2 and above. |
| flattenCollectionTypes | Boolean | true | If true, flattens the records returned for map and list state variables. If false, records are returned as a Spark SQL Array or Map. Only applies to the transformWithState operator; ignored for other operators. Available in Databricks Runtime 16.2 and above. |
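
The following is a minimal sketch that combines several of these options. It reads the state of the stateful operator with ID 1 as of committed batch 5, then reads a transformWithState state variable; the checkpoint path, batch ID, and the variable name countState are illustrative placeholders.

Python

# Read operator 1 as of batch 5. The batch must be committed
# but not yet cleaned up from the checkpoint.
df = (spark.read
  .format("statestore")
  .option("batchId", 5)
  .option("operatorId", 1)
  .load("/checkpoint/path"))

# For transformWithState, stateVarName is required. "countState" is a
# hypothetical variable defined in the init function of the StatefulProcessor.
tws_df = (spark.read
  .format("statestore")
  .option("stateVarName", "countState")
  .load("/checkpoint/path"))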

The returned data has the following schema:

| Column | Type | Description |
| --- | --- | --- |
| key | Struct (further type derived from the state key) | The key for a stateful operator record in the state checkpoint. |
| value | Struct (further type derived from the state value) | The value for a stateful operator record in the state checkpoint. |
| partition_id | Integer | The partition of the state checkpoint that contains the stateful operator record. |
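
Because key and value are structs, you can expand them into top-level columns for inspection. The following is a minimal sketch that assumes the key and value field names do not collide:

Python

# Flatten the key and value structs into top-level columns.
flattened_df = (spark.read
  .format("statestore")
  .load("/checkpoint/path")
  .selectExpr("key.*", "value.*", "partition_id"))

flattened_df.show()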

See read_statestore table-valued function.

Read Structured Streaming state metadata

Important

You must run streaming queries on Databricks Runtime 14.2 or above to record state metadata. State metadata files do not break backward compatibility. If you choose to run a streaming query on Databricks Runtime 14.1 or below, existing state metadata files are ignored and no new state metadata files are written.

You can read state metadata information for Structured Streaming queries run on Databricks Runtime 14.2 or above. Use the following syntax:

Python

df = (spark.read
  .format("state-metadata")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

The returned data has the following schema:

| Column | Type | Description |
| --- | --- | --- |
| operatorId | Integer | The integer ID of the stateful streaming operator. |
| operatorName | String | Name of the stateful streaming operator. |
| stateStoreName | String | Name of the state store of the operator. |
| numPartitions | Integer | Number of partitions of the state store. |
| minBatchId | Long | The minimum batch ID available for querying state. |
| maxBatchId | Long | The maximum batch ID available for querying state. |

Note

The batch ID values provided by minBatchId and maxBatchId reflect the state at the time the checkpoint was written. Old batches are cleaned up automatically during micro-batch execution, so the values provided here are not guaranteed to still be available.

See read_state_metadata table-valued function.
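
To guard against requesting a batch that has already been cleaned up, you can read the metadata first and pass a batch ID that is still reported as available. The following is a minimal sketch, assuming a single stateful operator with ID 0 and a placeholder checkpoint path:

Python

meta_df = (spark.read
  .format("state-metadata")
  .load("/checkpoint/path"))

# Use the newest batch still reported as available for operator 0.
max_batch = meta_df.filter("operatorId = 0").head()["maxBatchId"]

state_df = (spark.read
  .format("statestore")
  .option("batchId", max_batch)
  .load("/checkpoint/path"))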

Example: Query one side of a stream-stream join

Use the following syntax to query the left side of a stream-stream join:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);
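
To inspect the other side of the join, set joinSide to 'right'. As a minimal sketch, the following reads the right side and compares how many rows each side is buffering:

Python

# Read the buffered state for the right side of the stream-stream join.
right_df = (spark.read
  .format("statestore")
  .option("joinSide", "right")
  .load("/checkpoint/path"))

# Compare the number of buffered rows on each side.
print(left_df.count(), right_df.count())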

Example: Query state store for stream with multiple stateful operators

This example uses the state metadata reader to gather metadata details for a streaming query with multiple stateful operators, then uses the metadata results as options for the state reader.

The state metadata reader requires only the checkpoint path, as in the following syntax example:

Python

df = (spark.read
  .format("state-metadata")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

The following table shows example output from the state metadata reader:

| operatorId | operatorName | stateStoreName | numPartitions | minBatchId | maxBatchId |
| --- | --- | --- | --- | --- | --- |
| 0 | stateStoreSave | default | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | default | 200 | 0 | 13 |

To get results for the dedupeWithinWatermark operator, query the state reader with the operatorId option, as in the following example:

Python

df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);
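
You can also drive the state reader directly from the metadata results. The following is a minimal sketch that reads the state for every operator listed in the metadata; the checkpoint path is a placeholder:

Python

meta_rows = (spark.read
  .format("state-metadata")
  .load("/checkpoint/path")
  .collect())

# Read each operator's state store using the IDs reported in the metadata.
for row in meta_rows:
    operator_df = (spark.read
      .format("statestore")
      .option("operatorId", row["operatorId"])
      .load("/checkpoint/path"))
    print(row["operatorName"], operator_df.count())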