本文探讨如何为有状态流式处理选择输出模式。 只有包含聚合的有状态流才需要输出模式配置。
联接仅支持追加输出模式,输出模式不会影响重复数据删除。 任意有状态运算符 mapGroupsWithState 和 flatMapGroupsWithState 使用自己的自定义逻辑发出记录,因此流的输出模式不会影响其行为。
对于无状态流式处理,所有输出模式的行为都相同。
若要正确配置输出模式,必须了解状态流式处理、水印和触发器。 请参阅以下文章:
什么是输出模式?
结构化流式处理查询的输出模式决定了查询的操作符在每次触发期间发出哪些记录。 可发出下面三种类型的记录:
- 将来处理不会更改的记录。
- 自上次触发以来发生更改的记录。
- 状态表中的所有记录。
知晓要发出哪些类型的记录对于有状态运算符很重要,因为有状态运算符生成的特定行可能会因触发器而异。 例如,当流式聚合运算符为一个特定窗口接收到更多行时,该窗口的聚合值可能会在每次触发时发生变化。
对于无状态运算符,记录类型之间的区别不会影响运算符的行为。 无状态运算符在触发器期间发出的记录始终是在该触发器期间处理的源记录。
可用输出模式
有三种输出模式告知运算符在特定触发器期间发出哪些记录:
| 输出模式 | 说明 |
|---|---|
| 追加模式(默认) | 默认情况下,流式处理查询在追加模式下运行。 在这种模式下,操作符仅发出在后续触发中保持不变的行。 有状态运算符使用水印来确定何时发生这种情况。 |
| 更新模式 | 在更新模式下,运算符会发出在触发器期间更改的所有行,即使发出的记录可能会在后续触发器中更改。 |
| 完整模式 | 完整模式只适用于流式聚合。 在完整模式下,运算符生成的所有结果行都会在下游发出。 |
生产注意事项
对于许多有状态的流式处理操作,您必须在追加模式和更新模式之间做出选择。 以下部分概述了可能会影响你做出决定的注意事项。
注意
完全模式在某些情况下有用,但随着数据规模的扩大,性能可能会下降。 Databricks 建议使用物化视图,以获得与完整模式相关的语义保证,并针对许多有状态操作进行增量处理。 请参阅 具体化视图。
应用语义
应用语义描述了下游应用程序如何使用流数据。
如果下游服务在每次写入时需要采取单个操作,在大多数情况下使用附加模式。 例如,如果有下游通知服务为写入接收器的每个新记录发送通知,则追加模式可确保每个记录只写入一次。 每次状态信息更改时,更新模式都会写入记录,这将导致大量更新。
如果下游服务需要新的结果,更新模式可确保下游系统尽可能保持最新状态。 示例包括实时读取特征的机器学习模型或跟踪实时聚合的分析仪表板。
运算符和接收器兼容性
结构化流式处理不支持 Apache Spark 中提供的所有操作,并且某些流式处理操作在所有输出模式下都不受支持。 有关运算符限制的详细信息,请参阅 OSS 流式处理文档。
并非所有接收器都支持所有输出模式。 Delta Lake 和 Kafka 都支持所有输出模式,其中前者支持所有 Unity Catalog 托管表。 有关汇聚兼容性的详细信息,请参阅 OSS 流式处理文档。
延迟和成本
输出模式会影响写入记录之前必须花费的时间,写入数据的频率和写入量可能会影响与流式处理管道相关的成本。
追加模式强制有状态的运算符仅在有状态结果定型后进行输出,且延迟时长至少等于水印延迟。 在追加输出模式下,水印延迟 1 hour 意味着在记录下游发出之前,至少有 1 小时的延迟。
更新模式会导致每个触发器对每个聚合值执行一次写入。 如果输出端点对每条记录的每次写入收费,那么如果记录在水印延迟期结束之前多次更新,这可能会导致高昂的成本。
配置示例
以下代码示例演示如何配置将更新流式传输到 Unity Catalog 表的输出模式:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala(编程语言)
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
有关 PySpark DataStreamWriter.outputMode 或 Scala DataStreamWriter.outputMode,请参阅 OSS 文档。
有状态流式传输和输出模式示例
以下示例旨在帮助你推理和分析输出模式如何与有状态流式处理的水印交互。
请考虑一个流式聚合,它计算商店每小时产生的总收入,并设有 15 分钟的水印延迟。 第一个微批处理以下记录:
- 下午2:40 人民币15元
- 下午 2:30 人民币 10 元
- 下午 3:10 人民币 30 元
此时,引擎的水印为 2:55pm,因为它从看到的最大时间 (3:10pm) 中减去 15 分钟(延迟)。 流式聚合操作符在其状态中具有以下内容:
-
[2pm, 3pm]:¥25 元 -
[3pm, 4pm]: ¥30
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式处理聚合运算符不会在下游输出任何内容。 这是因为这两个窗口可能会随着后续触发器出现新值而更改:下午 2:55 的水印表示下午 2:55 之后的记录可能仍会到达,并且这些记录可能在 [2pm, 3pm] 时窗或 [3pm, 4pm] 时窗范围内。 |
| 更新 | 运算符发出这两条记录,因为它们都收到了更新。 |
| 完成 | 运算符输出所有记录。 |
现在,假设流又接收了一条记录:
- 下午3:20,20元
水印更新为下午 3:05,因为引擎从下午 3:20 中减去 15 分钟。 此时,流聚合运算符的状态如下:
-
[2pm, 3pm]:¥25 元 -
[3pm, 4pm]:人民币 50 元
下表概述了每个输出模式下会发生什么情况:
| 输出模式 | 结果和原因 |
|---|---|
| 追加 | 流式处理聚合运算符检测到下午 3:05 的水印大于 [2pm, 3pm] 时窗的末端。 根据水印的定义,这个窗口不能再更改,因此它会发出 [2pm, 3pm] 窗口。 |
| 更新 | 流式处理聚合运算符发出 [3pm, 4pm] 窗口,因为状态值已从 CNY30 更改为 CNY50。 |
| 完成 | 运算符发出所有记录。 |
下面总结了有状态运算符在每个追加模式下的行为方式:
- 在追加模式下,在水印延迟后写入记录一次。
- 在更新模式下,写入自上一个触发器以来已更改的记录。
- 在完整模式下,写入有状态运算符曾经生成的所有记录。