APPLY CHANGES INTO (DLT)

通过 APPLY CHANGES INTO 语句使用 DLT 更改数据捕获 (CDC) 功能。 此语句创建一个流,用于从 CDC 源读取更改并将其应用于流目标。

语法

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

使用与非 APPLY CHANGES 查询相同的 CONSTRAINT 子句为 APPLY CHANGES 目标定义数据质量约束。 请参阅通过管道预期管理数据质量

INSERTUPDATE 事件的默认行为是从源中更新或插入 CDC 事件:更新目标表中与指定键匹配的行,或在目标表中不存在匹配记录时插入新行。 可以使用 DELETE 条件指定对 APPLY AS DELETE WHEN 事件的处理。

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 对于 SCD 类型 2 表,指定目标表的架构时,还必须包含数据类型与 sequence_by 字段相同的 __START_AT__END_AT 列。

请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。

参数

  • source

    数据的源。 源必须是 流媒体 源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    有关流数据的详细信息,请参阅 使用管道转换数据

  • KEYS

    用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。

    若要定义列的组合,请使用以逗号分隔的列列表。

    此条款是必需的。

  • IGNORE NULL UPDATES

    允许引入包含目标列子集的更新。 当 CDC 事件与现有行匹配并指定 IGNORE NULL UPDATES 时,具有 null 值的列将保留其目标中的现有值。 这也适用于具有 null 值的嵌套列。

    此子句是可选的。

    默认设置是用 null 值覆盖现有列。

  • APPLY AS DELETE WHEN

    指定何时应将 CDC 事件视为 DELETE 而不是更新插入。

    对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用表属性配置pipelines.cdc.tombstoneGCThresholdInSeconds保留间隔。

    此子句是可选的。

  • APPLY AS TRUNCATE WHEN

    指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

    仅支持为 SCD 类型 1 使用 APPLY AS TRUNCATE WHEN 子句。 SCD 类型 2 不支持截断操作。

    此子句是可选的。

  • SEQUENCE BY

    用于指定源数据中 CDC 事件的逻辑顺序的列名。 DLT 使用此排序来处理以无序方式到达的更改事件。

    如果需要多个列进行排序处理,请使用表达式 STRUCT:它将先按第一个结构字段进行排序,然后如果有相同的值,再按第二个字段排序,依此类推。

    指定的列必须是可排序的数据类型。

    该语句是必需的。

  • COLUMNS

    指定要包含在目标表中的列子集。 您可以选择:

    • 指定要包含的完整列列表:COLUMNS (userId, name, city)
    • 指定要排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。

    当未指定 COLUMNS 子句时,默认设置是包含目标表中的所有列。

  • STORED AS

    将记录存储为 SCD 类型 1 还是 SCD 类型 2。

    此子句是可选的。

    默认值为 SCD 类型 1。

  • TRACK HISTORY ON

    指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 您可以选择:

    • 指定要跟踪的列的完整列表: COLUMNS (userId, name, city)
    • 指定要从跟踪中排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *

例子

-- Create a streaming table, then use apply changes into to populate it:
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO target
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city)