通过 APPLY CHANGES INTO
语句使用 DLT 更改数据捕获 (CDC) 功能。 此语句创建一个流,用于从 CDC 源读取更改并将其应用于流目标。
- 若要了解 CDC,请参阅 什么是变更数据捕获(CDC)?。
- 有关将“应用更改”与 CDC 配合使用的更多详细信息,请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。
语法
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
使用与非 APPLY CHANGES
查询相同的 CONSTRAINT
子句为 APPLY CHANGES
目标定义数据质量约束。 请参阅通过管道预期管理数据质量。
INSERT
和 UPDATE
事件的默认行为是从源中更新或插入 CDC 事件:更新目标表中与指定键匹配的行,或在目标表中不存在匹配记录时插入新行。 可以使用 DELETE
条件指定对 APPLY AS DELETE WHEN
事件的处理。
重要
必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 对于 SCD 类型 2 表,指定目标表的架构时,还必须包含数据类型与 sequence_by
字段相同的 __START_AT
和 __END_AT
列。
请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。
参数
source
数据的源。 源必须是 流媒体 源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和
SkipChangeCommits
选项来处理错误。有关流数据的详细信息,请参阅 使用管道转换数据。
KEYS
用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。
若要定义列的组合,请使用以逗号分隔的列列表。
此条款是必需的。
IGNORE NULL UPDATES
允许引入包含目标列子集的更新。 当 CDC 事件与现有行匹配并指定 IGNORE NULL UPDATES 时,具有
null
值的列将保留其目标中的现有值。 这也适用于具有null
值的嵌套列。此子句是可选的。
默认设置是用
null
值覆盖现有列。APPLY AS DELETE WHEN
指定何时应将 CDC 事件视为
DELETE
而不是更新插入。对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用表属性配置
pipelines.cdc.tombstoneGCThresholdInSeconds
保留间隔。此子句是可选的。
APPLY AS TRUNCATE WHEN
指定何时应将 CDC 事件视为完整表
TRUNCATE
。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。仅支持为 SCD 类型 1 使用
APPLY AS TRUNCATE WHEN
子句。 SCD 类型 2 不支持截断操作。此子句是可选的。
SEQUENCE BY
用于指定源数据中 CDC 事件的逻辑顺序的列名。 DLT 使用此排序来处理以无序方式到达的更改事件。
如果需要多个列进行排序处理,请使用表达式
STRUCT
:它将先按第一个结构字段进行排序,然后如果有相同的值,再按第二个字段排序,依此类推。指定的列必须是可排序的数据类型。
该语句是必需的。
COLUMNS
指定要包含在目标表中的列子集。 您可以选择:
- 指定要包含的完整列列表:
COLUMNS (userId, name, city)
。 - 指定要排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。
当未指定
COLUMNS
子句时,默认设置是包含目标表中的所有列。- 指定要包含的完整列列表:
STORED AS
将记录存储为 SCD 类型 1 还是 SCD 类型 2。
此子句是可选的。
默认值为 SCD 类型 1。
TRACK HISTORY ON
指定输出列的子集,以便在这些指定列发生任何更改时生成历史记录。 您可以选择:
- 指定要跟踪的列的完整列表:
COLUMNS (userId, name, city)
。 - 指定要从跟踪中排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。 默认设置为当发生任何更改时跟踪所有输出列的历史记录,等效于
TRACK HISTORY ON *
。- 指定要跟踪的列的完整列表:
例子
-- Create a streaming table, then use apply changes into to populate it:
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city)