使用 ForEachBatch 写入管道中的任意数据接收器

重要

API foreach_batch_sink 处于 公开预览版

ForEachBatch 数据接收端允许将数据流作为一系列微批处理进行处理。 可以使用类似于 Apache Spark 结构化流式处理的 foreachBatch自定义逻辑在 Python 中处理每个批处理。 使用 Lakeflow Spark 声明式管道(SDP)ForEachBatch 接收端,您可以将流数据转换、合并或写入到不支持流式写入的一个或多个目标。 本页指导你设置 ForEachBatch 数据承接点,提供示例并讨论关键注意事项。

ForEachBatch 汇聚点提供以下功能:

  • 针对每个微批的自定义逻辑:ForEachBatch 是一个灵活的流式接收器。 可以使用 Python 代码应用任意操作(例如合并到外部表、写入多个目标或执行更新插入)。
  • 完全刷新支持:管道按流管理检查点,因此在执行管道完全刷新时,检查点会自动重置。 使用 ForEachBatch 接收器时,当这种情况发生时,你负责管理下游数据重置。
  • Unity 目录支持:ForEachBatch 接收器支持所有 Unity 目录功能,例如读取或写入 Unity 目录卷或表。
  • 有限的清理:管道不会跟踪从 ForEachBatch 接收器写入的数据,因此无法对这些数据进行清理。 你负责任何下游数据管理。
  • 事件日志条目:管道事件日志记录每个 ForEachBatch 接收器的创建和使用。 如果 Python 函数不可序列化,则会在事件日志中看到一个警告条目,其中包含其他建议。

注释

  • ForEachBatch 接收器专为流式处理查询而设计,例如 append_flow。 它不适用于仅限批处理的管道或 AutoCDC 语义。
  • 此页上介绍的 ForEachBatch 接收器适用于管道。 Apache Spark 结构化流式处理也支持 foreachBatch。 有关结构化流式处理 foreachBatch的信息,请参阅 “使用 foreachBatch 写入任意数据接收器”。

何时使用 ForEachBatch 滤器

每当管道需要通过内置接收器格式(例如 delta,或 kafka)不可用的功能时,请使用 ForEachBatch 接收器。 典型用例包括:

  • 合并或插入 Delta Lake 表:对每个微批处理执行自定义合并逻辑(例如,处理更新的记录)。
  • 写入多个或不受支持的目标:将每个批处理的输出写入多个表或不支持流式写入的外部存储系统(如某些 JDBC 接收器)。
  • 应用自定义逻辑或转换:直接在 Python 中操作数据(例如,使用专用库或高级转换)。

有关内置接收器或使用 Python 创建自定义接收器的信息,请参阅 Lakeflow Spark 声明性管道中的接收器

Syntax

使用 @dp.foreach_batch_sink() 修饰符生成 ForEachBatch 汇聚点。 然后,可以在流定义中将其引用为target,例如,在@dp.append_flow中。

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
参数 Description
name 可选。 用于标识管道中的接收器的唯一名称。 如果未包含,则默认为 UDF 的名称。
batch_handler 这是将为每个微批处理调用的用户定义函数(UDF)。
df 包含当前微批处理数据的 Spark 数据帧。
batch_id 微批处理的整数 ID。 Spark 为每个触发器间隔递增此 ID。
batch_id 0表示流的开头或完全刷新的开始。 代码 foreach_batch_sink 应正确处理下游数据源的完全刷新。 有关详细信息,请参阅下一部分。

完全刷新

由于 ForEachBatch 使用流式处理查询,因此管道会跟踪每个流的检查点目录。 完全刷新时:

  • 检查点目录已重置。
  • 汇流函数(foreach_batch_sink UDF)检测到从 0 开始的新的 batch_id 周期。
  • 目标系统中的数据 不会 由管道自动清理(因为管道不知道数据被写入到哪里)。 如果需要重新初始化的场景,则必须手动删除或截断由 ForEachBatch 输出填充的外部表或位置。

使用 Unity 目录功能

Spark 结构化流 foreach_batch_sink 中所有现有的 Unity Catalog 功能仍然可用。

这包括写入托管表或外部 Unity 目录表。 可以将微批处理写入 Unity 目录托管表或外部表,就像在任何 Apache Spark 结构化流式处理作业中一样。

事件日志条目

创建 ForEachBatch 汇集器时,会将 SinkDefinition 事件与 "format": "foreachBatch" 一同添加到管道的事件日志中。

这使您能够跟踪 ForEachBatch 接收器的使用情况,并查看有关您接收器的警告。

