Delta Live Tables SQL 语言参考

本文提供了有关 Delta Live Tables SQL 编程接口的详细信息。

可以在 SQL 查询中使用 Python 用户定义函数 (UDF),但必须先在 Python 文件中定义这些 UDF,然后再在 SQL 源文件中调用它们。 请参阅用户定义标量函数 - Python

限制

不支持 PIVOT 子句。 Spark 中的 pivot 操作需要预先加载输入数据以计算输出架构。 Delta Live Tables 不支持此功能。

创建 Delta Live Tables 具体化视图或流式处理表

在声明流式处理表或具体化视图(也称为 LIVE TABLE)时,可以使用相同的基本 SQL 语法。

只能使用针对流式处理源读取的查询来声明流式处理表。 Databricks 建议使用自动加载程序从云对象存储中流式引入文件。 请参阅自动加载程序 SQL 语法

将管道中的其他表或视图指定为流式处理源时,必须在数据集名称周围包含 STREAM() 函数。

下面介绍了使用 SQL 声明具体化视图和流式处理表的语法:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

创建 Delta Live Tables 视图

下面介绍了使用 SQL 声明视图的语法:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

自动加载程序 SQL 语法

下面介绍了在 SQL 中使用自动加载程序的语法:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

可以将支持的格式选项用于自动加载程序。 使用 map() 函数,可将任意数量的选项传递给 cloud_files() 方法。 选项是键值对,其中键和值是字符串。 有关支持格式和选项的详细信息,请参阅文件格式选项

示例:定义表

你可以通过以下方式创建数据集:从外部数据源或管道中定义的数据集读取数据。 若要从内部数据集读取数据,请在数据集名称前追加 LIVE 关键字。 以下示例定义了两个不同的数据集:一个将 JSON 文件作为输入源的 taxi_raw 表,一个将 taxi_raw 表作为输入的 filtered_data 表:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

示例:从流式处理源读取

若要从流式处理源(例如自动加载程序或内部数据集)读取数据,请定义 STREAMING 表:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

有关对数据进行流式处理的详细信息,请参阅使用 Delta Live Tables 转换数据

控制表的具体化方式

表还提供对其具体化的额外控制:

  • 指定如何使用 PARTITIONED BY 对表进行分区。 可以使用分区来加快查询速度。
  • 可以使用 TBLPROPERTIES 来设置表属性。 请参阅 Delta Live Tables 表属性
  • 使用 LOCATION 设置来设置存储位置。 默认情况下,如果未设置 LOCATION,表数据会存储在管道存储位置。
  • 可在架构定义中使用生成的列。 请参阅示例:指定架构和分区列

注意

对于小于 1 TB 的表,Databricks 建议让增量实时表控制数据组织方式。 除非你预期表会增长到超过 1 TB,否则一般情况下不应指定分区列。

示例:指定架构和分区列

你可以在定义表时指定架构。 以下示例指定目标表的架构,包括使用 Delta Lake 生成的列和为表定义分区列:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

默认情况下,如果未指定架构,则增量实时表将从 table 定义推断架构。

示例:定义表约束

注意

表约束支持目前处于公共预览版阶段。 若要定义表约束,管道必须是启用了 Unity 目录的管道,并配置为使用 preview 通道。

指定架构时,可以定义主键和外键。 约束具备信息性,系统不会强制执行。 以下示例定义具有主键和外键约束的表:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

为表或视图设置配置值

使用 SET 为表或视图指定配置值,包括 Spark 配置。 在 SET 语句有权访问已定义的值之后,在笔记本中定义的任何表或视图。 对 SET 语句之后的任何表或视图执行 Spark 查询时,会使用通过 SET 语句指定的任何 Spark 配置。 若要在查询中读取配置值,请使用字符串内插语法 ${}。 下面的示例设置名为 startDate 的配置值,并在查询中使用该值:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

