使用 foreachBatch 将内容写入到任意数据接收器

本文讨论将 foreachBatch 与结构化流式处理一起使用,将流式处理查询的输出写入尚无现有流接收器的数据源。

代码模式 streamingDF.writeStream.foreachBatch(...) 允许将批处理函数应用于流式处理查询的每个微批处理的输出数据。 与 foreachBatch 一起使用的函数采用两个参数:

  • 具有微批处理的输出数据的数据帧。
  • 微批处理的唯一 ID。

必须为结构化流式处理中的 Delta Lake 合并操作使用 foreachBatch。 请参阅使用 foreachBatch 从流式处理查询进行更新插入

应用其他 DataFrame 操作

流式处理 DataFrame 不支持许多 DataFrame 和 Dataset 操作,因为在这些情况下,Spark 不支持生成增量计划。 使用 foreachBatch(),你可以在每个微批处理输出中应用其中一些操作。 例如,可以使用 foreachBath() 和 SQL MERGE INTO 操作在更新模式下将流式处理聚合的输出写入 Delta 表。 在 MERGE INTO 中查看更多详细信息。

重要

  • foreachBatch() 仅提供至少一次写入保证。 但是,可以使用为函数提供的 batchId 来删除重复输出并获得正好一次的保证。 在这两种情况下,都必须自行考虑端到端语义。
  • foreachBatch() 不适用于连续处理模式,因为它基本上依赖于流式处理查询的微批处理执行。 如果在连续模式下写入数据,请改用 foreach()

可以使用 foreachBatch() 调用空数据帧,但用户代码需要可复原才能正确地进行操作。 下面显示了一个示例:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 中 foreachBatch 的行为更改

在配置了共享访问模式的计算上的 Databricks Runtime 14.0 及更高版本中,forEachBatch 在 Apache Spark 上的单独隔离的 Python 进程中运行,而不是在 REPL 环境中运行。 它会序列化并推送到 Spark,并且在会话期间无权访问全局 spark 对象。

在所有其他计算配置中,foreachBatch 在运行其余代码的同一 Python REPL 中运行。 因此,该函数不会序列化。

在配置了共享访问模式的计算上使用 Databricks Runtime 14.0 及更高版本时,必须在 Python 中使用 foreachBatch 时使用范围限定为本地 DataFrame 的 sparkSession 变量,如以下代码示例所示:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

以下行为更改适用:

  • 不能从函数内部访问任何全局 Python 变量。
  • print() 命令会将输出写入驱动程序日志。
  • 函数中引用的任何文件、模块或对象都必须可序列化并在 Spark 上可用。

重复使用现有的批数据源

使用 foreachBatch(),可以将现有的批处理数据编写器用于可能没有结构化流式处理支持的数据接收器。 以下是一些示例:

可以通过 foreachBatch() 使用许多其他批数据源。 请参阅连接到数据源

写入多个位置

如果需要将流式处理查询的输出写入多个位置,Databricks 建议使用多个结构化流式处理编写器来获得最佳并行化和吞吐量。

使用 foreachBatch 写入多个接收器可序列化流式写入的执行,这可能会增加每个微批处理的延迟。

如果确实使用 foreachBatch 写入多个 Delta 表,请参阅 foreachBatch 中的幂等表写入