使用 SQL 开发管道代码

DLT 引入了多个新的 SQL 关键字和函数,用于在管道中定义具体化视图和流式处理表。 SQL 对开发管道的支持建立在 Spark SQL 的基础之上,并增添了对结构化流功能的支持。

熟悉 PySpark DataFrame 的用户可能更喜欢使用 Python 开发管道代码。 Python 支持更复杂的测试和操作,这些测试和操作难以通过 SQL 实现,例如元编程操作。 请参阅 使用 Python 开发管道代码。

有关 DLT SQL 语法的完整参考,请参阅 DLT SQL 语言参考

用于管道开发的 SQL 基础知识

创建 DLT 数据集的 SQL 代码使用 CREATE OR REFRESH 语法根据查询结果定义具体化视图和流式处理表。

关键字 STREAM 指示是否应使用流式处理语义读取子句中 SELECT 引用的数据源。

读取和写入管道配置期间指定的目录和架构的默认值。 请参阅 设置目标目录和数据库架构

DLT 源代码与 SQL 脚本严重不同:DLT 在管道中配置的所有源代码文件中评估所有数据集定义,并在运行任何查询之前生成数据流图。 笔记本或脚本中显示的查询顺序定义代码计算的顺序,而不是查询执行的顺序。

使用 SQL 创建具体化视图

下面的代码示例演示了使用 SQL 创建具体化视图的基本语法:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

使用 SQL 创建流表

下面的代码示例演示了使用 SQL 创建流式处理表的基本语法。 读取流式处理表的源时, STREAM 关键字指示对源使用流式处理语义。 创建具体化视图时不要使用 STREAM 关键字:

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

备注

要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

从对象存储加载数据

DLT 支持从 Azure Databricks 支持的所有格式加载数据。 请参阅数据格式选项

备注

这些示例使用自动装载到工作区的 /databricks-datasets 下的可用数据。 Databricks 建议使用卷路径或云 URI 来引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?

Databricks 建议在针对存储在云对象存储中的数据配置增量引入工作负荷时使用自动加载器和流式处理表。 请参阅什么是自动加载程序?

SQL 使用该 read_files 函数调用自动加载程序功能。 还必须使用 STREAM 关键字通过 read_files 配置流式读取。

下面介绍了 SQL 中 read_files 的语法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM STREAM read_files(
    "<file-path>",
    [<option-key> => <option_value>, ...]
  )

自动加载程序的选项是键值对。 有关支持的格式和选项的详细信息,请参阅 “选项”。

以下示例使用自动加载程序从 JSON 文件创建流式处理表:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

read_files 函数还支持批处理语义来创建具体化视图。 以下示例使用批处理语义读取 JSON 目录并创建具体化视图:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders",
  format => "json");

用预期来验证数据

可以使用预期来设置和强制实施数据质量约束。 请参阅通过管道预期管理数据质量

以下代码定义了一个名为 valid_data 的预期,该预期在数据引入过程中删除为 null 的记录。

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

查询管道中定义的具体化视图和流式处理表

以下示例定义四个数据集:

  • 一个名为 orders 的流式处理表,用于加载 JSON 数据。
  • 一个名为 customers 的物化视图,用于加载 CSV 数据。
  • 一个名为customer_orders的具体化视图,该视图用于联接orderscustomers数据集中的记录,将订单时间戳转换为日期,并选择customer_idorder_numberstateorder_date字段。
  • 名为 daily_orders_by_state 的物化视图,用于汇总每个州的每日订单数量。

备注

在管道中查询视图或表时,可以直接指定目录和架构,也可以使用管道中配置的默认值。 在此示例中,orderscustomerscustomer_orders 表是从为管道配置的默认目录和架构中写入和读取的。

传统发布模式使用 LIVE 架构来查询管道中定义的其他物化视图和流式表。 在新管道中,LIVE 架构语法会被悄无声息地忽略。 请参阅 LIVE 架构(旧版)。

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

定义专用表

在创建具体化视图或流式处理表时,可以使用 PRIVATE 子句。 创建专用表时,可以创建表,但不创建表的元数据。 PRIVATE 子句指令 DLT 创建一张供管道使用但不允许在管道外访问的表。 为了缩短处理时间,专用表在创建它的管道的生存期内将一直保存,而不仅作为单个更新。

专用表可以具有与目录中的表相同的名称。 如果为管道中的表指定了非限定名称,如果同时有专用表和具有该名称的目录表,则使用专用表。

专用表以前被称为临时表。

从具体化视图或流式处理表中永久删除记录

例如为了符合 GDPR 合规要求,要从启用了删除矢量的流式处理表中永久删除记录,必须对对象的基础 Delta 表执行其他操作。

具体化视图在刷新基础表中时始终反映数据。 若要删除具体化视图中的数据,必须从源中删除数据并刷新具体化视图。

在使用 SQL 声明表或视图时,对所用值进行参数化

使用 SET 在声明表或视图的查询中指定配置值,包括 Spark 配置。 在 SET 语句有权访问已定义的值之后,在笔记本中定义的任何表或视图。 对 SET 语句之后的任何表或视图执行 Spark 查询时,会使用通过 SET 语句指定的任何 Spark 配置。 若要在查询中读取配置值,请使用字符串内插语法 ${}。 下面的示例设置名为 startDate 的配置值,并在查询中使用该值:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多个配置值,请对每个值使用单独的 SET 语句。

局限性

不支持 PIVOT 子句。 Spark 中的 pivot 操作需要预先加载输入数据以计算输出架构。 DLT 不支持此功能。

备注

弃用用于创建具体化视图的 CREATE OR REFRESH LIVE TABLE 语法。 请改用 CREATE OR REFRESH MATERIALIZED VIEW