pyspark.pipelines
模块(此处别名为dp
)模块使用修饰器实现其大部分核心功能。 这些修饰器接受一个函数,该函数定义流查询或批查询并返回一个 Apache Spark 数据帧。 以下语法演示了定义 Lakeflow 声明性管道数据集的简单示例:
from pyspark import pipelines as dp
@dp.table()
def function_name(): # This is the function decorated
return (<query>) # This is the query logic that defines the dataset
本页概述了在 Lakeflow 声明性管道中定义数据集的函数和查询。 有关可用装饰器的完整列表,请参阅 Lakeflow 声明式管道开发参考资料。
用于定义数据集的函数不应包含与数据集无关的任意 Python 逻辑,包括对第三方 API 的调用。 Lakeflow 声明性管道在规划、验证和更新期间多次运行这些函数。 包括任意逻辑可能会导致意外结果。
读取数据以开始数据集定义
用于定义 Lakeflow 声明性管道数据集的函数通常以 spark.read
或 spark.readStream
作开头。 这些读取操作返回静态或流数据DataFrame对象,用于在获得DataFrame之前应用额外的转换。 返回 DataFrame 的其他 spark操作示例包括 spark.table
或 spark.range
。
函数不应引用函数外部定义的数据帧。 尝试引用在不同范围定义的数据帧可能会导致意外行为。 有关用于创建多个表的元编程模式的示例,请参阅 循环中创建 for
表。
以下示例显示了使用批处理或流式处理逻辑读取数据的基本语法:
from pyspark import pipelines as dp
# Batch read on a table
@dp.materialized_view()
def function_name():
return spark.read.table("catalog_name.schema_name.table_name")
# Batch read on a path
@dp.materialized_view()
def function_name():
return spark.read.format("parquet").load("/Volumes/catalog_name/schema_name/volume_name/data_path")
# Streaming read on a table
@dp.table()
def function_name():
return spark.readStream.table("catalog_name.schema_name.table_name")
# Streaming read on a path
@dp.table()
def function_name():
return (spark.read
.format("cloudFiles")
.option("cloudFile.format", "parquet")
.load("/Volumes/catalog_name/schema_name/volume_name/data_path")
)
如果需要从外部 REST API 读取数据,请使用 Python 自定义数据源实现此连接。 请参阅 PySpark 自定义数据源。
注释
可以从 Python 数据集合(包括 pandas DataFrame、字典和列表)创建任意 Apache Spark 数据帧。 这些模式在开发和测试期间可能很有用,但大多数生产 Lakeflow 声明性管道数据集定义应首先从文件、外部系统或现有表或视图加载数据开始。
链接转换
Lakeflow 声明性管道支持几乎所有 Apache Spark 数据帧转换。 可以在数据集定义函数中包含任意数量的转换,但应确保使用的方法始终返回 DataFrame 对象。
如果你有一个中间转换来驱动多个下游工作负载,但不需要将其具体化为表,可以使用@dp.temporary_view()
向管道添加临时视图。 然后,可以在多个下游数据集定义中使用spark.read.table("temp_view_name")
来引用此视图。 以下语法演示了此模式:
from pyspark import pipelines as dp
@dp.temporary_view()
def a():
return spark.read.table("source").filter(...)
@dp.materialized_view()
def b():
return spark.read.table("a").groupBy(...)
@dp.materialized_view()
def c():
return spark.read.table("a").groupBy(...)
这可确保 Lakeflow 声明性管道在管道规划过程中充分了解视图中的转换,并防止与在数据集定义外部运行的任意 Python 代码相关的潜在问题。
在你的函数中,可以将数据帧联结在一起,以创建新的数据帧,而无需将增量结果写入视图、具体化视图或流式处理表,如下所示的示例:
from pyspark import pipelines as dp
@dp.table()
def multiple_transformations():
df1 = spark.read.table("source").filter(...)
df2 = df1.groupBy(...)
return df2.filter(...)
如果所有数据帧都使用批处理逻辑执行其初始读取,则返回结果为静态数据帧。 如果您有任何正在流式传输的查询,那么返回结果将是流式数据帧。
返回数据帧
使用 @dp.table
从流读取的结果创建流表。 使用 @dp.materialized_view
从批量读取结果创建具体化视图。 大多数其他修饰器都适用于流式处理和静态数据帧,而一些修饰器则需要流式处理数据帧。
用于定义数据集的函数必须返回 Spark 数据帧。 切勿使用将文件或表保存或写入到 Lakeflow 声明性管道数据集代码中的方法。
不应在 Lakeflow 声明性管道代码中使用的 Apache Spark 操作示例:
collect()
count()
toPandas()
save()
saveAsTable()
start()
toTable()
注释
Lakeflow 声明性管道还支持将 Pandas on Spark 用于数据集定义函数。 请参阅 Spark 上的 Pandas API。
在 Python 管道中使用 SQL
PySpark 支持 spark.sql
操作员使用 SQL 编写 DataFrame 代码。 在 Lakeflow 声明性管道源代码中使用此模式时,它将编译为具体化视图或流式处理表。
下面的代码示例等效于对数据集查询逻辑使用 spark.read.table("catalog_name.schema_name.table_name")
:
@dp.materialized_view
def my_table():
return spark.sql("SELECT * FROM catalog_name.schema_name.table_name")
dlt.read
和 dlt.read_stream
(旧版)
旧的 dlt
模块包括由 dlt.read()
和 dlt.read_stream()
引入的函数,是为了支持传统管道发布模式下的功能。 支持这些方法,但 Databricks 建议始终使用 spark.read.table()
和 spark.readStream.table()
函数,原因如下:
- 函数
dlt
对读取当前管道外部定义的数据集的支持有限。 - 这些
spark
函数支持指定用于读取操作的选项,例如skipChangeCommits
。 函数不支持指定dlt
选项。 - 模块
dlt
已被模块pyspark.pipelines
替换。 Databricks 建议在 Python 中编写 Lakeflow 声明性管道代码时使用from pyspark import pipelines as dp
导入pyspark.pipelines
。