该 dlt
模块使用修饰器实现其大部分核心功能。 这些修饰器接受定义流式处理或批处理查询并返回 Apache Spark 数据帧的函数。 以下语法演示了定义 Lakeflow 声明性管道数据集的简单示例:
import dlt
@dlt.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
本页概述了在 Lakeflow 声明性管道中定义数据集的函数和查询。 可用修饰器的完整列表,请参阅Lakeflow 声明性管道开发参考。
用于定义数据集的函数不应包含与数据集无关的任意 Python 逻辑,包括对第三方 API 的调用。 Lakeflow 声明性管道在规划、验证和更新期间多次运行这些函数。 包括任意逻辑可能会导致意外结果。
用于定义 Lakeflow 声明性管道数据集的函数通常以 spark.read
或 spark.readStream
作开头。 这些读取操作返回一个静态或流式处理的 DataFrame 对象,您可以在返回 DataFrame 之前定义其他转换。 返回 DataFrame 的其他 Spark 操作的示例包括 spark.table
或 spark.range
。
函数不应引用函数外部定义的数据帧。 尝试引用在不同范围定义的数据帧可能会导致出现意外行为。 有关用于创建多个表的元编程模式的示例,请参阅 循环中创建 for
表。
以下示例显示了使用批处理或流式处理逻辑读取数据的基本语法:
import dlt
# Batch read on a table
@dlt.table()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dlt.table()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dlt.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dlt.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
如果需要从外部 REST API 读取数据,请使用 Python 自定义数据源实现此连接。 请参阅 PySpark 自定义数据源。
备注
可以从 Python 数据集合(包括 pandas DataFrame、字典和列表)创建任意的 Apache Spark DataFrame。 这些模式在开发和测试期间可能很有用,但大多数生产 Lakeflow 声明性管道数据集定义应首先从文件、外部系统或现有表或视图加载数据开始。
Lakeflow 声明性管道支持几乎所有 Apache Spark 数据帧转换。 可以在数据集定义函数中包含任意数量的转换,但应确保使用的方法始终返回 DataFrame 对象。
如果你有一个用于驱动多个下游任务的中间转换,但不需要将其具体化为表,则使用 @dlt.view()
向管道添加临时视图。 然后,可以在多个下游数据集定义中使用spark.read.table("temp_view_name")
来引用此视图。 以下语法演示了此模式:
import dlt
@dlt.view()
def a():
return spark.read.table("source").filter(...)
@dlt.table()
def b():
return spark.read.table("b").groupBy(...)
@dlt.table()
def c():
return spark.read.table("c").groupBy(...)
这可确保 Lakeflow 声明性管道在管道计划过程中充分了解您视图中的转换,并防止与在数据集定义之外运行的随意 Python 代码相关的潜在问题。
在你的函数中,可以将 DataFrame 链接在一起,以创建新的 DataFrame,而不需要将增量结果写入视图、物化视图或流式处理表,如以下示例所示:
import dlt
@dlt.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
如果所有数据帧都使用批处理逻辑执行其初始读取,则返回结果为静态数据帧。 如果有任何流式查询,则返回结果为流式数据帧。
@dlt.table()
对于修饰器,返回静态数据帧意味着要定义具体化视图。 返回流式处理数据帧意味着要定义流式处理表。 大多数修饰器都适用于流式处理和静态数据帧,而其他修饰器则要求流式处理数据帧。
用于定义数据集的函数必须返回 Spark 数据帧。 切勿使用将文件或表保存或写入到 Lakeflow 声明性管道数据集代码中的方法。
不应在 Lakeflow 声明性管道代码中使用的 Apache Spark 示例操作:
collect()
count()
toPandas()
save()
saveAsTable()
start()
toTable()
备注
Lakeflow 声明性管道还支持将 Pandas on Spark 用于数据集定义函数。 请参阅 Spark 上的 Pandas API。
PySpark 支持 spark.sql
作员使用 SQL 编写 DataFrame 代码。 在 Lakeflow 声明性管道源代码中使用此模式时,它将编译为具体化视图或流式处理表。
下面的代码示例等效于对数据集查询逻辑使用 spark.read.table("catalog_name.schema_name.table_name")
:
@dlt.table
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
该 dlt
模块包括由 dlt.read()
和 dlt.read_stream()
引入的函数,以支持旧版管道发布模式中的功能。 支持这些方法,但 Databricks 建议始终使用 spark.read.table()
和 spark.readStream.table()
函数,原因如下:
- 函数
dlt
对读取当前管道外部定义的数据集的支持有限。 - 这些
spark
函数支持指定用于读取操作的选项,例如skipChangeCommits
。dlt
函数不支持指定选项。