运行第一个结构化流式处理工作负载

本文提供代码示例,并解释在 Azure Databricks 上运行第一个结构化流式处理查询所要了解的基本概念。 可将结构化流式处理用于准实时和增量处理工作负载。

结构化流式处理是为增量实时表中表的流式处理提供支持的多项技术之一。 Databricks 建议对所有新的 ETL、引入和结构化流式处理工作负载使用增量实时表。 请参阅什么是增量实时表?

注意

虽然增量实时表为声明流式处理表提供略有不同的语法,但用于配置流式读取和转换的一般语法适用于 Azure Databricks 上的所有流式处理用例。 增量实时表还通过管理状态信息、元数据和大量配置简化了流式处理。

从数据流读取

可以使用结构化流式处理以增量方式从支持的数据源引入数据。 Azure Databricks 结构化流式处理工作负载中使用的一些最常见数据源包括:

  • 云对象存储中的数据文件
  • 消息总线和队列
  • Delta Lake

Databricks 建议使用自动加载程序从云对象存储中流式引入。 自动加载程序支持结构化流式处理所能支持的大多数文件格式。 请参阅什么是自动加载程序?

每个数据源提供多个选项来指定如何加载数据批。 在配置读取器期间,可能需要设置的主要选项分为以下几类:

  • 用于指定数据源或格式的选项(例如,文件类型、分隔符和架构)。
  • 用于配置源系统访问的选项(例如,端口设置和凭据)。
  • 用于指定要在流中哪个位置开始的选项(例如,Kafka 偏移量或读取所有现有文件)。
  • 用于控制要在每个批中处理多少数据的选项(例如,最大偏移量、文件数或每批字节数)。

使用自动加载程序从对象存储读取流式处理数据

以下示例演示如何使用自动加载程序加载 JSON 数据,其中使用 cloudFiles 表示格式和选项。 schemaLocation 选项启用架构推理和演变。 将以下代码粘贴到某个 Databricks 笔记本单元格中,然后运行该单元格以创建名为 raw_df 的流式处理数据帧:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

与 Azure Databricks 上的其他读取操作一样,配置流式读取不会实际加载数据。 必须在流开始之前触发数据操作。

注意

对流式处理数据帧调用 display() 会启动流式处理作业。 对于大多数结构化流式处理用例,触发流的操作应该是将数据写入接收器。 请参阅为生产准备结构化流式处理代码

执行流式转换

结构化流式处理支持 Azure Databricks 和 Spark SQL 中可用的大多数转换。 你甚至可以将 MLflow 模型作为 UDF 加载,并将流式预测设为转换。

以下代码示例完成一个简单的转换,以通过 Spark SQL 函数使用附加信息来扩充引入的 JSON 数据:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

生成的 transformed_df 包含查询指令,以便在每条记录进入数据源时加载并转换该记录。

注意

结构化流式处理将数据源视为无界限或无限数据集。 因此,某些转换在结构化流式处理工作负载中不受支持,因为它们需要对无限数量的项进行排序。

大多数聚合与许多联接都需要使用水印、窗口和输出模式来管理状态信息。 请参阅应用水印来控制数据处理阈值

写入到数据接收器

数据接收器是流式写入操作的目标。 Azure Databricks 流式处理工作负载中使用的常见接收器包括:

  • Delta Lake
  • 消息总线和队列
  • 键值数据库

与数据源一样,大多数数据接收器提供许多选项来控制将数据写入目标系统的方式。 在配置写入器期间,可能需要设置的主要选项分为以下几类:

  • 输出模式(默认为追加)。
  • 检查点位置(每个写入器都需要)。
  • 触发器间隔;请参阅配置结构化流式处理触发器间隔
  • 用于指定数据接收器或格式的选项(例如,文件类型、分隔符和架构)。
  • 用于配置目标系统访问的选项(例如,端口设置和凭据)。

对 Delta Lake 执行增量分批写入

以下示例使用指定的文件路径和检查点写入到 Delta Lake。

重要

始终确保为配置的每个流式处理写入器指定唯一的检查点位置。 检查点为流提供唯一标识,跟踪处理的所有记录以及与流式处理查询关联的状态信息。

触发器的 availableNow 设置指示结构化流式处理处理源数据集中以前未处理的所有记录,然后关闭,这样你就可以安全地执行以下代码,而无需担心让流保持运行所造成的问题:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

在此示例中,没有任何新记录进入我们的数据源,因此重复执行此代码不会引入新记录。

警告

结构化流式处理执行可以防止自动终止操作关闭计算资源。 为避免意外成本,请务必终止流式处理查询。

为生产准备结构化流式处理代码

Databricks 建议对大多数结构化流式处理工作负载使用增量实时表。 以下建议提供了为生产准备结构化流式处理工作负载的起点:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 不要在交互式群集上运行结构化流式处理工作负载;始终将流计划为作业。
  • 为帮助流式处理作业自动恢复,请为作业配置无限重试。
  • 不要对使用结构化流式处理的工作负载使用自动缩放。

有关更多建议,请参阅结构化流式处理的生产注意事项

从 Delta Lake 读取数据、转换数据和写入到 Delta Lake

Delta Lake 广泛支持将结构化流式处理用作源和接收器。 请参阅增量表流式读取和写入

以下示例演示了从 Delta 表增量加载所有新记录、将其与另一个 Delta 表的快照相联接,并将其写入 Delta 表的示例语法:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

必须配置适当的权限才能读取源表,以及写入到目标表和指定的检查点位置。 使用数据源和接收器的相关值填写用尖括号 (<>) 表示的所有参数。

注意

增量实时表提供完全声明性的语法用于创建 Delta Lake 管道以及自动管理触发器和检查点等属性。 请参阅什么是增量实时表?

从 Kafka 读取数据、转换数据和写入到 Kafka

Apache Kafka 和其他消息传递总线针对大型数据集提供极低延迟。 你可以使用 Azure Databricks 将转换应用于从 Kafka 引入的数据,然后将数据写回 Kafka。

注意

将数据写入云对象存储会增加额外的延迟开销。 如果你希望在 Delta Lake 中存储来自消息传递总线的数据,但需要尽可能低的流式处理工作负载延迟,Databricks 建议配置单独的流式处理作业以将数据引入到湖屋,并为下游消息传递总线接收器应用准实时转换。

以下代码示例演示了一种简单模式,它通过将 Kafka 中的数据与 Delta 表中的数据相联接,然后写回 Kafka 来扩充 Kafka 中的数据:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

必须配置适当的权限才能访问 Kafka 服务。 使用数据源和接收器的相关值填写用尖括号 (<>) 表示的所有参数。 请参阅使用 Apache Kafka 和 Azure Databricks 进行流处理