通过 流,在 Lakeflow 声明性管道上处理数据。 每个流都包含 一个查询 ,通常为 一个目标。 流程将查询处理为批处理,或以增量数据流的形式传输到目标中。 流位于 Azure Databricks 的 ETL 管道中。
通常,在 Lakeflow 声明性管道中创建更新目标的查询时,会自动定义流,但也可以显式定义其他流,以便进行更复杂的处理,例如追加到来自多个源的单个目标。
更新
每次更新其定义的流水线时,流程都会运行。 该流将创建或更新具有最新可用数据的表。 更新可能会执行增量刷新(仅处理新记录或执行完全刷新),以重新处理数据源中的所有记录,具体取决于流类型和数据更改的状态。
- 有关管道更新的详细信息,请参阅 在 Lakeflow 声明性管道中运行更新。
- 有关调度和触发更新的详细信息,请参阅 触发与连续管道模式。
创建默认流
在管道中创建 Lakeflow Declarative Pipelines 对象时,通常会定义一个表或视图及其支持的查询。 例如,在此 SQL 查询中,通过从名为customers_silver
的表中读取数据来创建一个名为customers_bronze
的流式表。
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
也可以在 Python 中创建相同的流表。 在 Python 中,通常通过创建返回数据帧的查询函数来使用 Lakeflow 声明性管道,并使用修饰器访问 Lakeflow 声明性管道功能:
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
在此示例中,你创建了一个 流式处理表。 还可以在 SQL 和 Python 中创建具有类似语法的具体化视图。
此示例创建一个默认流,并与流式处理表一起使用。 流式处理表的默认流是 追加 流,它为每个触发器添加新行。 这是使用 Lakeflow 声明性管道(在单个步骤中创建流和目标)的最常用方法。 可以使用此样式引入数据或转换数据。
追加流还支持处理,需要从多个流源读取数据以更新单个目标。 例如,如果您有一个现有的流式处理表和流,并希望添加一个新的流源以写入此现有的流式处理表,则可以使用追加流功能。
使用多个流写入单个目标
在前面的示例中,你在一个步骤中创建了一个流程和一个流式处理表。 也可以为以前创建的表创建流。 在此示例中,可以看到在不同的步骤中创建一个表和与之关联的流。 此代码的结果与创建默认流相同,包括对流式处理表和流使用相同的名称。
Python
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
创建不依赖于目标的流,这样您还可以创建多个将数据追加到同一目标的流。
使用 Python 接口中的 @dp.append_flow
修饰器或 SQL 接口中的 CREATE FLOW...INSERT INTO
子句来创建新的数据流,例如,可以使其目标为来自多个流源的流式处理表。 使用追加流处理如下任务:
- 添加将数据追加到现有流式处理表的流源,而无需完全刷新。 例如,你可能有一个表,该表合并了你在其中运行的每个区域的区域数据。 新区域推出后,无需执行完全刷新即可将新区域数据添加到表中。 有关将流式数据源添加到现有流表的示例,请参阅 示例:从多个 Kafka 主题写入流表。
- 通过追加缺失的历史数据(回填)来更新流表。 可以使用
INSERT INTO ONCE
语法来创建一个只运行一次的历史回填追加操作。 例如,你有一个由 Apache Kafka 主题写入的现有流表。 另外,您有一些存储在表中的历史数据,需要精确地插入到流数据表中一次,并且不能通过流式传输的方式处理这些数据,因为在插入数据之前需要执行复杂的聚合运算。 有关回填的示例,请参阅 使用 Lakeflow 声明性管道回填历史数据。 - 将来自多个源的数据整合并写入一个流处理表,以替代在查询中使用
UNION
子句。 使用追加流处理,而不是UNION
允许以增量方式更新目标表,而无需运行 完全刷新更新。 有关以这种方式完成的联合示例,请参阅示例:使用追加流处理,而不是UNION
。
追加流处理输出的记录的目标可以是现有表或新表。 对于 Python 查询,请使用 create_streaming_table() 函数创建目标表。
以下示例为同一目标添加两个流,从而创建两个源表的联合:
Python
from pyspark import pipelines as dp
# create a streaming table
dp.create_streaming_table("customers_us")
# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
SQL
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
重要
- 如果需要使用 预期定义数据质量约束,请将目标表的预期定义为函数的
create_streaming_table()
一部分或现有表定义。 不能在@append_flow
定义中定义期望。 - 流由 流名称标识,此名称用于标识流式处理检查点。 使用流名称标识检查点意味着:
- 如果管道中的现有流已重命名,则检查点不会进行传递,并且重命名的流实际上是一个全新的流。
- 不能重用管道中的流名称,因为现有检查点与新流定义不匹配。
流类型
流表和物化视图的默认流动方式为追加流。 还可以创建流以从 更改数据捕获 数据源进行读取。 下表描述了不同类型的流。
流类型 | Description |
---|---|
Append |
追加流是最常见的流类型,其中源中的新记录在每次更新时写入目标系统。 它们对应于结构化流处理中的附加模式。 可以添加 ONCE 标志,指定一个批处理查询,该查询的数据应该只插入目标一次,除非目标完全刷新。 任意数量的追加流都可以写入特定目标。默认流(使用目标流式处理表或具体化视图创建)将具有与目标相同的名称。 其他目标没有默认流。 |
自动 CDC (以前 应用更改) |
自动 CDC 流引入包含更改数据捕获(CDC)数据的查询。 自动 CDC 流只能面向流式处理表,并且源必须是流源(即使在使用ONCE 流的情况下,也必须如此)。 多个自动 CDC 流可以面向单个流式处理表。 作为自动 CDC 流目标的流式表只能被其他自动 CDC 流用作目标。有关 CDC 数据的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化更改数据捕获。 |
其他信息
有关流及其用法的详细信息,请参阅以下主题: