本文介绍如何使用 DLT 声明数据集的转换,以及如何通过查询逻辑处理记录。 它还包含用于生成 DLT 管道的常见转换模式的示例。
可以针对返回数据帧的任何查询定义数据集。 可以使用 Apache Spark 内置操作、UDF、自定义逻辑和 MLflow 模型作为 DLT 管道中的转换。 将数据引入到 DLT 管道后,可以针对上游源定义新数据集,以创建新的流表、物化视图和视图。
若要了解如何使用 DLT 有效执行有状态处理,请参阅 使用水印优化 DLT 中的有状态处理。
实现管道查询时,请选择最佳数据集类型以确保管道高效且可维护。
请考虑使用视图来执行以下操作:
- 将所需的大型或复杂查询分解为更易于管理的查询。
- 使用期望来验证中间结果。
- 减少不需要保留的结果的存储和计算成本。 由于表已具体化,因此它们需要额外的计算和存储资源。
在以下情况下考虑使用具体化视图:
- 多个下游查询使用表。 由于视图是按需计算的,因此每次查询视图时都会重新计算视图。
- 其他管道、作业或查询将使用表。 由于视图未具体化,因此你只能在同一个管道中使用它们。
- 你想要在开发过程中查看查询结果。 由于表已具体化并可以在管道外部查看和查询,因此在开发过程中使用表可帮助验证计算的正确性。 验证后,将不需要具体化的查询转换为视图。
在以下情况下考虑使用流式表:
- 查询是针对持续或以增量方式增长的数据源定义的。
- 应以增量方式计算查询结果。
- 管道需要高吞吐量和低延迟。
备注
流数据表始终是基于流数据源定义的。 你还可以将流式处理源与 APPLY CHANGES INTO
结合使用以应用 CDC 源中的更新。 请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。
如果必须计算不供外部使用的中间表,可以使用 TEMPORARY
关键字阻止将其发布到架构。 临时表仍根据 DLT 语义存储和处理数据,但不应在当前管道外部访问。 临时表在创建它的管道的整个生命周期内保持不变。 使用以下语法声明临时表:
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
@dlt.table(
temporary=True)
def temp_table():
return ("...")
流式表继承了 Apache Spark 结构化流式处理的处理保证,并配置为处理来自仅追加数据源的查询,其中的新行始终插入到源表中,而不会经过修改。
备注
虽然默认情况下,流式处理表要求使用“仅追加”数据源,但当流式处理源是另一个需要更新或删除的流式处理表时,可以使用 skipChangeCommits 标志改写此行为
常见的流式处理模式涉及到引入源数据以在管道中创建初始数据集。 这些初始数据集通常称为铜表,通常用于执行简单的转换。
相比之下,管道中的最终表(通常称为 gold 表)通常需要复杂的聚合,或从 APPLY CHANGES INTO
操作目标中读取。 由于这些操作本质上是创建更新而不是追加,因此不支持将它们作为流式表的输入。 这些转换更适合具体化视图。
通过将流式表和具体化视图混合到单个管道中,可以简化管道并避免对原始数据进行代价高昂的重新引入或重新处理,还能充分利用 SQL 来计算经过有效编码和筛选的数据集上的复杂聚合。 以下示例演示了这种类型的混合处理:
备注
这些示例使用自动加载程序从云存储加载文件。 若要在启用 Unity Catalog 的管道中使用 Auto Loader 加载文件,必须使用 外部位置。 若要详细了解如何将 Unity 目录与 DLT 配合使用,请参阅 将 Unity 目录与 DLT 管道配合使用。
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM STREAM read_files(
"abfss://path/to/raw/data",
format => "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM streaming_silver GROUP BY user_id
详细了解如何使用自动加载程序从 Azure 存储增量地引入 JSON 文件。
在使用主要静态维度表对连续仅追加数据流进行非规范化时,流静态联接是一个不错的选择。
每次更新管道时,流中的新记录都会与静态表的最新快照联接在一起。 如果在处理流式表中的相应数据后在静态表中添加或更新记录,则除非执行完全刷新,否则不会重新计算最终的记录。
在配置为触发执行的管道中,静态表返回更新开始时的结果。 在配置为连续执行的管道中,每次表处理更新时,就会查询静态表的最新版本。
下面是流静态联接的示例:
@dlt.table
def customer_sales():
return spark.readStream.table("sales").join(spark.readStream.table("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(sales)
INNER JOIN LEFT customers USING (customer_id)
可以使用流式表来以增量方式计算简单的分布聚合(例如计数、最小值、最大值或总和)和代数聚合(例如平均值或标准偏差)。 Databricks 建议对组数量有限的查询进行增量聚合,例如,包含 GROUP BY country
子句的查询。 每次更新时仅读取新的输入数据。
若要详细了解如何编写执行增量聚合的 DLT 查询,请参阅 使用水印执行开窗聚合。
备注
若要在启用了 Unity Catalog 的管道中使用 MLflow 模型,必须将管道配置为使用 preview
通道。 若要使用 current
通道,必须配置管道以发布到 Hive 元存储库。
可以在 DLT 管道中使用 MLflow 训练的模型。 MLflow 模型在 Azure Databricks 中被视为转换,这意味着,它们将作用于 Spark 数据帧输入并将结果作为 Spark 数据帧返回。 由于 DLT 针对 DataFrame 定义数据集,因此,只需几行代码即可将使用 MLflow 的 Apache Spark 工作负载转换为 DLT。 有关 MLflow 的详细信息,请参阅生成型 AI 代理的 MLflow 和 ML 模型生命周期。
如果已有一个调用 MLflow 模型的 Python 笔记本,则可以使用 @dlt.table
修饰器将此代码适应 DLT,并确保定义函数以返回转换结果。 DLT 默认不安装 MLflow,因此请确认已安装 MLflow 库%pip install mlflow
,并且已在笔记本顶部导入了mlflow
和dlt
。 有关 DLT 语法的简介,请参阅 使用 Python 开发管道代码。
若要在 DLT 中使用 MLflow 模型,请完成以下步骤:
获取 MLflow 模型的运行 ID 和模型名称。 运行 ID 和模型名称用于构造 MLflow 模型的 URI。
使用 URI 定义 Spark UDF,以加载 MLflow 模型。
在表定义中调用 UDF 以使用 MLflow 模型。
以下示例演示了此模式的基本语法:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
下面是一个完整的代码示例,它定义一个名为 loaded_model_udf
的 Spark UDF,用于加载一个基于贷款风险数据训练的 MLflow 模型。 用于预测的数据列作为参数传递给该 UDF。 表 loan_risk_predictions
计算 loan_risk_input_data
中每一行的预测。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
DLT 允许从表手动删除或更新记录,并执行刷新作以重新计算下游表。
默认情况下,DLT 会在每次更新管道时基于输入数据重新计算表结果,因此必须确保不会从源数据重新加载已删除的记录。 将 pipelines.reset.allowed
表属性设置为 false
可防止对表进行刷新,但不能防止对表进行增量写入,或防止新数据流入表中。
下图演示了使用两个流式处理表的示例:
-
raw_user_table
从源中引入原始用户数据。 -
bmi_table
使用raw_user_table
中的重量和高度以增量方式计算 BMI 评分。
你想要手动删除或更新 raw_user_table
中的用户记录并重新计算 bmi_table
。
以下代码演示了将 pipelines.reset.allowed
表属性设置为 false
,以禁用 raw_user_table
的完全刷新,从而确保所需的更改随着时间得以保留,而在管道更新运行时重新计算下游表。
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM STREAM read_files("/databricks-datasets/iot-stream/data-user", format => "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(raw_user_table);