本文讨论将 foreachBatch
与结构化流式处理一起使用,将流式处理查询的输出写入尚无现有流接收器的数据源。
代码模式 streamingDF.writeStream.foreachBatch(...)
允许将批处理函数应用于流式处理查询的每个微批处理的输出数据。 与 foreachBatch
一起使用的函数采用两个参数:
- 具有微批处理的输出数据的数据帧。
- 微批处理的唯一 ID。
必须为结构化流式处理中的 Delta Lake 合并操作使用 foreachBatch
。 请参阅使用 foreachBatch
从流式处理查询进行更新插入。
应用其他 DataFrame 操作
流式处理 DataFrame 不支持许多 DataFrame 和 Dataset 操作,因为在这些情况下,Spark 不支持生成增量计划。 使用 foreachBatch()
,你可以在每个微批处理输出中应用其中一些操作。 例如,可以使用 foreachBatch()
和 SQL MERGE INTO
操作在更新模式下将流式处理聚合的输出写入 Delta 表。 在 MERGE INTO 中查看更多详细信息。
重要
-
foreachBatch()
仅提供至少一次写入保证。 但是,可以使用为函数提供的batchId
来删除重复输出并获得正好一次的保证。 在这两种情况下,都必须自行考虑端到端语义。 -
foreachBatch()
不适用于连续处理模式,因为它基本上依赖于流式处理查询的微批处理执行。 如果在连续模式下写入数据,请改用foreach()
。 - 将
foreachBatch
与有状态运算符配合使用时,请务必在处理完成之前完全使用每个批。 请参阅完全使用每个批 DataFrame
可以使用 foreachBatch()
调用空数据帧,但用户代码需要可复原才能正确地进行操作。 下面显示了一个示例:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 中 foreachBatch
的行为更改
在配置了标准访问模式的计算上的 Databricks Runtime 14.0 及更高版本中,以下行为更改适用:
-
print()
命令会将输出写入驱动程序日志。 - 无法访问函数内的
dbutils.widgets
子模块。 - 函数中引用的任何文件、模块或对象都必须可序列化并在 Spark 上可用。
重复使用现有的批数据源
使用 foreachBatch()
,可以将现有的批处理数据编写器用于可能没有结构化流式处理支持的数据接收器。 以下是一些示例:
可以通过 foreachBatch()
使用许多其他批数据源。 请参阅 “连接到数据源和外部服务”。
写入多个位置
如果需要将流式处理查询的输出写入多个位置,Databricks 建议使用多个结构化流式处理编写器来获得最佳并行化和吞吐量。
使用 foreachBatch
写入多个接收器可序列化流式写入的执行,这可能会增加每个微批处理的延迟。
如果使用 foreachBatch
来写入多个 Delta 表,请参阅 foreachBatch
中的幂等表写入。
完全消费每一个批数据帧
使用有状态运算符(例如,使用 dropDuplicatesWithinWatermark
),每个批处理迭代都必须使用整个 DataFrame 或重启查询。 如果不消费整个数据帧,流式查询将在下一批次中失败。
在某些情况下,可能会发生这种情况。 以下示例演示如何修复未正确使用 DataFrame 的查询。
有意使用批的子集
如果只关心批处理的子集,则可以有如下所示的代码。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
在这种情况下,batch_df.show(2)
仅处理批次中的前两个项,这是预期的,但如果有更多项,则必须消耗这些项。 以下代码使用完整的 DataFrame。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
在这里,函数 do_nothing
会无提示地忽略 DataFrame 的其余部分。
处理批处理中的错误
运行 foreachBatch
进程时可能存在错误。 可以有如下代码(在本例中,示例有意引发错误以显示问题)。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
通过处理错误(和静默处理),可能不会使用批的其余部分。 有两个选项可用于处理这种情况。
首先,你可能会重新引发错误,该错误会传递到编排层以重试该批。 如果这是暂时性问题,则可以解决此错误;如果不是,请将此问题提交给您的运维团队,以尝试手动解决。 为此,请将 partial_func
代码更改为如下所示:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
如果你想捕获异常并忽略批处理的其余部分,第二个选项是将代码更改为如下。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row)
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
此代码使用 do_nothing
函数以无提示方式忽略批处理的其余部分。