DLT 引入了几个新的 Python 代码构造,用于在管道中定义具体化视图和流式处理表。 Python 对开发管道的支持基于 PySpark DataFrame 和结构化流式处理 API 的基础知识。
对于不熟悉 Python 和数据帧的用户,Databricks 建议使用 SQL 接口。 请参阅使用 SQL 开发管道代码。
有关 DLT Python 语法的完整参考,请参阅 DLT Python 语言参考。
创建 DLT 数据集的 Python 代码必须返回 DataFrames。
所有 DLT Python API 都在模块中 dlt
实现。 使用 Python 实现的 DLT 管道代码必须在 Python 笔记本和文件的顶部显式导入 dlt
模块。
读取和写入管道配置期间指定的目录和架构的默认值。 请参阅 设置目标目录和数据库架构。
特定于 DLT 的 Python 代码与其他类型的 Python 代码不同,这一关键方式是:Python 管道代码不直接调用执行数据引入和转换以创建 DLT 数据集的函数。 相反,DLT 会从管道中配置的所有源代码文件中解释 dlt
模块里的修饰器函数,并生成数据流图。
重要
要避免管道运行时出现意外行为,请不要在定义数据集的函数中包含可能具有副作用的代码。 要了解详细信息,请参阅Python 参考。
@dlt.table
修饰器指示 DLT 基于函数返回的结果创建物化视图或流表。 批处理读取的结果创建物化视图,而流式读取的结果则创建流表。
默认,物化视图和流式表的名称由函数名称推断而来。 下面的代码示例演示用于创建具体化视图和流式处理表的基本语法:
备注
这两个函数引用 samples
目录中的同一个表,并使用同一修饰器函数。 这些示例突出显示了具体化视图和流式处理表在基本语法上的唯一区别是使用spark.read
和spark.readStream
。
并非所有数据源都支持流式读取。 某些数据源应始终使用流处理语义进行处理。
import dlt
@dlt.table()
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table()
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
(可选)可以使用修饰器中的name
参数指定表名@dlt.table
。 以下示例演示具体化视图和流式处理表的此模式:
import dlt
@dlt.table(name = "trips_mv")
def basic_mv():
return spark.read.table("samples.nyctaxi.trips")
@dlt.table(name = "trips_st")
def basic_st():
return spark.readStream.table("samples.nyctaxi.trips")
DLT 支持从 Azure Databricks 支持的所有格式加载数据。 请参阅数据格式选项。
备注
这些示例使用自动装载到工作区的 /databricks-datasets
下的可用数据。 Databricks 建议使用卷路径或云 URI 来引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?。
Databricks 建议在针对存储在云对象存储中的数据配置增量引入工作负荷时使用自动加载器和流式处理表。 请参阅什么是自动加载程序?。
以下示例使用自动加载程序从 JSON 文件创建流式处理表:
import dlt
@dlt.table()
def ingestion_st():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
以下示例使用批处理语义读取 JSON 目录并创建具体化视图:
import dlt
@dlt.table()
def batch_mv():
return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
可以使用预期来设置和强制实施数据质量约束。 请参阅通过管道预期管理数据质量。
以下代码使用 @dlt.expect_or_drop
定义一个名为 valid_data
的预期,该预期在数据引入期间删除为 null 的记录。
import dlt
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
以下示例定义四个数据集:
- 一个名为
orders
的流式处理表,用于加载 JSON 数据。 - 一个名为
customers
的物化视图,用于加载 CSV 数据。 - 一个名为
customer_orders
的具体化视图,该视图用于联接orders
和customers
数据集中的记录,将订单时间戳转换为日期,并选择customer_id
、order_number
、state
和order_date
字段。 - 名为
daily_orders_by_state
的物化视图,用于汇总每个州的每日订单数量。
备注
在管道中查询视图或表时,可以直接指定目录和架构,也可以使用管道中配置的默认值。 在此示例中,orders
、customers
和 customer_orders
表是从为管道配置的默认目录和架构中写入和读取的。
传统发布模式使用 LIVE
架构来查询管道中定义的其他物化视图和流式表。 在新管道中,LIVE
架构语法会被悄无声息地忽略。 请参阅 LIVE 架构(旧版)。
import dlt
from pyspark.sql.functions import col
@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders")
)
@dlt.table()
def customers():
return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")
@dlt.table()
def customer_orders():
return (spark.read.table("orders")
.join(spark.read.table("customers"), "customer_id")
.select("customer_id",
"order_number",
"state",
col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
)
)
@dlt.table()
def daily_orders_by_state():
return (spark.read.table("customer_orders")
.groupBy("state", "order_date")
.count().withColumnRenamed("count", "order_count")
)
可以使用 Python for
循环以编程方式创建多个表。 这在您有多个数据源或目标数据集时非常有用,这些数据源或数据集仅因几个参数的不同而异,从而减少需要维护的代码总量和代码冗余。
该 for
循环按串行顺序评估逻辑,但在为数据集完成规划后,管道并行运行逻辑。
重要
使用此模式定义数据集时,请确保传递给 for
循环的值列表始终是累加的。 如果以前在管道中定义的数据集从将来的管道运行中省略,该数据集将自动从目标架构中删除。
以下示例创建五个表,用于按区域筛选客户订单。 在这里,区域名称用于设置目标具体化视图的名称并筛选源数据。 临时视图用于定义来自源表的联接,这些表用于构造最终的具体化视图。
import dlt
from pyspark.sql.functions import collect_list, col
@dlt.view()
def customer_orders():
orders = spark.read.table("samples.tpch.orders")
customer = spark.read.table("samples.tpch.customer")
return (orders.join(customer, orders.o_custkey == customer.c_custkey)
.select(
col("c_custkey").alias("custkey"),
col("c_name").alias("name"),
col("c_nationkey").alias("nationkey"),
col("c_phone").alias("phone"),
col("o_orderkey").alias("orderkey"),
col("o_orderstatus").alias("orderstatus"),
col("o_totalprice").alias("totalprice"),
col("o_orderdate").alias("orderdate"))
)
@dlt.view()
def nation_region():
nation = spark.read.table("samples.tpch.nation")
region = spark.read.table("samples.tpch.region")
return (nation.join(region, nation.n_regionkey == region.r_regionkey)
.select(
col("n_name").alias("nation"),
col("r_name").alias("region"),
col("n_nationkey").alias("nationkey")
)
)
# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]
# Iterate through region names to create new region-specific materialized views
for region in region_list:
@dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
def regional_customer_orders(region_filter=region):
customer_orders = spark.read.table("customer_orders")
nation_region = spark.read.table("nation_region")
return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
.select(
col("custkey"),
col("name"),
col("phone"),
col("nation"),
col("region"),
col("orderkey"),
col("orderstatus"),
col("totalprice"),
col("orderdate")
).filter(f"region = '{region_filter}'")
)
下面是此管道的数据流图示例:
由管道用来评估 Python 代码的惰性执行模型要求你的逻辑在调用由 @dlt.table()
修饰的函数时直接引用各个值。
以下示例演示了使用循环定义表 for
的两种正确方法。 在这两个示例中,列表中的每个表名tables
都在由@dlt.table()
修饰的函数中被明确引用。
import dlt
# Create a parent function to set local variables
def create_table(table_name):
@dlt.table(name=table_name)
def t():
return spark.read.table(table_name)
tables = ["t1", "t2", "t3"]
for t_name in tables:
create_table(t_name)
# Call `@dlt.table()` within a for loop and pass values as variables
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table(table_name=t_name):
return spark.read.table(table_name)
以下示例未正确引用值。 此示例创建具有不同名称的表,但所有表都从循环中的 for
最后一个值加载数据:
import dlt
# Don't do this!
tables = ["t1", "t2", "t3"]
for t_name in tables:
@dlt.table(name=t_name)
def create_table():
return spark.read.table(t_name)