DLT 引入了多个新的 SQL 关键字和函数,用于在管道中定义具体化视图和流式处理表。 SQL 对开发管道的支持建立在 Spark SQL 的基础之上,并增添了对结构化流功能的支持。
熟悉 PySpark DataFrame 的用户可能更喜欢使用 Python 开发管道代码。 Python 支持更复杂的测试和操作,这些测试和操作难以通过 SQL 实现,例如元编程操作。 请参阅 使用 Python 开发管道代码。
有关 DLT SQL 语法的完整参考,请参阅 DLT SQL 语言参考。
创建 DLT 数据集的 SQL 代码使用 CREATE OR REFRESH
语法根据查询结果定义具体化视图和流式处理表。
关键字 STREAM
指示是否应使用流式处理语义读取子句中 SELECT
引用的数据源。
读取和写入管道配置期间指定的目录和架构的默认值。 请参阅 设置目标目录和数据库架构。
DLT 源代码与 SQL 脚本严重不同:DLT 在管道中配置的所有源代码文件中评估所有数据集定义,并在运行任何查询之前生成数据流图。 笔记本或脚本中显示的查询顺序定义代码计算的顺序,而不是查询执行的顺序。
下面的代码示例演示了使用 SQL 创建具体化视图的基本语法:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
下面的代码示例演示了使用 SQL 创建流式处理表的基本语法。 读取流式处理表的源时, STREAM
关键字指示对源使用流式处理语义。 创建具体化视图时不要使用 STREAM
关键字:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
备注
要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits
选项来处理错误。
DLT 支持从 Azure Databricks 支持的所有格式加载数据。 请参阅数据格式选项。
备注
这些示例使用自动装载到工作区的 /databricks-datasets
下的可用数据。 Databricks 建议使用卷路径或云 URI 来引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?。
Databricks 建议在针对存储在云对象存储中的数据配置增量引入工作负荷时使用自动加载器和流式处理表。 请参阅什么是自动加载程序?。
SQL 使用该 read_files
函数调用自动加载程序功能。 还必须使用 STREAM
关键字通过 read_files
配置流式读取。
下面介绍了 SQL 中 read_files
的语法:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
自动加载程序的选项是键值对。 有关支持的格式和选项的详细信息,请参阅 “选项”。
以下示例使用自动加载程序从 JSON 文件创建流式处理表:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
该 read_files
函数还支持批处理语义来创建具体化视图。 以下示例使用批处理语义读取 JSON 目录并创建具体化视图:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
可以使用预期来设置和强制实施数据质量约束。 请参阅通过管道预期管理数据质量。
以下代码定义了一个名为 valid_data
的预期,该预期在数据引入过程中删除为 null 的记录。
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
以下示例定义四个数据集:
- 一个名为
orders
的流式处理表,用于加载 JSON 数据。 - 一个名为
customers
的物化视图,用于加载 CSV 数据。 - 一个名为
customer_orders
的具体化视图,该视图用于联接orders
和customers
数据集中的记录,将订单时间戳转换为日期,并选择customer_id
、order_number
、state
和order_date
字段。 - 名为
daily_orders_by_state
的物化视图,用于汇总每个州的每日订单数量。
备注
在管道中查询视图或表时,可以直接指定目录和架构,也可以使用管道中配置的默认值。 在此示例中,orders
、customers
和 customer_orders
表是从为管道配置的默认目录和架构中写入和读取的。
传统发布模式使用 LIVE
架构来查询管道中定义的其他物化视图和流式表。 在新管道中,LIVE
架构语法会被悄无声息地忽略。 请参阅 LIVE 架构(旧版)。
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
在创建具体化视图或流式处理表时,可以使用 PRIVATE
子句。 创建专用表时,可以创建表,但不创建表的元数据。
PRIVATE
子句指令 DLT 创建一张供管道使用但不允许在管道外访问的表。 为了缩短处理时间,专用表在创建它的管道的生存期内将一直保存,而不仅作为单个更新。
专用表可以具有与目录中的表相同的名称。 如果为管道中的表指定了非限定名称,如果同时有专用表和具有该名称的目录表,则使用专用表。
专用表以前被称为临时表。
例如为了符合 GDPR 合规要求,要从启用了删除矢量的流式处理表中永久删除记录,必须对对象的基础 Delta 表执行其他操作。
具体化视图在刷新基础表中时始终反映数据。 若要删除具体化视图中的数据,必须从源中删除数据并刷新具体化视图。
使用 SET
在声明表或视图的查询中指定配置值,包括 Spark 配置。 在 SET
语句有权访问已定义的值之后,在笔记本中定义的任何表或视图。 对 SET 语句之后的任何表或视图执行 Spark 查询时,会使用通过 SET
语句指定的任何 Spark 配置。 若要在查询中读取配置值,请使用字符串内插语法 ${}
。 下面的示例设置名为 startDate
的配置值,并在查询中使用该值:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
若要指定多个配置值,请对每个值使用单独的 SET
语句。
不支持 PIVOT
子句。 Spark 中的 pivot
操作需要预先加载输入数据以计算输出架构。 DLT 不支持此功能。
备注
弃用用于创建具体化视图的 CREATE OR REFRESH LIVE TABLE
语法。 请改用 CREATE OR REFRESH MATERIALIZED VIEW
。