若要指定多个配置值,请对每个值使用单独的 SET 语句。

SQL 属性

CREATE TABLE 或 VIEW
TEMPORARY

创建表,但不发布表的元数据。 TEMPORARY 子句指示 Delta Live Tables 创建可用于管道但不应在管道外部访问的表。 为了缩短处理时间,临时表会在创建它的管道的生存期内持久保留,而不仅仅是一次更新。
STREAMING

创建一个表,该表将输入数据集作为流读取。 输入数据集必须是流式处理数据源,例如自动加载程序或 STREAMING 表。
PARTITIONED BY

包含一列或多列的可选列表,用于对表进行分区。
LOCATION

表数据的可选存储位置。 如果未设置,系统将默认为管道存储位置。
COMMENT

表的可选说明。
column_constraint

列的可选信息性主键或外键约束
table_constraint

表的可选信息性主键或外键约束
TBLPROPERTIES

表的表属性可选列表。
select_statement

一个 Delta Live Tables 查询,用于定义表的数据集。
CONSTRAINT 子句
EXPECT expectation_name

定义数据质量约束 expectation_name。 如果未定义 ON VIOLATION 约束,则将违反约束的行添加到目标数据集。
ON VIOLATION

对失败的行执行的可选操作:

* FAIL UPDATE:立即停止管道执行。
* DROP ROW:删除记录并继续处理。

在 Delta Live Tables 中使用 SQL 进行的变更数据捕获

通过 APPLY CHANGES INTO 语句使用 Delta Live Tables CDC 功能,如下所述:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

使用与非 APPLY CHANGES 查询相同的 CONSTRAINT 子句为 APPLY CHANGES 目标定义数据质量约束。 请参阅使用 Delta Live Tables 管理数据质量

注意

INSERTUPDATE 事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定键匹配的任何行或在目标表中不存在匹配记录时插入新行。 可以使用 APPLY AS DELETE WHEN 条件指定对 DELETE 事件的处理。

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 在指定 APPLY CHANGES 目标表的架构时,还必须包含具有与 sequence_by 字段相同数据类型的 __START_AT__END_AT 列。

请参阅在 Delta Live Tables 中使用 APPLY CHANGES API 简化变更数据捕获

子句
KEYS

唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。

该语句是必需的。
IGNORE NULL UPDATES

允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并指定 IGNORE NULL UPDATES 时,具有 null 的列将在目标中保留其现有值。 这也适用于值为 null 的嵌套列。

此子句是可选的。

默认设置是用 null 值覆盖现有列。
APPLY AS DELETE WHEN

指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 为了处理乱序数据,被删除的行被暂时保留为基础 Delta 表中的无效标记,并在元存储中创建一个视图来筛选掉这些无效标记。 保留间隔可以配置为
pipelines.cdc.tombstoneGCThresholdInSeconds表属性

此子句是可选的。
APPLY AS TRUNCATE WHEN

指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

仅 SCD 类型 1 支持 APPLY AS TRUNCATE WHEN 子句。 SCD 类型 2 不支持截断。

此子句是可选的。
SEQUENCE BY

指定源数据中 CDC 事件的逻辑顺序的列名。 增量实时表使用此排序来处理乱序到达的更改事件。

该语句是必需的。
COLUMNS

指定要包含在目标表中的列子集。 可以:

* 指定要包含的完整列列表:COLUMNS (userId, name, city)
* 指定要排除的列列表:COLUMNS * EXCEPT (operation, sequenceNum)

此子句是可选的。

当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。
STORED AS

将记录存储为 SCD 类型 1 还是 SCD 类型 2。

此子句是可选的。

默认值为 SCD 类型 1。
TRACK HISTORY ON

指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 可以:

* 指定要跟踪的完整列列表:COLUMNS (userId, name, city)
* 指定要从跟踪中排除的列列表:COLUMNS * EXCEPT (operation, sequenceNum)

此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *