重要
Databricks Runtime 16.2 及更高版本中的此功能目前处于公共预览阶段。
可以使用自定义有状态运算符生成流式处理应用程序,以实现使用任意有状态逻辑的低延迟和准实时解决方案。 自定义有状态运算符可以解锁通过传统结构化流处理无法实现的新操作用例和模式。
备注
Databricks 建议对支持的有状态操作(如聚合、重复数据删除和流联接)使用内置的结构化流式处理功能。 请参阅什么是有状态流式处理?。
Databricks 建议使用 transformWithState
代替传统运算符进行任意状态转换。 有关旧 flatMapGroupsWithState
版和 mapGroupsWithState
运算符的文档,请参阅 旧版任意有状态运算符。
transformWithState
运算符和相关 API 和类具有以下要求:
- 在 Databricks Runtime 16.2 及更高版本中可用。
- 计算必须使用专用或无隔离访问模式。
- 必须使用 RocksDB 状态存储提供程序。 Databricks 建议在计算配置过程中启用 RocksDB。
transformWithStateInPandas
支持 Databricks Runtime 16.3 及更高版本中的标准访问模式。
备注
若要为当前会话启用 RocksDB 状态存储提供程序,请运行以下命令:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
该 transformWithState
运算符将自定义有状态处理器应用于结构化流式处理查询。 必须实现自定义有状态处理器才能使用 transformWithState
。 结构化流式处理包括用于使用 Python、Scala 或 Java 生成有状态处理器的 API。
使用 transformWithState
将自定义逻辑应用于使用结构化流式处理以增量方式处理的记录的分组键。 下面介绍了高级设计:
- 定义一个或多个状态变量。
- 针对每个分组键保留状态信息,可以根据用户定义的逻辑为每个状态变量访问状态信息。
- 对于处理的每个微批处理,键的所有记录都可用作迭代器。
- 使用内置控制器来管理何时以及如何根据计时器和用户定义的条件来发出记录。
- 状态值支持单个生存时间(TTL)定义,从而灵活地管理状态过期和状态大小。
由于 transformWithState
支持状态存储中的架构演变,因此你可以循环访问和更新生产应用程序,而无需丢失历史状态信息或需要重新处理记录,从而灵活地开发和简化维护。 请参阅 状态存储中的架构演变。
重要
PySpark 使用运算符 transformWithStateInPandas
而不是 transformWithState
. Azure Databricks 文档用于 transformWithState
描述 Python 和 Scala 实现的功能。
transformWithState
以及相关 API 的 Scala 和 Python 实现由于语言细节的不同而有所差异,但提供相同的功能。 有关首选编程语言,请参阅特定于语言的示例和 API 文档。
通过使用内置句柄实现处理程序来实现自定义有状态应用程序的核心逻辑。
- 句柄提供与状态值和计时器交互、处理传入记录和发出记录的方法。
- 处理程序定义自定义事件驱动的逻辑。
每种状态类型的句柄都基于基础数据结构实现,但每个句柄都包含获取、放置、更新和删除记录的功能。
处理程序基于输入记录或计时器中观察到的事件,使用以下语义实现:
- 使用
handleInputRows
该方法定义处理程序,以控制数据的处理方式、更新状态,并为分组键处理的每个微批记录发出记录。 请参阅 “处理输入行”。 - 使用
handleExpiredTimer
方法定义一个处理程序,以利用基于时间的阈值来运行逻辑,无论是否处理分组键的其他记录。 请参阅 程序定时事件。
下表对这些处理程序支持的功能行为进行了比较:
行为 | handleInputRows |
handleExpiredTimer |
---|---|---|
获取、放置、更新或清除状态值 | 是的 | 是的 |
创建或删除计时器 | 是的 | 是的 |
发出记录 | 是的 | 是的 |
循环访问当前微批处理中的记录 | 是的 | 否 |
基于已用时间触发逻辑 | 否 | 是的 |
可以根据需要合并 handleInputRows
和 handleExpiredTimer
实现复杂的逻辑。
例如,您可以实现一个应用程序,该应用程序利用 handleInputRows
来更新每个微批处理的状态值,并设置一个将在10秒后触发的计时器。 如果未处理其他记录,则可用于 handleExpiredTimer
在状态存储中发出当前值。 如果为分组键处理新记录,则可以清除现有计时器并设置新的计时器。
可以在单个有状态运算符中实现多个状态对象。 提供给每个状态对象的名称将保留在状态存储中,可以使用状态存储读取器访问该存储对象。 如果状态对象使用 a StructType
,则在传递架构时为结构中的每个字段提供名称。 读取状态存储时,这些名称也可见。 请参阅读取结构化流式处理状态信息。
内置类和运算符提供的功能旨在提供灵活性和可扩展性,并且应用程序需要运行的完整逻辑应告知实现选择。 例如,您可以使用按字段ValueState
和user_id
分组的session_id
来实现几乎相同的逻辑,或者使用按MapState
分组的user_id
,其中session_id
是MapState
的键。 在此实例中,如果逻辑需要评估多个MapState
条件,session_id
则可能是首选实现。
以下各节介绍了支持 transformWithState
的状态类型。
对于每个分组键,都有一个关联的值。
值状态可以包括复杂类型,例如结构或元组。 更新ValueState
时,实现逻辑以替换整个值。 当值更新时,值状态的 TTL 将重置,但如果处理与 ValueState
匹配的源键,而不更新存储的 ValueState
,则不会重置。
对于每个分组键,都有一个关联的列表。
列表状态是值的集合,每个值都可以包括复杂类型。 列表中的每个值都有自己的 TTL。 可以通过追加单个项、追加项列表或用 a put
覆盖整个列表来向列表中添加项。 仅将 put 操作视为重置 TTL 的更新。
对于每个分组键,都有一个关联的映射。 映射是等效于 Python 词典的 Apache Spark 功能。
重要
分组键描述结构化流式处理查询子句中指定的 GROUP BY
字段。 映射状态包含分组键的任意数量的键值对。
例如,如果按 user_id
进行分组,并希望为每个 session_id
定义一个映射,则分组键为 user_id
,而映射中的键为 session_id
。
映射状态是一组不同的键,每个键都映射到一个可以包含复杂类型的值。 映射中的每个键值对都有自己的 TTL。 可以更新特定键的值或删除键及其值。 可以使用其键返回单个值、列出所有键、列出所有值或返回迭代器来处理映射中完整的键值对。
初始化 StatefulProcessor
时,你可以为每个状态对象创建一个局部变量,以便与自定义逻辑中的状态对象进行交互。 通过替代 init
类中的内置 StatefulProcessor
方法来定义和初始化状态变量。
初始化getValueState
时,您可以使用getListState
、getMapState
和StatefulProcessor
方法定义任意数量的状态对象。
每个状态对象必须具有以下条件:
- 唯一的名称
- 指定的架构
- 在 Python 中,显式指定架构。
- 在 Scala 中,传递一个
Encoder
指定状态架构。
还可以提供可选的生存时间(TTL)持续时间(以毫秒为单位)。 如果实现映射状态,则必须为映射键和值提供单独的架构定义。
备注
状态信息的查询、更新和发出的逻辑被单独处理。 请参阅 “使用状态变量”。
下面演示了用于定义和使用自定义有状态处理器 transformWithState
的基本语法,包括每个受支持的类型的示例状态变量。 有关更多示例,请参阅 示例有状态应用程序。
备注
Python 使用元组进行与状态值的所有交互。 这意味着 Python 代码在使用诸如 put
和 update
之类的操作时,应使用元组传递值,并且在使用 get
时应做好处理元组的准备。
例如,如果值状态的架构只是一个整数,则可以实现如下所示的代码:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
对于 ListState
中的项目或 MapState
中的值也是如此。
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
PySpark 包括一个 StatefulProcessorHandle
类,用于提供对控制用户定义 Python 代码如何与状态信息交互的函数的访问权限。 在初始化StatefulProcessorHandle
时,必须始终导入handle
并将其传递给StatefulProcessor
变量。
该 handle
变量将 Python 类中的局部变量与状态变量关联。
备注
Scala 使用该方法 getHandle
。
可以选择性地提供用于第一个微批处理的初始状态。 将现有工作流迁移到新的自定义应用程序、升级有状态作员以更改架构或逻辑,或者修复无法自动修复且需要手动干预的故障时,这非常有用。
备注
使用状态存储读取器从现有检查点查询状态信息。 请参阅读取结构化流式处理状态信息。
如果要将现有的 Delta 表转换为有状态应用程序,请使用 spark.read.table("table_name")
读取该表,并传递生成的 DataFrame。 可以选择或修改字段,以符合新的有状态应用程序的要求。
使用含有与输入行相同的分组键架构的 DataFrame 提供初始状态。
备注
Python 使用 handleInitialState
来指定定义 StatefulProcessor
时的初始状态。 Scala 使用独特的类 StatefulProcessorWithInitialState
。
支持的状态对象提供获取状态、更新现有状态信息或清除当前状态的方法。 每个受支持的状态类型都具有与实现的数据结构相对应的方法的唯一实现。
观察到的每个分组键都有专用的状态信息。
- 记录基于所实现的逻辑发出,并使用指定的输出架构。 请参阅发出记录。
- 可以使用读取器访问状态存储
statestore
中的值。 此读取器具有批处理功能,不适用于低延迟工作负荷。 请参阅读取结构化流式处理状态信息。 - 仅当键的记录存在于微批处理中时,才触发使用
handleInputRows
指定的逻辑。 请参阅 “处理输入行”。 - 使用
handleExpiredTimer
实现不依赖于观察记录来触发的基于时间的逻辑。 请参阅 程序定时事件。
备注
状态对象通过对键进行分组来隔离,并具有以下影响:
- 状态值不能受到与不同分组键关联的记录的影响。
- 无法实现依赖于比较值或跨分组键更新状态的逻辑。
可以比较分组键中的值。 使用MapState
来实现逻辑,以便自定义逻辑能够使用第二个键。 例如,使用 user_id
分组并使用 IP 地址键入 MapState
可以实现跟踪同时用户会话的逻辑。
写入状态变量会触发对 RocksDB 的写入。 为了优化性能,Databricks 建议尽可能处理给定密钥迭代器中的所有值,并在单个写入中提交更新。
备注
状态更新是容错的。 如果任务在微批处理完成处理之前崩溃,则会在重试时使用上次成功的微批处理中的值。
状态值没有任何内置默认值。 如果逻辑需要读取现有状态信息,请在实现逻辑时使用 exists
该方法。
备注
MapState
变量具有其他功能来检查单个键或列出所有键以实现 null 状态的逻辑。
用户定义的逻辑控制transformWithState
如何发出记录。 记录按分组键发出。
自定义有状态应用程序不会对如何使用状态信息来确定如何发出记录做出任何假设,针对给定条件的记录返回数量可以是零、一个或多个。
您可以使用handleInputRows
或handleExpiredTimer
实现逻辑以发出记录。 请参阅 “处理输入行 ”和 “程序计时事件”。
备注
你可以实现多个状态值并定义用于发出记录的多个条件,但发出的所有记录都应使用相同的架构。
在 Python 中,调用 outputStructType
时使用 transformWithStateInPandas
关键字定义输出架构。
你使用 pandas DataFrame 对象和 yield
发出记录。
可以选择 yield
空 DataFrame。 与 update
输出模式结合使用时,发出空 DataFrame 会将分组键的值更新为 null。
在 Scala 中,您使用 Iterator
对象来发出记录。 输出架构是从发出的记录中派生而来的。
可以选择发出空 Iterator
。 当与 update
输出模式结合使用时,发出一个空的 Iterator
会将分组键的值更新为 null。
使用 handleInputRows
方法定义流式处理查询中观察到的记录与状态值交互方式的逻辑和更新状态值的逻辑。 每次通过结构化流式处理查询处理任何记录时,使用该方法定义的 handleInputRows
处理程序都会运行。
对于使用 transformWithState
实现的大多数有状态应用程序,其核心逻辑是使用 handleInputRows
定义的。
对于处理的每个微批处理更新,可以使用迭代器获取给定分组键的微批处理中的所有记录。 用户定义的逻辑可以与当前微包中的所有记录和状态存储中的值进行交互。
可以使用计时器基于指定条件中的已用时间实现自定义逻辑。
可以通过实现 handleExpiredTimer
方法来处理计时器。
在分组键内,计时器通过其时间戳进行唯一标识。
计时器过期时,结果由应用程序中实现的逻辑确定。 常见模式包括:
- 发出存储在状态变量中的信息。
- 逐出存储的状态信息。
- 创建新的计时器。
即使在微批处理中未处理与其关联密钥相关的记录,过期的计时器也会触发。
传递 StatefulProcessor
至 transformWithState
时,必须指定时间模型。 支持以下选项:
ProcessingTime
EventTime
NoTime
或TimeMode.None()
指定 NoTime
意味着处理器不支持计时器。
Databricks 建议不要在自定义有状态应用程序中调用系统时钟,因为这可能导致任务失败时的重试不可靠。 当必须访问处理时间或水印时,请使用类中的 TimerValues
方法:
TimerValues |
DESCRIPTION |
---|---|
getCurrentProcessingTimeInMs |
返回自 epoch 以来当前批处理的处理时间的时间戳(以毫秒为单位)。 |
getCurrentWatermarkInMs |
返回当前批次水印的时间戳,自纪元以来以毫秒为单位。 |
备注
处理时间描述 Apache Spark 处理微批处理的时间。 许多流式处理源(如 Kafka)还包括系统处理时间。
流式处理查询上的水印通常针对事件时间或流式处理源的处理时间定义。 请参阅应用水印来控制数据处理阈值。
水印和窗口都可以与 transformWithState
结合使用。 可以通过利用 TTL、计时器或MapState
ListState
功能在自定义有状态应用程序中实现类似的功能。
transformWithState
所使用的状态值支持可选的生存时间 (TTL) 规范。 当 TTL 过期时,该值将从状态存储区中逐出。 TTL 仅与状态存储中的值交互,这意味着您可以实现用于清除状态信息的逻辑,但由于 TTL 自动清除状态值,您无法直接触发这些逻辑。
重要
如果不实现 TTL,则必须使用其他逻辑处理状态逐出,以避免无休止的状态增长。
为每个状态值强制实施 TTL,每个状态类型都有不同的规则。
- 状态变量的范围限定为分组键。
- 对于
ValueState
对象,每个分组键只存储单个值。 TTL 适用于此值。 - 对于
ListState
对象,列表可以包含许多值。 TTL 独立适用于列表中的每个值。 - 对于
MapState
对象,每个映射键都具有关联的状态值。 TTL 独立应用于映射中的每个键值对。
对于所有状态类型,如果状态信息已更新,TTL 将重置。
备注
虽然 TTL 的范围限定为单个 ListState
值,但更新列表中的值的唯一方法是使用 put
该方法覆盖变量的 ListState
整个内容。
对于状态变量,计时器与生存时间(TTL)之间存在一些重叠,但计时器提供比 TTL 更广泛的功能集。
TTL 移除在用户指定时间段内尚未更新的状态信息。 这样,用户就可以防止未选中的状态增长并删除过时的状态条目。 由于映射和列表对每个值实现了 TTL,因此你可以通过设置 TTL 来实现那些仅考虑最近更新状态值的函数。
计时器允许定义超出状态逐出(包括发出记录)的自定义逻辑。 可以选择使用计时器清除给定状态值的状态信息,并可以灵活地根据计时器发出值或触发其他条件逻辑。