CREATE FLOW (Lakeflow 声明性管道)

使用 CREATE FLOW 语句为 Lakeflow 声明性管道表创建流或回填。

语法

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

参数

  • flow_name

    要创建的流的名称。

  • 评论

    可选的流程说明。

  • 自动 CDC INTO

    用于定义流的AUTO CDC ... INTO语句,带有create_auto_cdc_flow_spec。 必须包括 AUTO CDC ... INTO 语句或 INSERT INTO 语句。 当源查询使用更改数据语义时使用 AUTO CDC ... INTO

    有关详细信息,请参阅 AUTO CDC INTO (Lakeflow 声明式管道)。

  • target_table

    要更新的表。 这必须是流式处理表。

  • INSERT INTO

    定义插入到目标表中的表查询。 ONCE如果未提供该选项,则查询必须是流式处理查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    INSERT INTOAUTO CDC ... INTO 是相互排斥的。 当源数据包括更改数据捕获(CDC)功能时使用 AUTO CDC ... INTO 。 当源未使用INSERT INTO时,使用它。

    有关流数据的详细信息,请参阅 使用管道转换数据

  • 一次

    (可选)可将流程定义为一次性流程,例如回填。 使用 ONCE 以两种方式更改流:

    • querycreate_auto_cdc_flow_spec 不是流式处理表。
    • 该流默认运行一次。 如果管道通过完全刷新进行更新,则 ONCE 流将再次运行以重新创建数据。

例子

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;