CREATE FLOW(Lakeflow 声明性管道)

使用 CREATE FLOW 语句为您的 Lakeflow 声明性管道表创建流或回填。

Syntax

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

参数

  • flow_name

    要创建的流的名称。

  • 评论

    流的可选说明。

  • 自动 CDC INTO

    定义流程的语句是使用 AUTO CDC ... INTOcreate_auto_cdc_flow_spec。 必须包括 AUTO CDC ... INTO 语句或 INSERT INTO 语句。 当源查询使用更改数据语义时使用 AUTO CDC ... INTO

    有关更多信息,请参阅 AUTO CDC INTO (Lakeflow 声明式流程)。

  • target_table

    要更新的表。 这必须是流式处理表。

  • INSERT INTO

    定义了一个查询,该查询用于向目标表插入数据。 ONCE如果未提供该选项,则查询必须是流式处理查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和 SkipChangeCommits 选项来处理错误。

    INSERT INTOAUTO CDC ... INTO 是互斥的。 如果源数据包括更改数据捕获(CDC)功能,请使用 AUTO CDC ... INTO 。 当源文件未使用时使用INSERT INTO

    有关流数据的详细信息,请参阅 使用管道转换数据

  • 一次

    可选地将流定义为一次性流,例如回填流。 通过两种方式使用 ONCE 更改流:

    • querycreate_auto_cdc_flow_spec 不是流式处理表。
    • 默认情况运行一次。 如果管道通过完全刷新进行更新,则 ONCE 流将再次运行以重新创建数据。

例子

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;