为结构化流式处理选择输出模式

本文探讨如何为有状态流式处理选择输出模式。 只有包含聚合的有状态流才需要输出模式配置。

联接仅支持追加输出模式,输出模式不会影响重复数据删除。 任意有状态运算符 mapGroupsWithStateflatMapGroupsWithState 使用自己的自定义逻辑发出记录,因此流的输出模式不会影响其行为。

对于无状态流式处理,所有输出模式的行为都相同。

若要正确配置输出模式,必须了解有状态流式处理、水印和触发器。 请参阅以下文章:

什么是输出模式?

结构化流式处理查询的输出模式决定了查询的操作符在每次触发期间发出哪些记录。 可发出下面三种类型的记录:

  • 将来处理不会更改的记录。
  • 自上次触发器以来已更改的记录。
  • 状态表中的所有记录。

知晓要发出哪些类型的记录对于有状态运算符很重要,因为有状态运算符生成的特定行可能会因触发器而异。 例如,当流式处理聚合运算符接收一个特定窗口的多行时,每次触发期间该窗口的聚合值可能会发生变化。

对于无状态运算符,记录类型之间的区别不会影响运算符的行为。 无状态运算符在触发器期间发出的记录始终是在该触发器期间处理的源记录。

可用输出模式

有三种输出模式告知运算符在特定触发器期间发出哪些记录:

输出模式 说明
追加模式(默认) 默认情况下,流式处理查询在追加模式下运行。 在此模式下,操作符仅发出在未来的触发中不会变化的行。 有状态运算符使用水印来确定何时发生这种情况。
更新模式 在更新模式下,运算符会发出在触发器期间更改的所有行,即使发出的记录可能会在后续触发器中更改。
完整模式 完整模式仅适用于流式处理聚合。 在完整模式下,运算符生成的所有结果行都会在下游发出。

生产注意事项

对于许多有状态流式处理操作,必须在追加模式和更新模式之间进行选择。 以下部分概述了可能会影响你做出决定的注意事项。

注意

完整模式具有一些应用,但随着数据缩放,可能性能不佳。 Databricks 建议使用具体化视图获取与完整模式关联的语义保证,以及针对许多有状态操作的增量处理。

应用语义

应用语义描述了下游应用程序如何使用流数据。

如果下游服务需要对每个下游写入执行单个操作,则在大多数情况下使用追加模式。 例如,如果有下游通知服务为写入接收器的每个新记录发送通知,则追加模式可确保每个记录只写入一次。 每次状态信息更改时,更新模式都会写入记录,这将导致大量更新。

如果下游服务需要新的结果,更新模式可确保接收器尽可能保持最新状态。 示例包括实时读取特征的机器学习模型或跟踪实时聚合的分析仪表板。

运算符和接收器兼容性

结构化流式处理不支持 Apache Spark 中提供的所有操作,并且某些流式处理操作在所有输出模式下都不受支持。 有关运算符限制的详细信息,请参阅 OSS 流式处理文档

并非所有接收器都支持所有输出模式。 Delta Lake 和 Kafka 都支持所有输出模式,其中前者支持所有 Unity Catalog 托管表。 有关接收器兼容性的详细信息,请参阅 OSS 流式处理文档

延迟和成本

输出模式会影响写入记录之前必须花费的时间,写入数据的频率和写入量可能会影响与流式处理管道相关的成本。

追加模式强制有状态运算符仅在完成有状态结果后发出结果,延迟时长至少等于水印延迟。 在追加输出模式下水印延迟 1 hour 意味着记录在下游发出之前至少有 1 小时的延迟。

更新模式会导致每个触发器对每个聚合值执行一次写入。 如果接收器对每个记录的每次写入收费,那么要是记录在水印延迟时间耗尽之前多次更新,则此操作可能成本高昂。

配置示例

以下代码示例演示如何配置将更新流式传输到 Unity Catalog 表的输出模式:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

有关 PySpark DataStreamWriter.outputModeScala DataStreamWriter.outputMode,请参阅 OSS 文档。

有状态流式传输和输出模式示例

以下示例旨在帮助你了解输出模式如何与有状态流式处理的水印交互。

请考虑一个流式处理聚合,该聚合计算商店每小时产生的总收入,水印延迟为 15 分钟。 第一个微批处理以下记录:

  • CNY15 at 2:40pm
  • CNY10 at 2:30pm
  • CNY30 at 3:10pm

此时,引擎的水印为 2:55pm,因为它从看到的最大时间 (3:10pm) 中减去 15 分钟(延迟)。 流式处理聚合运算符的状态如下:

  • [2pm, 3pm]: CNY25
  • [3pm, 4pm]: CNY30

下表概述了每个输出模式下会发生什么情况:

输出模式 结果和原因
追加 流式处理聚合运算符不会在下游发出任何内容。 这是因为这两个窗口可能会随着后续触发器出现新值而更改:下午 2:55 的水印表示下午 2:55 之后的记录可能仍会到达,并且这些记录可能在 [2pm, 3pm] 时窗或 [3pm, 4pm] 时窗范围内。
更新 运算符发出这两条记录,因为它们都收到了更新。
完成 运算符发出所有记录。

现在,假设流又接收了一条记录:

  • CNY20 at 3:20pm

水印更新为下午 3:05,因为引擎从下午 3:20 中减去 15 分钟。 此时,流式处理聚合运算符的状态如下:

  • [2pm, 3pm]: CNY25
  • [3pm, 4pm]: CNY50

下表概述了每个输出模式下会发生什么情况:

输出模式 结果和原因
追加 流式处理聚合运算符观察到下午 3:05 的水印超出了 [2pm, 3pm] 时窗的末尾。 根据水印的定义,该时窗无法再更改,因此它发出 [2pm, 3pm] 时窗。
更新 流式处理聚合运算符发出 [3pm, 4pm] 窗口,因为状态值已从 CNY30 更改为 CNY50。
完成 运算符发出所有记录。

下面总结了有状态运算符在每个追加模式下的行为方式:

  • 在追加模式下,在水印延迟后写入记录一次。
  • 在更新模式下,写入自上一个触发器以来已更改的记录。
  • 在完整模式下,写入由有状态运算符生成的所有记录。