重要
API foreach_batch_sink 处于 公开预览版。
ForEachBatch 数据接收端允许将数据流作为一系列微批处理进行处理。 可以使用类似于 Apache Spark 结构化流式处理的 foreachBatch自定义逻辑在 Python 中处理每个批处理。 使用 Lakeflow Spark 声明式管道(SDP)ForEachBatch 接收端,您可以将流数据转换、合并或写入到不支持流式写入的一个或多个目标。 本页指导你设置 ForEachBatch 数据承接点,提供示例并讨论关键注意事项。
ForEachBatch 汇聚点提供以下功能:
- 针对每个微批的自定义逻辑:ForEachBatch 是一个灵活的流式接收器。 可以使用 Python 代码应用任意操作(例如合并到外部表、写入多个目标或执行更新插入)。
- 完全刷新支持:管道按流管理检查点,因此在执行管道完全刷新时,检查点会自动重置。 使用 ForEachBatch 接收器时,当这种情况发生时,你负责管理下游数据重置。
- Unity 目录支持:ForEachBatch 接收器支持所有 Unity 目录功能,例如读取或写入 Unity 目录卷或表。
- 有限的清理:管道不会跟踪从 ForEachBatch 接收器写入的数据,因此无法对这些数据进行清理。 你负责任何下游数据管理。
- 事件日志条目:管道事件日志记录每个 ForEachBatch 接收器的创建和使用。 如果 Python 函数不可序列化,则会在事件日志中看到一个警告条目,其中包含其他建议。
注释
- ForEachBatch 接收器专为流式处理查询而设计,例如
append_flow。 它不适用于仅限批处理的管道或AutoCDC语义。 - 此页上介绍的 ForEachBatch 接收器适用于管道。 Apache Spark 结构化流式处理也支持
foreachBatch。 有关结构化流式处理foreachBatch的信息,请参阅 “使用 foreachBatch 写入任意数据接收器”。
何时使用 ForEachBatch 滤器
每当管道需要通过内置接收器格式(例如 delta,或 kafka)不可用的功能时,请使用 ForEachBatch 接收器。 典型用例包括:
- 合并或插入 Delta Lake 表:对每个微批处理执行自定义合并逻辑(例如,处理更新的记录)。
- 写入多个或不受支持的目标:将每个批处理的输出写入多个表或不支持流式写入的外部存储系统(如某些 JDBC 接收器)。
- 应用自定义逻辑或转换:直接在 Python 中操作数据(例如,使用专用库或高级转换)。
有关内置接收器或使用 Python 创建自定义接收器的信息,请参阅 Lakeflow Spark 声明性管道中的接收器。
Syntax
使用 @dp.foreach_batch_sink() 修饰符生成 ForEachBatch 汇聚点。 然后,可以在流定义中将其引用为target,例如,在@dp.append_flow中。
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
| 参数 | Description |
|---|---|
| name | 可选。 用于标识管道中的接收器的唯一名称。 如果未包含,则默认为 UDF 的名称。 |
| batch_handler | 这是将为每个微批处理调用的用户定义函数(UDF)。 |
| df | 包含当前微批处理数据的 Spark 数据帧。 |
| batch_id | 微批处理的整数 ID。 Spark 为每个触发器间隔递增此 ID。batch_id
0表示流的开头或完全刷新的开始。 代码 foreach_batch_sink 应正确处理下游数据源的完全刷新。 有关详细信息,请参阅下一部分。 |
完全刷新
由于 ForEachBatch 使用流式处理查询,因此管道会跟踪每个流的检查点目录。 完全刷新时:
- 检查点目录已重置。
- 汇流函数(
foreach_batch_sinkUDF)检测到从 0 开始的新的batch_id周期。 - 目标系统中的数据 不会 由管道自动清理(因为管道不知道数据被写入到哪里)。 如果需要重新初始化的场景,则必须手动删除或截断由 ForEachBatch 输出填充的外部表或位置。
使用 Unity 目录功能
Spark 结构化流 foreach_batch_sink 中所有现有的 Unity Catalog 功能仍然可用。
这包括写入托管表或外部 Unity 目录表。 可以将微批处理写入 Unity 目录托管表或外部表,就像在任何 Apache Spark 结构化流式处理作业中一样。
事件日志条目
创建 ForEachBatch 汇集器时,会将 SinkDefinition 事件与 "format": "foreachBatch" 一同添加到管道的事件日志中。
这使您能够跟踪 ForEachBatch 接收器的使用情况,并查看有关您接收器的警告。
与 Databricks Connect 配合使用
如果提供的函数 不可序列化 (Databricks Connect 的重要要求),则事件日志包含一个 WARN 条目,建议在需要 Databricks Connect 支持的情况下简化或重构代码。
例如,如果您使用 dbutils 在 ForEachBatch UDF 中获取参数,您可以在使用参数之前获取这些参数。
# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
value = dbutils.widgets.get ("X") + str (i)
# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")
def foreach_batch(df, batchId):
value = argX + str (i)
最佳做法
- 保持 ForEachBatch 函数简洁:避免多线程、大量库依赖或在内存中进行大型数据操作。 复杂或有状态逻辑可能导致序列化错误或性能瓶颈。
- 监视检查点文件夹:对于流处理查询,SDP 通过流而不是汇聚点来管理检查点。 如果管道中有多个流,则每个流都有自己的检查点目录。
- 验证外部依赖项:如果依赖于外部系统或库,请检查它们是否已安装在所有群集节点上或容器中。
-
请注意 Databricks Connect:如果您的环境将来可能会迁移到 Databricks Connect,请检查您的代码是否可序列化,并且不依赖于
dbutilsUDF 中的foreach_batch_sink。
局限性
- ForEachBatch 没有清理功能:由于自定义 Python 代码可能会在任何地方写入数据,因此管道无法处理或跟踪这些数据。 您必须自行管理写入目标的数据或制定相关的保留策略。
- 微批处理中的指标:管道收集流式指标,但某些情况下使用 ForEachBatch 可能会导致指标不完整或异常。 这是由于 ForEachBatch 的灵活性,使得系统难以有效跟踪数据流和具体数据行。
-
支持在没有多次读取的情况下写入多个目标:某些客户可能会使用 ForEachBatch 从源读取一次,然后写入多个目标。 若要实现此目的,您必须在ForEachBatch函数中包含
df.persist或df.cache。 使用这些选项,Azure Databricks 将尝试只准备一次数据。 如果没有这些选项,查询将导致多次读取。 以下代码示例中不包括此内容。 -
与 Databricks Connect 一起使用:如果管道在 Databricks Connect 上运行,
foreachBatch则用户定义的函数(UDF)必须可序列化且无法使用dbutils。 如果管道检测到不可序列化的 UDF,但不会使管道失败,则会引发警告。 -
不可序列化的逻辑:引用本地对象、类或不可调用的资源的代码可能会中断 Databricks Connect 上下文。 如果 Databricks Connect 是必需的,则使用纯 Python 模块并确认引用(例如,
dbutils)不使用。
例子
基本语法示例
from pyspark import pipelines as dp
# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
return
# Create source data for example:
@dp.table()
def example_source_data():
return spark.range(5)
# Add sink to an append flow:
@dp.append_flow(
target="my_foreachbatch_sink",
)
def my_flow():
return spark.readStream.format("delta").table("example_source_data")
对简单管道使用示例数据
此示例使用 NYC 出租车示例。 它假定工作区管理员已启用 Databricks 公共数据集目录。 对于数据接收端,请将 my_catalog.my_schema 更改为您有权访问的目录和架构。
from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp
# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
# Custom logic here. You can perform merges,
# write to multiple destinations, etc.
# For this example, we are adding a timestamp column.
enriched = df.withColumn("processed_timestamp", current_timestamp())
# Write to a Delta location
enriched.write \
.format("delta") \
.mode("append") \
.saveAsTable("my_catalog.my_schema.trips_sink_delta")
# Return is optional here, but generally not used for the sink
return
# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
target="my_foreach_sink",
)
def taxi_source():
df = spark.readStream.table("samples.nyctaxi.trips")
return df
写入多个目标
此示例写入多个目标。 它演示了如何使用txnVersion和txnAppId来使写入Delta Lake表的操作具有幂等性。 有关详细信息,请参阅幂等表写入foreachBatch。
假设我们要写入两个表table_a和table_b,并且假设在一个批处理中,写入table_a成功,而写入table_b失败。 重新运行批处理时,txnVersion和txnAppId这对参数将允许Delta忽略重复写入table_a的动作,并且仅将批处理写入table_b。
from pyspark import pipelines as dp
app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId
# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
# Optionally do transformations, logging, or merging logic
# ...
# Write to a Delta table
df.write \
.format("delta") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.saveAsTable("my_catalog.my_schema.example_table_1")
# Also write to a JSON file location
df.write \
.format("json") \
.mode("append") \
.option("txnVersion", batch_id) \
.option("txnAppId", app_id) \
.save("/tmp/json_target")
return
# Create source data for example
@dp.table()
def example_source():
return spark.range(5)
# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
return spark.readStream.format("delta").table("example_source")
使用 spark.sql()
可以在 ForEachBatch 接收器中使用 spark.sql() ,如以下示例所示。
from pyspark import pipelines as dp
from pyspark.sql import Row
@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
df.createOrReplaceTempView("df_view")
df.sparkSession.sql("MERGE INTO target_table AS tgt " +
"USING df_view AS src ON tgt.id = src.id " +
"WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
"WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
)
return
# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")
# Create source table
@dp.table()
def src_table():
return spark.range(5)
@dp.append_flow(
target="example_sink",
)
def example_flow():
return spark.readStream.format("delta").table("source_table")
常见问题 (FAQ)
可以在 ForEachBatch 数据接收器中使用 dbutils 吗?
如果打算在非 Databricks Connect 环境中运行管道, dbutils 则可以正常工作。 但是,如果使用 Databricks Connect,dbutils 在 foreachBatch 函数中无法访问。 如果管道检测到 dbutils 的使用,为了帮助您避免故障,可能会引发警告。
是否可以将多个流与单个 ForEachBatch 接收器一起使用?
是的。 可以定义多个流(带有 @dp.append_flow),这些流都以相同的接收器名称为目标,但它们各自维护自己的检查点。
数据管道是否处理我的目标的数据保留或清理?
否。 由于 ForEachBatch 接收器可以写入任何任意位置或系统,因此管道无法自动管理或删除该目标中的数据。 必须在自定义代码或外部进程中处理这些操作。
如何排查 ForEachBatch 函数中的序列化错误或故障?
查看群集驱动程序日志或管道事件日志。 对于与 Spark Connect 相关的序列化问题,请检查函数是否仅依赖于可序列化的 Python 对象,并且不引用不允许的对象(如打开的文件句柄或 dbutils)。