使用 CREATE FLOW
语句为 Lakeflow 声明性管道表创建流或回填。
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
flow_name
要创建的流的名称。
评论
可选的流程说明。
-
用于定义流的
AUTO CDC ... INTO
语句,带有create_auto_cdc_flow_spec
。 必须包括AUTO CDC ... INTO
语句或INSERT INTO
语句。 当源查询使用更改数据语义时使用AUTO CDC ... INTO
。有关详细信息,请参阅 AUTO CDC INTO (Lakeflow 声明式管道)。
target_table
要更新的表。 这必须是流式处理表。
INSERT INTO
定义插入到目标表中的表查询。
ONCE
如果未提供该选项,则查询必须是流式处理查询。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。 若要引入具有更改提交的数据,可以使用 Python 和SkipChangeCommits
选项来处理错误。INSERT INTO
与AUTO CDC ... INTO
是相互排斥的。 当源数据包括更改数据捕获(CDC)功能时使用AUTO CDC ... INTO
。 当源未使用INSERT INTO
时,使用它。有关流数据的详细信息,请参阅 使用管道转换数据。
一次
(可选)可将流程定义为一次性流程,例如回填。 使用
ONCE
以两种方式更改流:- 源
query
或create_auto_cdc_flow_spec
不是流式处理表。 - 该流默认运行一次。 如果管道通过完全刷新进行更新,则
ONCE
流将再次运行以重新创建数据。
- 源
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;