使用增量实时表流以增量方式加载和处理数据
本文介绍什么是流,以及如何使用增量实时表管道中的流,以增量方式将源中的数据处理到目标流式处理表。 在增量实时表中,流以两种方式定义:
- 创建更新流式处理表的查询时,会自动定义流。
- 增量实时表还提供显式定义流的功能,以便进行更复杂的处理,例如从多个流式处理源追加到流式处理表。
本文讨论在定义查询以更新流式处理表时创建的隐式流,然后详细解说用于定义更复杂流的语法。
什么是流?
在增量实时表中,“流”是一个流式处理查询,该查询以增量方式处理源数据以更新目标流式处理表。 在管道中创建的大多数增量实时表数据集将流定义为查询的一部分,并且不需要显式定义流。 例如,在单个 DDL 命令中创建增量实时表中的流式处理表,而不是使用单独的表和流语句来创建流式处理表:
注意
此 CREATE FLOW
示例仅用于说明目的,其中的关键字是无效的增量实时表语法。
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
除了查询定义的默认流外,增量实时表 Python 和 SQL 接口还提供“追加流”功能。 追加流支持处理需要从多个流式处理源读取数据来更新单个流式处理表的情况。 例如,如果你现在已有流式处理表和流,并且想要添加能写入此现有流式处理表的新流式处理源,则可以使用追加流功能。
使用追加流从多个源流写入流式处理表
注意
若要使用追加流处理,必须将管道配置为使用预览通道。
使用 Python 接口中的 @append_flow
修饰器或 SQL 接口中的 CREATE FLOW
子句从多个流式处理源写入流式处理表。 使用追加流处理如下任务:
- 添加将数据追加到现有流式处理表的流式处理源,无需完全刷新。 例如,你可能有一个表合并了业务所在的每个区域的区域数据。 新区域推出后,你无需执行完全刷新即可将新区域数据添加到该表中。 参阅“示例:从多个 Kafka 主题写入流式处理表”。
- 通过追加缺失的历史数据(回填)来更新流式处理表。 例如,你现在有一个由 Apache Kafka 主题写入的流式处理表。 此外,表中还存储了历史数据,这些数据只需要插入到流式处理表中一次,并且你无法流式传输数据,因为在插入数据之前,处理包括执行复杂的聚合。 参阅“示例:运行一次性数据回填”。
- 合并来自多个源的数据并写入单个流式处理表,而不是在查询中使用
UNION
子句。 如果使用追加流处理而非UNION
,则可以用增量方式更新目标表,而无需运行完全刷新更新。 请参阅“示例:使用追加流处理而非 UNION”。
追加流处理输出的记录的目标可以是现有表或新表。 对于 Python 查询,请使用 create_streaming_table() 函数创建目标表。
重要
- 如果需要使用预期定义数据质量约束,请将目标表的预期定义为
create_streaming_table()
函数或现有表定义的一部分。 无法在@append_flow
定义中定义期望。 - 流由流名称标识,此名称用于标识流式处理检查点。 使用流名称来标识检查点意味着:
- 如果管道中的现有流已重命名,则检查点不会进行传递,并且重命名的流实际上是一个全新的流。
- 不能在管道中重用流名称,因为现有检查点与新的流定义不匹配。
下面是 @append_flow
的语法:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
示例:从多个 Kafka 主题写入流式处理表
以下示例会创建一个名为 kafka_target
的流式处理表,并从两个 Kafka 主题写入该流式处理表:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
若要详细了解 SQL 查询中使用的 read_kafka()
表值函数,请参阅 SQL 语言参考中的 read_kafka。
示例:运行一次性数据回填
以下示例会运行一个查询来将历史数据追加到流式处理表中:
注意
为了确保当回填查询是按计划运行或连续运行的管道的一部分时确实进行一次性回填,请在运行一次管道后移除该查询。 若要在数据到达回填目录中时追加新数据,请保留查询。
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
cloud_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
cloud_files(
"path/to/backfill/data/dir",
"csv"
);
示例:使用追加流处理而非 UNION
可以使用追加流查询来合并多个源并写入单个流式处理表,而不是将查询与 UNION
子句一起使用。 如果使用追加流查询而非 UNION
,则可以追加到来自多个源的流式处理表,而无需运行完全刷新。
以下 Python 示例包含一个查询,该查询将多个数据源与 UNION
子句组合在一起:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
以下示例将 UNION
查询更换为追加流查询:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
cloud_files(
"/path/to/orders/apac",
"csv"
);