AUTO CDC INTO(Lakeflow 声明性流水线)

使用 AUTO CDC ... INTO 语句创建使用 Lakeflow 声明性管道更改数据捕获(CDC)功能的流。 此语句从 CDC 源读取更改并将其应用于流式处理目标。

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

使用与其他 Lakeflow 的声明性管道查询相同的 CONSTRAINT 子句来定义针对目标的数据质量约束。 请参阅通过管道预期管理数据质量

默认行为对于 INSERTUPDATE 事件是从源 更新插入 CDC 事件:对于符合指定键的目标表中的行进行更新,或者在目标表中不存在匹配记录时插入新行。 可以通过DELETE条件来指定APPLY AS DELETE WHEN事件的处理。

重要

您必须声明一个目标流式表,以便应用更改。 可以选择为目标表指定架构。 对于 SCD 类型 2 表,指定目标表的架构时,还必须包含与__START_AT字段具有相同数据类型的__END_ATsequence_by列。

请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获

参数

  • flow_name

    要创建的流的名称。

  • source

    数据的源。 源必须是 流媒体 来源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    有关流数据的详细信息,请参阅 使用管道转换数据

  • KEYS

    用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。

    若要定义列的组合,请使用以逗号分隔的列列表。

    此子句是必需的。

  • IGNORE NULL UPDATES

    允许导入包含目标列子集的更新。 当 CDC 事件与现有行匹配并指定 IGNORE NULL UPDATES 时,具有 null 值的列将保留其目标中的现有值。 这也适用于具有 null 值的嵌套列。

    此子句是可选的。

    默认是用 null 值覆盖现有列。

  • APPLY AS DELETE WHEN

    指定何时应将 CDC 事件视为 DELETE 而不是更新插入。

    对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用pipelines.cdc.tombstoneGCThresholdInSeconds配置保留间隔。

    此子句是可选的。

  • APPLY AS TRUNCATE WHEN

    指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

    APPLY AS TRUNCATE WHEN子句仅支持 SCD 类型 1。 SCD 类型 2 不支持截断操作。

    此子句是可选的。

  • SEQUENCE BY

    指定源数据中 CDC 事件的逻辑顺序的列名。 Lakeflow 声明性管道使用这种排序来处理按无序顺序到达的变更事件。

    如果需要多个列进行排序,可以使用表达式,方法是:它将先按第一个 STRUCT 结构字段进行排序,然后按第二个字段进行排序(如果有相同的值)。

    指定的列必须是可排序的数据类型。

    此条款是必要的。

  • COLUMNS

    指定要包含在目标表中的列的子集。 您可以选择:

    • 指定要包括的列的完整列表: COLUMNS (userId, name, city)
    • 指定要排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。

    当未指定COLUMNS子句时,默认是在目标表中包括所有列。

  • STORED AS

    将记录存储为 SCD 类型 1 还是 SCD 类型 2。

    此子句是可选的。

    默认值为 SCD 类型 1。

  • TRACK HISTORY ON

    指定输出列的子集,以在对这些指定列进行任何更改时生成历史记录。 您可以选择:

    • 指定要跟踪的列的完整列表: COLUMNS (userId, name, city)
    • 指定要从跟踪中排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。 默认值是在发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *

例子

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);