Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
流式处理表是 Delta 表,具有支持流式或增量数据处理的额外功能。 流式处理表可由管道中的一个或多个流作为目标。
流式处理表是数据导入的理想选择,原因如下:
- 每个输入行只处理一次,这对绝大多数引入工作负载建模(即,通过将行追加或插入更新到表中来实现)。
- 它们可以处理大量仅追加的数据。
流式表对于低延迟流数据转换也是一个出色的选择,因为它们能够在行和时间窗口上进行推理,处理海量数据并实现快速处理。
下图显示了数据流如何从流式源读取数据,并逐步写入管道中的流式表。
每次更新时,与流式处理表关联的流读取流源中更改的信息,并将新信息追加到该表。
流式处理表由单个管道拥有和更新。 你需要在管道的源代码中明确定义流式表。 管道定义的表不能由任何其他管道更改或更新。 可以定义多个流以追加到单个流式处理表。
Azure Databricks创建内部表以支持流式表处理。 这些表出现于 system.information_schema.tables,但在目录资源管理器或其他工作区 UI 页面中不可见。
注释
使用 Databricks SQL 在管道外部创建流式处理表时,Azure Databricks创建用于更新表的管道。 可以通过从工作区左侧导航中选择ETL. 在 Databricks SQL 中创建的流表的类型为 MV/ST.
有关流的详细信息,请参阅 使用 Lakeflow Spark 声明性管道流以增量方式加载和处理数据。
用于引入的流式处理表
流式处理表专为仅追加的数据源设计,并且仅处理一次输入。 这使得它们非常适合用于数据摄取工作负载,其中数据持续到达,并且必须可靠地捕获,不必重新处理已有记录。 Azure Databricks支持从云存储和流式传输消息总线引入数据。
从云存储引入文件
可以使用流式处理表从云存储引入新文件。 这些示例使用自动加载程序在到达时以增量方式处理新文件。
Python
from pyspark import pipelines as dp
# Create a streaming table
@dp.table
def customers_bronze():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load("/Volumes/path/to/files")
)
要创建流式表,数据集定义必须是流类型。 在数据集定义中使用 spark.readStream 函数时,它将返回流数据集。
SQL
-- Create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
"/volumes/path/to/files",
format => "json"
);
流式处理表需要流式处理数据集。 关键字 STREAM 告诉 read_files 查询将数据集视为流。
引入流式处理消息
还可以使用流表从消息总线引入数据。 以下示例演示如何创建从 Pub/Sub 主题读取的流表。
Python
@dp.table
def pubsub_raw():
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
return (
spark.readStream
.format("pubsub")
.option("subscriptionId", "my-subscription")
.option("topicId", "my-topic")
.option("projectId", "my-project")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'my-subscription',
projectId => 'my-project',
topicId => 'my-topic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Databricks 建议在提供授权选项时使用机密。
有关将数据加载到流表中的更多详细信息,请参阅在管道中加载数据。
下列图表展示了仅追加流式处理表的工作原理。
在管道的后续更新中,已追加到流式处理表的行将不会被重新查询。 如果修改查询(例如,从 SELECT LOWER (name) 修改为 SELECT UPPER (name)),现有行不会更新为大写,但新行将是大写。 可以触发完整刷新,以重新查询源表中的所有现有数据,更新流表中的所有行。
流式处理表和低延迟流式处理
流式处理表适用于针对有界状态的低延迟流式处理。 流式处理表使用检查点管理,因此非常适合实现低延迟流式处理。 但是,它们期望的是自然有界流或带有水印的有界流。
自然有界流由具有明确定义的开始和结束的流式处理数据源生成。 自然有界流的一个示例是从文件目录读取数据,这种文件目录在放置初始批文件后不会添加新文件。 流被视为有限,因为文件数有限,并且流在处理完所有文件后结束。
还可以使用水印来设置流的边界。 结构化流中的水印是一种机制,它通过指定系统应在将时间窗口视为完成之前等待延迟事件的时间长度来帮助处理延迟数据。 没有时间戳的无界流可能会导致管道因内存压力而失败。
有关有状态流处理的详细信息,请参阅 使用水印优化有状态处理。
流-快照联接
流快照联接将流数据集连接到在流开始时快照的维度表。 由于维度表在该时间点被视为固定的,因此在流启动后对它所做的任何更改都不会反映在联接中。 当小差异无关紧要时,这是可以接受的,例如,当事务数比客户数量大得多时。
下面的代码示例将一个包含两行的维度表与一个不断增加的数据集 customers 进行联接,表名为 transactions。 它在名为 customer_id=3, name=Zoya),则联接中不会显示此新行,因为启动流时快照了静态维度表。
from pyspark import pipelines as dp
@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
return spark.readStream.table("transactions")
# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
return spark.read.table("customers")
@dp.table
def sales_report():
facts = spark.readStream.table("v_transactions")
dims = spark.read.table("v_customers")
return facts.join(dims, on="customer_id", how="inner")
流式处理表的限制
流式处理表具有以下限制:
-
有限的演变: 无需重新计算整个数据集即可更改查询。 如果没有完全刷新,流式处理表只看到每一行一次,因此不同的查询将处理不同的行。 例如,在查询中将
UPPER()添加到字段之后,只有更改后处理的行才会转换为大写。 这意味着必须了解在数据集上运行的所有以前版本的查询。 若要在更改之前重新处理已处理的现有行,需要完全刷新。 - 状态管理: 流式处理表具有低延迟特性,并需要使用自然限定的流或通过水印限定的流。 更多信息,请参阅使用水印优化有状态处理。
- 联接不会重新计算: 当维度发生更改时,流式处理表中的联接不会重新计算。 这种特性在“快速但错误”的情境下可能是有益的。 如果希望视图始终正确,可能需要使用具体化视图。 具体化视图始终正确,因为它们在维度更改时自动重新计算联接。 有关详细信息,请参阅 具体化视图。