重要
此功能目前以公共预览版提供。
实时模式支持超低延迟流式处理,端到端延迟低至 5 毫秒,因此非常适合欺诈检测和实时个性化等作工作负荷。 本教程指导你使用一个简单的示例设置第一个实时流式处理查询。
有关实时模式的概念性信息、何时使用它和支持的功能,请参阅 结构化流式处理中的实时模式。
要求
- 你有权 创建经典计算。
- Databricks Runtime 17.1 或以上版本(使用实时模式的
display函数是必须的)。
注释
如果没有经典计算创建权限,请联系工作区管理员,使用步骤 1 中的配置创建实时模式群集。
步骤 1:为实时模式创建经典计算
实时模式需要特定的经典计算配置来实现超低延迟。 这些设置可确保任务在所有阶段同时运行,且数据在到达时便被连续处理,而不是批处理。
若要创建正确配置的经典计算,请:
在 Azure Databricks 工作区中,单击边栏中的 “计算 ”。
单击“ 创建计算”。
输入名称。
选择 Databricks Runtime 17.1 或更高版本。
清除 Photon 加速功能(实时模式不支持 Photon 功能)。
清除 “启用自动缩放 ”(实时模式需要固定的群集大小)。
在 “高级性能”下,清除 “使用现成实例 ”(现成实例可能会导致中断)。
单击“ 高级”选项 以展开其他设置。
在 “访问”模式下,选择 “专用”(以前为“单用户”)。
在 Spark 配置下,添加以下配置:
spark.databricks.streaming.realTimeMode.enabled true单击“ 创建计算”。
步骤 2:创建笔记本
笔记本提供用于开发和测试流式处理查询的交互式环境。 你可以使用此笔记本来编写实时查询,并观察结果的持续更新。
创建笔记本:
- 单击边栏中的“ 新建 ”,然后单击“ 笔记本”。
- 在计算下拉菜单中,选择在步骤 1 中创建的计算。
- 选择 Python 或 Scala 作为默认语言。
步骤 3:运行实时模式查询
将以下代码复制并粘贴到笔记本单元格中,然后运行它。 此示例使用速率源,该源以指定速率生成行,并实时显示结果。
注释
display具有realTime触发器的函数在 Databricks Runtime 17.1 及更高版本中可用。
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())
运行代码后,会看到生成新行时实时更新的表。 该表显示一列 timestamp 和一列 value ,该列随每行一起递增。
了解数据
上面的代码演示实时流式处理查询的基本组件。 下表说明了关键参数及其控制内容:
Python
| 参数 | 说明 |
|---|---|
format("rate") |
使用速率源,这是一个内置的数据源,它以可配置的速率生成数据行。 这对于在没有外部依赖项的情况下进行测试非常有用。 |
numPartitions |
设置生成的数据的分区数。 |
rowsPerSecond |
控制每秒生成多少行。 |
realTime="5 minutes" |
启用实时模式。 间隔指定查询检查点的进度频率。 较长的间隔意味着检查点频率较低,但在发生故障后可能更长的恢复时间。 |
outputMode="update" |
实时模式需要更新输出模式。 |
Scala
| 参数 | 说明 |
|---|---|
format("rate") |
使用速率源,这是一个内置源,它以可配置的速率生成行。 这对于在没有外部依赖项的情况下进行测试非常有用。 |
numPartitions |
设置生成的数据的分区数。 |
rowsPerSecond |
控制每秒生成多少行。 |
Trigger.RealTime() |
使用默认的检查点间隔启用实时模式。 还可以指定间隔,例如 Trigger.RealTime("5 minutes")。 |
OutputMode.Update() |
实时模式需要更新输出模式。 |
你正在看到的内容
运行查询时,该 display 函数将创建一个表,该表会在速率源生成新行时实时更新。 每行都包含:
- timestamp:由速率源生成行的时间
- 值:单调递增计数器,随每个新行一起递增
表会持续更新,延迟最小,演示实时模式在数据可用后如何处理数据。 这是实时模式的核心优势 - 能够立即查看和处理数据,而不是等待批处理。
你学到的知识
已成功设置并运行第一个实时流式处理查询。 现在,你已了解如何:
- 使用实时模式所需的设置配置经典计算(专用集群、禁用 Photon、禁用自动缩放、Spark 配置)
- 使用
realTime触发器启用实时处理 - 使用
display函数进行交互式开发和测试 - 通过观察连续更新来验证查询是否在实时模式下运行
你已准备好使用 Kafka、Kinesis 和其他受支持的源生成生产实时管道。 若要了解有关结构化流式处理的详细信息,请参阅 结构化流概念。
后续步骤
运行第一个实时查询后,请浏览以下资源来构建生产环境中的流式处理应用程序。
- 实时模式示例 - Kafka 源和接收器、有状态查询、聚合和自定义接收器的工作代码示例
- 实时模式参考 - 了解集群大小、支持的操作符、监视和功能限制
- 有状态流式处理应用程序 - 为流式处理查询添加状态管理功能,以实现去重、聚合和窗口化处理
-
高级状态管理 - 用于
transformWithState具有生存时间(TTL)和复杂逻辑的自定义有状态处理