与 Databricks Connect 配合使用

如果提供的函数 不可序列化 (Databricks Connect 的重要要求),则事件日志包含一个 WARN 条目,建议在需要 Databricks Connect 支持的情况下简化或重构代码。

例如,如果您使用 dbutils 在 ForEachBatch UDF 中获取参数,您可以在使用参数之前获取这些参数。

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

最佳做法

  1. 保持 ForEachBatch 函数简洁:避免多线程、大量库依赖或在内存中进行大型数据操作。 复杂或有状态逻辑可能导致序列化错误或性能瓶颈。
  2. 监视检查点文件夹:对于流处理查询,SDP 通过流而不是汇聚点来管理检查点。 如果管道中有多个流,则每个流都有自己的检查点目录。
  3. 验证外部依赖项:如果依赖于外部系统或库,请检查它们是否已安装在所有群集节点上或容器中。
  4. 请注意 Databricks Connect:如果您的环境将来可能会迁移到 Databricks Connect,请检查您的代码是否可序列化,并且不依赖于 dbutils UDF 中的 foreach_batch_sink

局限性

  • ForEachBatch 没有清理功能:由于自定义 Python 代码可能会在任何地方写入数据,因此管道无法处理或跟踪这些数据。 您必须自行管理写入目标的数据或制定相关的保留策略。
  • 微批处理中的指标:管道收集流式指标,但某些情况下使用 ForEachBatch 可能会导致指标不完整或异常。 这是由于 ForEachBatch 的灵活性,使得系统难以有效跟踪数据流和具体数据行。
  • 支持在没有多次读取的情况下写入多个目标:某些客户可能会使用 ForEachBatch 从源读取一次,然后写入多个目标。 若要实现此目的,您必须在ForEachBatch函数中包含df.persistdf.cache。 使用这些选项,Azure Databricks 将尝试只准备一次数据。 如果没有这些选项,查询将导致多次读取。 以下代码示例中不包括此内容。
  • 与 Databricks Connect 一起使用:如果管道在 Databricks Connect 上运行, foreachBatch 则用户定义的函数(UDF)必须可序列化且无法使用 dbutils。 如果管道检测到不可序列化的 UDF,但不会使管道失败,则会引发警告。
  • 不可序列化的逻辑:引用本地对象、类或不可调用的资源的代码可能会中断 Databricks Connect 上下文。 如果 Databricks Connect 是必需的,则使用纯 Python 模块并确认引用(例如, dbutils)不使用。

例子

基本语法示例

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

对简单管道使用示例数据

此示例使用 NYC 出租车示例。 它假定工作区管理员已启用 Databricks 公共数据集目录。 对于数据接收端,请将 my_catalog.my_schema 更改为您有权访问的目录和架构。

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

写入多个目标

此示例写入多个目标。 它演示了如何使用txnVersiontxnAppId来使写入Delta Lake表的操作具有幂等性。 有关详细信息,请参阅幂等表写入foreachBatch

假设我们要写入两个表table_atable_b,并且假设在一个批处理中,写入table_a成功,而写入table_b失败。 重新运行批处理时,txnVersiontxnAppId这对参数将允许Delta忽略重复写入table_a的动作,并且仅将批处理写入table_b

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)

# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

使用 spark.sql()

可以在 ForEachBatch 接收器中使用 spark.sql() ,如以下示例所示。

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

常见问题 (FAQ)

可以在 ForEachBatch 数据接收器中使用 dbutils 吗?

如果打算在非 Databricks Connect 环境中运行管道, dbutils 则可以正常工作。 但是,如果使用 Databricks Connect,dbutilsforeachBatch 函数中无法访问。 如果管道检测到 dbutils 的使用,为了帮助您避免故障,可能会引发警告。

是否可以将多个流与单个 ForEachBatch 接收器一起使用?

是的。 可以定义多个流(带有 @dp.append_flow),这些流都以相同的接收器名称为目标,但它们各自维护自己的检查点。

数据管道是否处理我的目标的数据保留或清理?

否。 由于 ForEachBatch 接收器可以写入任何任意位置或系统,因此管道无法自动管理或删除该目标中的数据。 必须在自定义代码或外部进程中处理这些操作。

如何排查 ForEachBatch 函数中的序列化错误或故障?

查看群集驱动程序日志或管道事件日志。 对于与 Spark Connect 相关的序列化问题,请检查函数是否仅依赖于可序列化的 Python 对象,并且不引用不允许的对象(如打开的文件句柄或 dbutils)。