您可以使用 transformWithState 构建有状态流处理应用程序,并实现低延迟和近实时解决方案。 使用自定义有状态运算符,可以创建任意有状态逻辑,以便生成传统结构化流式处理无法实现的新操作用例。
注释
对于聚合、重复数据删除和流联接等有状态操作,Databricks 建议使用内置结构化流式处理运算符而不是自定义逻辑。 请参阅什么是有状态流式处理?。
Databricks 建议对任意状态转换使用 transformWithState 替代旧运算符,例如 flatMapGroupsWithState 和 mapGroupsWithState。 请参阅 旧版任意有状态运算符。
要求
transformWithState 和 transformWithStateInPandas 运算符具有以下要求:
- 在 Databricks Runtime 16.2 及更高版本中可用。
- 对于实时模式,请使用 Databricks Runtime 17.3 LTS 或更高版本。 请参阅 结构化流式处理中的实时模式。
- 对于标准访问模式,Python在 Databricks Runtime 16.3 及更高版本中可用,Scala 在 Databricks Runtime 17.3 及更高版本中可用。
- RocksDB 是 Databricks Runtime 17.3 及更高版本中的默认状态存储提供程序。
对于 Databricks Runtime 17.2 及更低版本,必须配置 RocksDB 状态存储提供程序。 Databricks 建议在 Spark 配置中启用 RocksDB。
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
什么是 transformWithState?
该 transformWithState 运算符将自定义有状态处理器应用于结构化流式处理查询。 必须实现自定义有状态处理器才能使用 transformWithState。 结构化流式处理包含用于通过 Python、Scala 或 Java 来构建有状态处理器的 API。
使用 transformWithState 将自定义逻辑应用于分组键。 下面介绍了高级设计:
- 定义一个或多个状态变量。
- 每个分组键的状态信息都会保留。 可以在用户定义的代码中访问每个状态变量。
- 对于处理的每个微批次,与该键对应的所有行都可通过一个迭代器获得。
- 将
StatefulProcessorHandle与计时器和自定义条件结合使用,以控制如何输出行。 - 为了管理状态过期和状态大小,状态值支持单独的生存时间(TTL)定义。
由于 transformWithState 支持状态存储中的架构演变,因此可以循环访问和更新生产应用程序,而不会丢失历史状态信息。 更新状态架构后,无需重新处理行,这简化了代码部署和维护。 请参阅 状态存储中的架构演变。
重要
Azure Databricks 文档使用 transformWithState 来描述 Python 和 Scala 实现方式:
- PySpark 支持基于
transformWithState行的 API 和基于transformWithStateInPandasPandas 的运算符。-
transformWithStateInPandas在实时模式下不受支持。 请改用transformWithState。 有关详细信息,请参阅transformWithState实时模式。
-
- Scala 仅支持基于
transformWithState行的 API。
Scala 和 Python 实现transformWithState具有相同的功能,但语法存在一些差异。
定义 StatefulProcessor
通过扩展 StatefulProcessor 类并实现其方法来定义有状态处理器。
Spark 将一个 StatefulProcessorHandle 传递给你的 init 的 StatefulProcessor 方法。 使用句柄创建状态变量并与状态存储进行交互。
transformWithState 支持三种状态类型: ValueState、 ListState和 MapState。 每种类型使用不同的基础数据结构存储每个分组键的状态。
实现以下方法来定义自定义逻辑:
- 实现
handleInputRows以控制应用程序如何处理数据、更新状态,以及为每个微批次输出行数据。 请参阅 “处理输入行”。 - 实现
handleExpiredTimer以运行基于时间的逻辑,而不考虑分组键是否在微批处理中接收新行。 请参阅 “处理过期计时器”。 - (可选)在应用程序处理任何输入行之前实现
handleInitialState预填充状态。 请参阅处理初始状态。
下表比较了这些方法的功能行为:
| 行为 | handleInputRows |
handleExpiredTimer |
|---|---|---|
| 获取、放置、更新或清除状态值 | 是的 | 是的 |
| 创建或删除计时器 | 是的 | 是的 |
| 输出行 | 是的 | 是的 |
| 遍历当前微批次中的行 | 是的 | 否 |
| 基于已用时间触发逻辑 | 否 | 是的 |
可以合并 handleInputRows 并 handleExpiredTimer 根据需要实现复杂的逻辑。
例如,您可以实现一个应用程序,该应用程序利用 handleInputRows 来更新每个微批处理的状态值,并设置一个将在10秒后触发的计时器。 如果没有处理其他行,你可以使用 handleExpiredTimer 输出状态存储中的当前值。 如果为分组键处理新行,则可以清除现有计时器并设置新的计时器。
StatefulProcessorHandle
在 PySpark 中,类 StatefulProcessorHandle 允许你访问控制代码如何使用状态信息的函数。
初始化 StatefulProcessor 时,必须始终导入并将 StatefulProcessorHandle 传递给 handle 变量。
handle 变量将Python类中的局部变量与状态变量关联。
注释
Scala 使用该方法 getHandle。
自定义状态类型
可以在单个有状态运算符中实现多个状态对象。
根据完整的应用程序逻辑选择状态类型。 例如,您可以使用 ValueState 跟踪会话,并按 user_id 和 session_id 进行分组。 或者,若要评估跨多个会话的条件,请使用一个按MapState分组、并以user_id作为映射键的session_id。
如果状态对象使用 StructType,则必须在该模式中为结构体的每个字段定义唯一的名称。 读取状态存储时,这些名称可见。 请参阅读取结构化流式处理状态信息。
以下各节介绍了 transformWithState 支持的状态类型:
ValueState
ValueState 存储每个分组键的值。
值状态可以包括复杂类型,例如结构或元组。 对于 ValueState,您必须实现用于替换整个值的逻辑。
值状态的存活时间会在该值更新时重置。 如果在不更新已存储的 ValueState 的情况下处理针对 ValueState 的源键,则其生存时间(TTL)不会被重置。
ListState
ListState 存储每个分组键的列表。
列表状态是值的集合,每个值都可以包括复杂类型。 列表中的每个值都有自己的生存时间。
可以通过追加单个项、追加项列表或用 a put覆盖整个列表来向列表中添加项。 若要重置生存时间,必须使用 put 操作。
MapState
MapState 存储每个分组键的映射。 映射是 Apache Spark 中与 Python 字典(dict)等效的概念。
映射状态是不同键的集合,每个键映射到一个值,每个键可以包括复杂类型。 映射中的每个键值对都有各自的生存时间。
可以更新特定键的值,也可以删除键及其值。 可以使用其键返回单个值、列出所有键、列出所有值或返回迭代器来处理映射中完整的键值对。
重要
分组键描述结构化流式处理查询子句中指定的 GROUP BY 字段。 映射状态可以针对某个分组键包含任意数量的键值对。
例如,如果您的查询使用GROUP BY user_id,并且您想为每个session_id定义一个映射,那么分组键是user_id,MapState键是session_id:
Python
class SessionTracker(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.sessions = handle.getMapState("sessions", StringType(), LongType())
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
for row in rows:
session_id = row["session_id"] # session_id is the MapState key
count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
new_count = count + 1
self.sessions.updateValue(session_id, (new_count,))
yield from []
def close(self) -> None:
pass
df.groupBy("user_id").transformWithState(SessionTracker(), ...) # user_id is the grouping key
Scala(编程语言)
case class Event(userId: String, sessionId: String)
class SessionTracker extends StatefulProcessor[String, Event, Row] {
@transient private var sessions: MapState[String, Long] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
sessions = getHandle.getMapState[String, Long]("sessions", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
rows: Iterator[Event],
timerValues: TimerValues): Iterator[Row] = {
rows.foreach { event =>
val count = if (sessions.containsKey(event.sessionId)) sessions.getValue(event.sessionId) else 0L
sessions.updateValue(event.sessionId, count + 1) // sessionId is the MapState key
}
Iterator.empty
}
}
df.as[Event]
.groupByKey(_.userId) // userId is the grouping key
.transformWithState(new SessionTracker(), TimeMode.None(), OutputMode.Update())
在StatefulProcessor中创建自定义状态变量
初始化 StatefulProcessor 时,你可以为每个状态对象创建一个局部变量,以便与自定义逻辑中的状态对象进行交互。 通过重写 init 类中的内置 StatefulProcessor 方法来定义和初始化状态变量。
您可以在 getValueState 中使用 getListState、getMapState 和 StatefulProcessor 方法定义任意数量的状态对象。
每个状态对象必须具有以下条件:
- 唯一的名称
- 架构
- 在Python中,必须指定架构。
- 在 Scala 中,可以传递一个
Encoder指定状态架构。
还可以选择提供以毫秒为单位的生存时间(TTL)。 如果实现映射状态,则必须为映射键和值提供单独的架构定义。
注释
这 StatefulProcessor 分别处理用于查询、更新和发出状态信息的逻辑。 请参阅 在具有自定义逻辑的方法中使用状态变量。
在具有自定义逻辑的方法中使用状态变量
状态对象具有获取状态、更新现有状态信息以及清除当前状态的方法。
每个分组密钥都有专用的状态信息。
-
StatefulProcessor会根据您的自定义逻辑和指定的输出架构输出行。 请参阅输出行。 -
statestore使用读取器访问状态存储中的值。 此读取器适用于批处理工作负荷,不适用于低延迟工作负荷。 请参阅读取结构化流式处理状态信息。 - 仅当微批次中存在该键对应的行时,使用
handleInputRows指定的逻辑才会运行。 请参阅 “处理输入行”。 - 使用
handleExpiredTimer来实现不依赖于观测行触发的基于时间的逻辑。 请参阅 “处理过期计时器”。
注释
状态对象通过对键进行分组来隔离,并具有以下影响:
- 状态值不能受到与其他分组键关联的行的影响。
- 无法实现依赖于比较值或跨分组键更新状态的逻辑。
可以比较分组键中的值。 使用MapState来实现逻辑,以便自定义逻辑能够使用第二个键。 例如,通过 user_id 分组和使用 ip_address 密钥 MapState ,可以跟踪同时的用户会话。
使用状态的高级注意事项
状态更新是容错的。 如果任务在微批处理完成处理之前崩溃,则重试将使用最后一个成功的微批处理中的值。
为了优化性能,Databricks 建议在迭代器中处理给定密钥的所有值,并在单个写入中提交更新。 写入状态变量时,这会触发对 RocksDB 的写入。
状态值没有默认值。 如果逻辑需要读取现有状态信息,请使用 exists 该方法。
若要实现 null 状态的逻辑, MapState 变量允许检查单个键或列出所有密钥。
处理输入行
使用该方法 handleInputRows 定义应用程序如何处理行和更新状态值。 每次结构化流查询处理某个分组键对应的行时,此方法都会运行。
对于使用 transformWithState 实现的大多数有状态应用程序,其核心逻辑是使用 handleInputRows 定义的。
对于处理的每次微批次更新,对于给定的分组键,可以通过迭代器访问该微批次中的所有行。 用户定义的逻辑可以与当前微包中的所有行和状态存储中的值进行交互。
处理过期的计时器
使用 handleExpiredTimer 方法基于经过的时间实现自定义逻辑。
在分组键内,计时器通过其时间戳进行唯一标识。
计时器过期时,结果由应用程序中实现的逻辑确定。 常见模式包括:
- 发出存储在状态变量中的信息。
- 逐出存储的状态信息。
- 创建新的计时器。
即使在微批次中没有处理与其关联键对应的任何行,过期的计时器也会触发。
指定时间模式
传递 StatefulProcessor 至 transformWithState时,必须使用参数指定时间模式 timeMode 。
支持以下选项:
| 时间模式 | DESCRIPTION |
|---|---|
ProcessingTime |
计时器和 TTL 均受支持,并根据 Apache Spark 处理每个微批次时的挂钟时间进行评估。 当你希望计时器相对于处理各行的时间按固定间隔触发,而不考虑数据中的时间戳时,请使用 ProcessingTime。 |
EventTime |
支持计时器,并根据事件时间水印进行评估。 水印随着 Apache Spark 观察输入数据中的时间戳而前进。
EventTime 不支持 TTL。
EventTime当数据包含时间戳并且希望计时器根据这些时间戳的进度触发时使用。 使用 EventTime时,还必须指定 eventTimeColumnName 参数。 请参阅 eventTimeColumnName。 |
NoTime 或 TimeMode.None() |
不支持计时器和 TTL。 当有状态应用程序不需要基于时间的逻辑时使用 NoTime 。 |
eventTimeColumnName
使用 EventTime 时间模式时,参数 eventTimeColumnName 指定包含事件时间戳的输出架构中的列的名称。 Apache Spark 使用此列将水印传递到输出流,从而使下游基于时间的操作能够正确执行。
Python
eventTimeColumnName 是用于 transformWithState 或 transformWithStateInPandas 的附加参数:
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=MyProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="EventTime",
eventTimeColumnName="outputTimestamp",
)
.writeStream...
)
Scala(编程语言)
transformWithState 接受 eventTimeColumnName 代替 timeMode。 此方法始终使用 EventTime 模式:
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new MyProcessor(),
"outputTimestamp",
OutputMode.Append(),
)
.writeStream...
内置计时器值
Databricks 建议不要在自定义有状态应用程序中调用系统时钟,因为这可能导致任务失败时的重试不可靠。 当必须访问处理时间或水印时,请使用类中的 TimerValues 方法:
TimerValues |
DESCRIPTION |
|---|---|
getCurrentProcessingTimeInMs |
返回自 epoch 以来当前批处理的处理时间的时间戳(以毫秒为单位)。 |
getCurrentWatermarkInMs |
返回当前批次水印的时间戳,自纪元以来以毫秒为单位。 |
注释
处理时间描述 Apache Spark 处理微批处理的时间。 许多流式处理源(如 Kafka)还包括系统处理时间。
流式处理查询上的水印通常针对事件时间或流式处理源的处理时间定义。 请参阅应用水印来控制数据处理阈值。
水印和窗口都可以与 transformWithState 结合使用。 可以通过利用 TTL、计时器或MapStateListState功能在自定义有状态应用程序中实现类似的功能。
状态类型的生存时间(TTL)
为防止内存不足错误并清除陈旧的状态类型值,transformWithState 支持为每个状态类型的值设置可选的生存时间(TTL)。 过期后,TTL 以无提示方式逐出状态类型值。 TTL 不运行 handleExpiredTimer 或任何自定义逻辑。 若要在状态过期时运行代码,请改用计时器。
重要
如果未实现 TTL,则必须处理状态清除,以避免内存溢出错误。
对于所有状态类型,TTL 在更新状态信息时重置。 为每个状态类型值强制实施 TTL,每个状态类型有不同的规则:
- 状态变量的范围限定为分组键。
- 对于
ValueState对象,每个分组键只存储单个值。 TTL 适用于此值。 - 对于
ListState对象,列表可以包含许多值。 TTL 独立适用于列表中的每个值。- 虽然 TTL 的范围限定为单个
ListState值,但更新单个值的唯一方法是使用put该方法,该方法覆盖变量的全部ListState内容,并为列表中的所有值重置 TTL。
- 虽然 TTL 的范围限定为单个
- 对于
MapState对象,每个映射键都具有关联的状态值。 TTL 独立应用于映射中的每个键值对。
注释
计时器除了可用于状态清除之外,还允许你定义自定义逻辑,包括输出行数据。 (可选)可以使用计时器来清除给定状态值的状态信息,并发出值或触发条件逻辑。 请参阅 “处理过期计时器”。
有状态应用程序示例
以下示例定义自定义有状态处理器, SimpleCounterProcessor包括示例状态变量。
SimpleCounterProcessor 使用 ValueState、ListState 和 MapState 来统计每个分组键的行数。
Python (Pandas)
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Python(基于行)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = 0
for row in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
count += 1
self.value_state.update((count,)) # Count is passed as a tuple
iter_list = self.list_state.get()
list_state_value = next(iter_list)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield Row(id=key, countAsString=str(count))
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala(编程语言)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
private val longEncoder = Encoders.scalaLong
private val intEncoder = Encoders.scalaInt
private val stringEncoder = Encoders.STRING
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
intEncoder, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
intEncoder, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
stringEncoder, intEncoder, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
有关更多示例,请参阅 示例有状态应用程序。
注释
在Python中,状态值为元组。 将元组传递给 put 和 update,并期望从 get 获得元组。
例如,如果你的 ValueState 的架构定义是单个整数:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
对于 ListState 中的项或 MapState 中的值,也可使用这种方法。
输出行
您必须使用 handleInputRows 或 handleExpiredTimer 来定义 transformWithState 如何针对每个分组键输出行。 请参阅 “处理输入行 ”和 “处理过期的计时器”。
自定义有状态应用程序不假设如何使用状态信息。 对于给定条件,应用程序可能不会发出任何行、一行或多行。
注释
可以实现多个状态值并定义用于发出行的多个条件,但所有行都必须使用相同的架构。
Python (Pandas)
使用 transformWithStateInPandas,使用 outputStructType 关键字定义输出架构。
使用 pandas DataFrame 对象和 yield 发出行。
您也可以 yield 空的 DataFrame。 如果使用 update 输出模式并发出空数据帧,这将更新分组键 null的值。
Python(基于行)
使用 transformWithState,使用 outputStructType 关键字定义输出架构。
使用 Row 对象和 yield 输出行。
(可选)可以返回空迭代器。 如果使用 update 输出模式并发出空迭代器,则会更新分组键 null的值。
Scala(编程语言)
在 Scala 中,使用 Iterator 对象输出行。 该模式会根据输出行的模式自动推导出来。
(可选)可以返回空 Iterator。 如果使用 update 输出模式并发出空 Iterator,这将更新分组键 null的值。
处理初始状态
可选地,您可以将初始状态传递给第一个微批次。
例如,可以使用此方法来:
- 将现有工作流迁移到新的自定义应用程序。
- 升级有状态运算符以更改架构或逻辑。
- 修复无法自动修复的故障,需要手动干预。
注释
使用状态存储读取器从现有检查点查询状态信息。 请参阅读取结构化流式处理状态信息。
如果要将现有的 Delta 表转换为有状态应用程序,请使用 spark.read.table("table_name") 读取该表,并传递生成的 DataFrame。 可以选择或修改字段,以符合新的有状态应用程序的要求。
使用含有与输入行相同的分组键架构的 DataFrame 提供初始状态。
注释
Python使用 handleInitialState 在定义 StatefulProcessor 时指定初始状态。 Scala 使用独特的类 StatefulProcessorWithInitialState。
以下示例基于现有 Delta 表初始化每个键对应的计数器:
Python(基于行)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
class CounterWithInitialState(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("count", IntegerType(), True)])
self.count_state = handle.getValueState("countState", state_schema)
def handleInitialState(self, key, initialState: Row, timerValues) -> None:
self.count_state.update((initialState["count"],))
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = self.count_state.get()[0] if self.count_state.exists() else 0
for _ in rows:
count += 1
self.count_state.update((count,))
yield Row(id=key[0], count=count)
def close(self) -> None:
pass
output_schema = StructType([
StructField("id", StringType(), True),
StructField("count", IntegerType(), True),
])
# Load existing counts as initial state — must use the same grouping key as the input
initial_state = spark.read.table("existing_counts").groupBy("id")
q = (
df.groupBy("id")
.transformWithState(
statefulProcessor=CounterWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream...
)
Scala(编程语言)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
class CounterWithInitialState
extends StatefulProcessorWithInitialState[String, (String, String), (String, String), (String, Int)] {
@transient private var countState: ValueState[Int] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInitialState(
key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
countState.update(initialState._2)
}
override def handleInputRows(
key: String,
rows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
val count = if (countState.exists()) countState.get() else 0
val newCount = count + rows.size
countState.update(newCount)
Iterator((key, newCount.toString))
}
}
// Load existing counts as initial state — must use the same grouping key as the input
val initialState = spark.read.table("existing_counts")
.as[(String, Int)]
.groupByKey(_._1)
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(_._1)
.transformWithState(
new CounterWithInitialState(),
TimeMode.None(),
OutputMode.Update(),
initialState,
)
.writeStream...
在 Lakeflow Spark 声明性管道中使用 transformWithState
使用 Lakeflow Spark 声明性管道中的 transformWithState 运算符,使用 Python 在流式处理管道中实现任意有状态逻辑。
若要执行此操作,请完成以下步骤:
- 为任意有状态转换定义输出架构和有状态处理器逻辑。 例如,请参阅 有状态应用程序示例。
- 创建一个 Lakeflow Spark 声明性流水线,该流水线调用
transformWithState运算符到 DataFrame 上。 请参阅 教程:使用 Lakeflow 管道编辑器创建第一个管道。 - 运行管道并验证目标表或接收器上的结果。
有关使用 transformWithState 监视传感器心跳的示例,请参阅 示例:使用 transformWithState 监视传感器心跳。