使用 Lakeflow 声明性管道流以增量方式加载和处理数据

通过 ,在 Lakeflow 声明性管道上处理数据。 每个流都包含 一个查询 ,通常为 一个目标。 流程将查询处理为批处理,或以增量数据流的形式传输到目标中。 流位于 Azure Databricks 的 ETL 管道中。

通常,在 Lakeflow 声明性管道中创建更新目标的查询时,会自动定义流,但也可以显式定义其他流,以便进行更复杂的处理,例如追加到来自多个源的单个目标。

更新

每次更新其定义的流水线时,流程都会运行。 该流将创建或更新具有最新可用数据的表。 更新可能会执行增量刷新(仅处理新记录或执行完全刷新),以重新处理数据源中的所有记录,具体取决于流类型和数据更改的状态。

创建默认流

在管道中创建 Lakeflow Declarative Pipelines 对象时,通常会定义一个表或视图及其支持的查询。 例如,在此 SQL 查询中,通过从名为customers_silver的表中读取数据来创建一个名为customers_bronze的流式表。

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

也可以在 Python 中创建相同的流表。 在 Python 中,通常通过创建返回数据帧的查询函数来使用 Lakeflow 声明性管道,并使用修饰器访问 Lakeflow 声明性管道功能:

from pyspark import pipelines as dp

@dp.table()
def customers_silver():
  return spark.readStream.table("customers_bronze")

在此示例中,你创建了一个 流式处理表。 还可以在 SQL 和 Python 中创建具有类似语法的具体化视图。

此示例创建一个默认流,并与流式处理表一起使用。 流式处理表的默认流是 追加 流,它为每个触发器添加新行。 这是使用 Lakeflow 声明性管道(在单个步骤中创建流和目标)的最常用方法。 可以使用此样式引入数据或转换数据。

追加流还支持处理,需要从多个流源读取数据以更新单个目标。 例如,如果您有一个现有的流式处理表和流,并希望添加一个新的流源以写入此现有的流式处理表,则可以使用追加流功能。

使用多个流写入单个目标

在前面的示例中,你在一个步骤中创建了一个流程和一个流式处理表。 也可以为以前创建的表创建流。 在此示例中,可以看到在不同的步骤中创建一个表和与之关联的流。 此代码的结果与创建默认流相同,包括对流式处理表和流使用相同的名称。

Python

from pyspark import pipelines as dp

# create streaming table
dp.create_streaming_table("customers_silver")

# add a flow
@dp.append_flow(
  target = "customers_silver")
def customer_silver():
  return spark.readStream.table("customers_bronze")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;

-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);

创建不依赖于目标的流,这样您还可以创建多个将数据追加到同一目标的流。

使用 Python 接口中的 @dp.append_flow 修饰器或 SQL 接口中的 CREATE FLOW...INSERT INTO 子句来创建新的数据流,例如,可以使其目标为来自多个流源的流式处理表。 使用追加流处理如下任务:

  • 添加将数据追加到现有流式处理表的流源,而无需完全刷新。 例如,你可能有一个表,该表合并了你在其中运行的每个区域的区域数据。 新区域推出后,无需执行完全刷新即可将新区域数据添加到表中。 有关将流式数据源添加到现有流表的示例,请参阅 示例:从多个 Kafka 主题写入流表
  • 通过追加缺失的历史数据(回填)来更新流表。 可以使用INSERT INTO ONCE语法来创建一个只运行一次的历史回填追加操作。 例如,你有一个由 Apache Kafka 主题写入的现有流表。 另外,您有一些存储在表中的历史数据,需要精确地插入到流数据表中一次,并且不能通过流式传输的方式处理这些数据,因为在插入数据之前需要执行复杂的聚合运算。 有关回填的示例,请参阅 使用 Lakeflow 声明性管道回填历史数据
  • 将来自多个源的数据整合并写入一个流处理表,以替代在查询中使用UNION子句。 使用追加流处理,而不是 UNION 允许以增量方式更新目标表,而无需运行 完全刷新更新。 有关以这种方式完成的联合示例,请参阅示例:使用追加流处理,而不是UNION

追加流处理输出的记录的目标可以是现有表或新表。 对于 Python 查询,请使用 create_streaming_table() 函数创建目标表。

以下示例为同一目标添加两个流,从而创建两个源表的联合:

Python

from pyspark import pipelines as dp

# create a streaming table
dp.create_streaming_table("customers_us")

# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
  return spark.readStream.table("customers_us_west")

# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
  return spark.readStream.table("customers_us_east")

SQL

-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;

-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);

-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);

重要

  • 如果需要使用 预期定义数据质量约束,请将目标表的预期定义为函数的 create_streaming_table() 一部分或现有表定义。 不能在 @append_flow 定义中定义期望。
  • 流由 流名称标识,此名称用于标识流式处理检查点。 使用流名称标识检查点意味着:
    • 如果管道中的现有流已重命名,则检查点不会进行传递,并且重命名的流实际上是一个全新的流。
    • 不能重用管道中的流名称,因为现有检查点与新流定义不匹配。

流类型

流表和物化视图的默认流动方式为追加流。 还可以创建流以从 更改数据捕获 数据源进行读取。 下表描述了不同类型的流。

流类型 Description
Append 追加流是最常见的流类型,其中源中的新记录在每次更新时写入目标系统。 它们对应于结构化流处理中的附加模式。 可以添加 ONCE 标志,指定一个批处理查询,该查询的数据应该只插入目标一次,除非目标完全刷新。 任意数量的追加流都可以写入特定目标。
默认流(使用目标流式处理表或具体化视图创建)将具有与目标相同的名称。 其他目标没有默认流。
自动 CDC (以前 应用更改 自动 CDC 流引入包含更改数据捕获(CDC)数据的查询。 自动 CDC 流只能面向流式处理表,并且源必须是流源(即使在使用ONCE流的情况下,也必须如此)。 多个自动 CDC 流可以面向单个流式处理表。 作为自动 CDC 流目标的流式表只能被其他自动 CDC 流用作目标。
有关 CDC 数据的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化更改数据捕获

其他信息

有关流及其用法的详细信息,请参阅以下主题: