运行第一个结构化流式处理工作负载
本文提供代码示例,并解释在 Azure Databricks 上运行第一个结构化流式处理查询所要了解的基本概念。 可将结构化流式处理用于准实时和增量处理工作负载。
结构化流式处理是为增量实时表中表的流式处理提供支持的多项技术之一。 Databricks 建议对所有新的 ETL、引入和结构化流式处理工作负载使用增量实时表。 请参阅什么是增量实时表?。
备注
虽然增量实时表为声明流式处理表提供略有不同的语法,但用于配置流式读取和转换的一般语法适用于 Azure Databricks 上的所有流式处理用例。 增量实时表还通过管理状态信息、元数据和大量配置简化了流式处理。
以下示例演示如何使用自动加载程序加载 JSON 数据,其中使用 cloudFiles
表示格式和选项。 schemaLocation
选项启用架构推理和演变。 将以下代码粘贴到某个 Databricks 笔记本单元格中,然后运行该单元格以创建名为 raw_df
的流式处理数据帧:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
与 Azure Databricks 上的其他读取操作一样,配置流式读取不会实际加载数据。 必须在流开始之前触发数据操作。
备注
对流式处理数据帧调用 display()
会启动流式处理作业。 对于大多数结构化流式处理用例,触发流的操作应该是将数据写入接收器。 请参阅结构化流式处理的生产注意事项。
结构化流式处理支持 Azure Databricks 和 Spark SQL 中可用的大多数转换。 你甚至可以将 MLflow 模型作为 UDF 加载,并将流式预测设为转换。
以下代码示例完成一个简单的转换,以通过 Spark SQL 函数使用附加信息来扩充引入的 JSON 数据:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
生成的 transformed_df
包含查询指令,以便在每条记录进入数据源时加载并转换该记录。
备注
结构化流式处理将数据源视为无界限或无限数据集。 因此,某些转换在结构化流式处理工作负载中不受支持,因为它们需要对无限数量的项进行排序。
大多数聚合与许多联接都需要使用水印、窗口和输出模式来管理状态信息。 请参阅应用水印来控制数据处理阈值。
以下示例使用指定的文件路径和检查点写入到 Delta Lake。
重要
始终确保为配置的每个流式处理写入器指定唯一的检查点位置。 检查点为流提供唯一标识,跟踪处理的所有记录以及与流式处理查询关联的状态信息。
触发器的 availableNow
设置指示结构化流式处理处理源数据集中以前未处理的所有记录,然后关闭,这样你就可以安全地执行以下代码,而无需担心让流保持运行所造成的问题:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
在此示例中,没有任何新记录进入我们的数据源,因此重复执行此代码不会引入新记录。
警告
结构化流式处理执行可以防止自动终止操作关闭计算资源。 为避免意外成本,请务必终止流式处理查询。
Delta Lake 广泛支持将结构化流式处理用作源和接收器。 请参阅增量表流式读取和写入。
以下示例演示了从 Delta 表增量加载所有新记录、将其与另一个 Delta 表的快照相联接,并将其写入 Delta 表的示例语法:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
必须配置适当的权限才能读取源表,以及写入到目标表和指定的检查点位置。 使用数据源和接收器的相关值填写用尖括号 (<>
) 表示的所有参数。
备注
增量实时表提供完全声明性的语法用于创建 Delta Lake 管道以及自动管理触发器和检查点等属性。 请参阅什么是增量实时表?。
Apache Kafka 和其他消息传递总线针对大型数据集提供极低延迟。 你可以使用 Azure Databricks 将转换应用于从 Kafka 引入的数据,然后将数据写回 Kafka。
备注
将数据写入云对象存储会增加额外的延迟开销。 如果你希望在 Delta Lake 中存储来自消息传递总线的数据,但需要尽可能低的流式处理工作负载延迟,Databricks 建议配置单独的流式处理作业以将数据引入到湖屋,并为下游消息传递总线接收器应用准实时转换。
以下代码示例演示了一种简单模式,它通过将 Kafka 中的数据与 Delta 表中的数据相联接,然后写回 Kafka 来扩充 Kafka 中的数据:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
必须配置适当的权限才能访问 Kafka 服务。 使用数据源和接收器的相关值填写用尖括号 (<>
) 表示的所有参数。 请参阅使用 Apache Kafka 和 Azure Databricks 进行流处理。