实时模式入门

重要

此功能目前以公共预览版提供。

实时模式支持超低延迟流式处理,端到端延迟低至 5 毫秒,因此非常适合欺诈检测和实时个性化等作工作负荷。 本教程指导你使用一个简单的示例设置第一个实时流式处理查询。

有关实时模式的概念性信息、何时使用它和支持的功能,请参阅 结构化流式处理中的实时模式

要求

  • 你有权 创建经典计算
  • Databricks Runtime 17.1 或以上版本(使用实时模式的 display 函数是必须的)。

注释

如果没有经典计算创建权限,请联系工作区管理员,使用步骤 1 中的配置创建实时模式群集。

步骤 1:为实时模式创建经典计算

实时模式需要特定的经典计算配置来实现超低延迟。 这些设置可确保任务在所有阶段同时运行,且数据在到达时便被连续处理,而不是批处理。

若要创建正确配置的经典计算,请:

  1. 在 Azure Databricks 工作区中,单击边栏中的 “计算 ”。

  2. 单击“ 创建计算”。

  3. 输入名称。

  4. 选择 Databricks Runtime 17.1 或更高版本。

  5. 清除 Photon 加速功能(实时模式不支持 Photon 功能)。

  6. 清除 “启用自动缩放 ”(实时模式需要固定的群集大小)。

  7. “高级性能”下,清除 “使用现成实例 ”(现成实例可能会导致中断)。

  8. 单击“ 高级”选项 以展开其他设置。

  9. “访问”模式下,选择 “专用”(以前为“单用户”)。

  10. Spark 配置下,添加以下配置:

    spark.databricks.streaming.realTimeMode.enabled true
    
  11. 单击“ 创建计算”。

步骤 2:创建笔记本

笔记本提供用于开发和测试流式处理查询的交互式环境。 你可以使用此笔记本来编写实时查询,并观察结果的持续更新。

创建笔记本:

  1. 单击边栏中的“ 新建 ”,然后单击“ 笔记本”。
  2. 在计算下拉菜单中,选择在步骤 1 中创建的计算。
  3. 选择 PythonScala 作为默认语言。

步骤 3:运行实时模式查询

将以下代码复制并粘贴到笔记本单元格中,然后运行它。 此示例使用速率源,该源以指定速率生成行,并实时显示结果。

注释

display具有realTime触发器的函数在 Databricks Runtime 17.1 及更高版本中可用。

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

运行代码后,会看到生成新行时实时更新的表。 该表显示一列 timestamp 和一列 value ,该列随每行一起递增。

了解数据

上面的代码演示实时流式处理查询的基本组件。 下表说明了关键参数及其控制内容:

Python

参数 说明
format("rate") 使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。
numPartitions 设置生成的数据的分区数。
rowsPerSecond 控制每秒生成多少行。
realTime="5 minutes" 启用实时模式。 间隔指定查询检查点的进度频率。 较长的间隔意味着检查点频率较低,但在发生故障后可能更长的恢复时间。
outputMode="update" 实时模式需要更新输出模式。

Scala

参数 说明
format("rate") 使用速率源,这是一个内置源,它以可配置的速率生成行。 这对于在没有外部依赖项的情况下进行测试非常有用。
numPartitions 设置生成的数据的分区数。
rowsPerSecond 控制每秒生成多少行。
Trigger.RealTime() 使用默认的检查点间隔启用实时模式。 还可以指定间隔,例如 Trigger.RealTime("5 minutes")
OutputMode.Update() 实时模式需要更新输出模式。

你正在看到的内容

运行查询时,该 display 函数将创建一个表,该表会在速率源生成新行时实时更新。 每行都包含:

  • timestamp:由速率源生成行的时间
  • :单调递增计数器,随每个新行一起递增

表会持续更新,延迟最小,演示实时模式在数据可用后如何处理数据。 这是实时模式的核心优势 - 能够立即查看和处理数据,而不是等待批处理。

你学到的知识

已成功设置并运行第一个实时流式处理查询。 现在,你已了解如何:

  • 使用实时模式所需的设置配置经典计算(专用集群、禁用 Photon、禁用自动缩放、Spark 配置)
  • 使用 realTime 触发器启用实时处理
  • 使用 display 函数进行交互式开发和测试
  • 通过观察连续更新来验证查询是否在实时模式下运行

你已准备好使用 Kafka、Kinesis 和其他受支持的源生成生产实时管道。 若要了解有关结构化流式处理的详细信息,请参阅 结构化流概念

后续步骤

运行第一个实时查询后,请浏览以下资源来构建生产环境中的流式处理应用程序。

  • 实时模式示例 - Kafka 源和接收器、有状态查询、聚合和自定义接收器的工作代码示例
  • 实时模式参考 - 了解集群大小、支持的操作符、监视和功能限制
  • 有状态流式处理应用程序 - 为流式处理查询添加状态管理功能,以实现去重、聚合和窗口化处理
  • 高级状态管理 - 用于 transformWithState 具有生存时间(TTL)和复杂逻辑的自定义有状态处理