使用 AUTO CDC ... INTO
语句创建使用 Lakeflow 声明性管道更改数据捕获(CDC)功能的流。 此语句从 CDC 源读取更改并将其应用于流式处理目标。
- 若要了解 CDC,请参阅 什么是变更数据捕获(CDC)?。
- 有关使用
AUTO CDC
的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化更改数据捕获。 - 如需了解更多信息
CREATE FLOW
,请参阅 CREATE FLOW(Lakeflow 声明性管道)。
Syntax
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
使用与其他 Lakeflow 的声明性管道查询相同的 CONSTRAINT
子句来定义针对目标的数据质量约束。 请参阅通过管道预期管理数据质量。
默认行为对于 INSERT
和 UPDATE
事件是从源 更新插入 CDC 事件:对于符合指定键的目标表中的行进行更新,或者在目标表中不存在匹配记录时插入新行。 可以通过DELETE
条件来指定APPLY AS DELETE WHEN
事件的处理。
重要
您必须声明一个目标流式表,以便应用更改。 可以选择为目标表指定架构。 对于 SCD 类型 2 表,指定目标表的架构时,还必须包含与__START_AT
字段具有相同数据类型的__END_AT
sequence_by
列。
请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获。
参数
flow_name
要创建的流的名称。
source
数据的源。 源必须是 流媒体 来源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和
SkipChangeCommits
选项来处理错误。有关流数据的详细信息,请参阅 使用管道转换数据。
KEYS
用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。
若要定义列的组合,请使用以逗号分隔的列列表。
此子句是必需的。
IGNORE NULL UPDATES
允许导入包含目标列子集的更新。 当 CDC 事件与现有行匹配并指定 IGNORE NULL UPDATES 时,具有
null
值的列将保留其目标中的现有值。 这也适用于具有null
值的嵌套列。此子句是可选的。
默认是用
null
值覆盖现有列。APPLY AS DELETE WHEN
指定何时应将 CDC 事件视为
DELETE
而不是更新插入。对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用
pipelines.cdc.tombstoneGCThresholdInSeconds
配置保留间隔。此子句是可选的。
APPLY AS TRUNCATE WHEN
指定何时应将 CDC 事件视为完整表
TRUNCATE
。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。APPLY AS TRUNCATE WHEN
子句仅支持 SCD 类型 1。 SCD 类型 2 不支持截断操作。此子句是可选的。
SEQUENCE BY
指定源数据中 CDC 事件的逻辑顺序的列名。 Lakeflow 声明性管道使用这种排序来处理按无序顺序到达的变更事件。
如果需要多个列进行排序,可以使用表达式,方法是:它将先按第一个
STRUCT
结构字段进行排序,然后按第二个字段进行排序(如果有相同的值)。指定的列必须是可排序的数据类型。
此条款是必要的。
COLUMNS
指定要包含在目标表中的列的子集。 您可以选择:
- 指定要包括的列的完整列表:
COLUMNS (userId, name, city)
。 - 指定要排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。
当未指定
COLUMNS
子句时,默认是在目标表中包括所有列。- 指定要包括的列的完整列表:
STORED AS
将记录存储为 SCD 类型 1 还是 SCD 类型 2。
此子句是可选的。
默认值为 SCD 类型 1。
TRACK HISTORY ON
指定输出列的子集,以在对这些指定列进行任何更改时生成历史记录。 您可以选择:
- 指定要跟踪的列的完整列表:
COLUMNS (userId, name, city)
。 - 指定要从跟踪中排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。 默认值是在发生任何更改时跟踪所有输出列的历史记录,等效于
TRACK HISTORY ON *
。- 指定要跟踪的列的完整列表:
例子
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);