结构化流式处理中的实时模式

重要

此功能目前以公共预览版提供。

实时模式是结构化流式处理的触发器类型,可实现超低延迟数据处理,端到端延迟低至 5 毫秒。 对需要立即响应流数据(例如欺诈检测、实时个性化和即时决策系统)的作工作负荷使用实时模式。

Databricks Runtime 16.4 LTS 及更高版本中提供了实时模式。 有关分步设置说明,请参阅 实时模式入门。 有关代码示例,请参阅 实时模式示例

什么是实时模式?

操作型工作负载与分析型工作负载

流式处理工作负载通常可划分为分析工作负载和操作工作负载。

  • 分析工作负荷使用数据引入和转换,通常遵循奖牌体系结构(例如,将数据引入铜表、银表和黄金表)。
  • 操作工作负载使用实时数据、应用业务逻辑并触发下游行动和决策。

一些操作负荷的示例包括:

  • 如果欺诈分数超过阈值,则实时阻止或标记信用卡交易,具体取决于异常位置、大型交易大小或快速支出模式等因素。
  • 点击流数据显示用户已经浏览牛仔裤五分钟时传递促销消息,如果在接下来的 15 分钟内购买,则提供 25% 折扣。

总体而言,操作工作负载的特点是需要亚秒级的端到端延迟。 这可以通过 Apache Spark 结构化流式处理中的实时模式来实现。

实时模式如何实现低延迟

实时模式通过以下方式改进执行体系结构:

  • 执行长时间运行的批处理(默认值为 5 分钟),系统在源中可用时处理数据。
  • 同时调度查询的所有阶段。 这要求可用任务槽数等于或大于批处理中所有阶段的任务数。
  • 使用流式洗牌生成阶段后,立即在阶段之间传递数据。

在批处理结束时,下一批启动之前,结构化流式处理会记录进度并发布指标。 批处理持续时间影响检查点频率:

  • 较长的批处理:检查点的创建频率较低,这意味着在发生故障时重播时间更长,并且指标的可用性延迟。
  • 较短的批处理:更频繁的检查点,这可能会影响延迟。

Databricks 建议针对目标工作负荷对实时模式进行基准测试,以查找适当的触发器间隔。

何时使用实时模式

在用例需要时选择实时模式:

  • 次秒延迟:需要在毫秒内响应数据的应用程序,例如必须实时阻止事务的欺诈检测系统。
  • 运营决策制定:基于传入数据触发即时行动的系统,例如实时优惠、警报或通知。
  • 持续处理:数据到达时必须立即处理的工作负荷,而不是定期批处理。

在以下情况下使用微批处理模式(默认结构化流式处理触发器)

  • 分析处理:ETL 管道、数据转换和奖牌体系结构实现,其中延迟要求以秒或分钟为单位进行度量。
  • 成本优化:不需要次秒延迟的工作负荷,因为实时模式需要专用的计算资源。
  • 检查点频率很重要:支持更频繁设置检查点的应用程序有助于更快地恢复。

要求和配置

实时模式对计算设置和查询配置有特定要求。 本部分介绍使用实时模式所需的先决条件和配置步骤。

先决条件

若要使用实时模式,必须满足以下要求:

  • Databricks Runtime 16.4 LTS 或更高版本:实时模式仅适用于 DBR 16.4 LTS 及更高版本。
  • 专用计算:必须使用专用(以前是单个用户)计算。 不支持标准(以前共享)、Lakeflow Spark 声明性管道和无服务器群集。
  • 无自动缩放:必须禁用自动缩放。
  • 未启用 Photon:实时模式不支持 Photon 加速。
  • Spark 配置:必须将 spark.databricks.streaming.realTimeMode.enabled 设置为 true

计算配置

使用以下设置配置您的计算资源:

  • 在 Spark 配置中将spark.databricks.streaming.realTimeMode.enabled设置为true
  • 禁用自动缩放。
  • 禁用 Photon 加速。
  • 确保计算资源配置为专用集群(不是标准集群、Lakeflow Spark 声明性管道或无服务器)。

有关为实时模式创建和配置计算的分步说明,请参阅 实时模式入门

查询配置

若要以实时模式运行查询,必须启用实时触发器。 实时触发器仅在更新模式下受支持。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala(编程语言)

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

计算大小调整

如果计算资源有足够的任务槽,则每个计算资源可以运行一个实时作业。

若要在低延迟模式下运行,可用任务槽总数必须大于或等于所有查询阶段的任务数。

槽计算示例

管道类型 配置 所需槽位
单阶段无状态 (Kafka 源 + 接收器) maxPartitions = 8 8 个槽
两阶段有状态 (Kafka 源 + 随机) maxPartitions = 8,随机分区 = 20 28 个槽(8 + 20)
三阶段 (Kafka 源 + 洗牌 + 重新分区) maxPartitions = 8,两个随机阶段,每个阶段为 20 个 48 个插槽(8 + 20 + 20)

如果未设置 maxPartitions,请使用 Kafka 主题中的分区数。

关键注意事项

配置计算时,请考虑以下事项:

  • 与微批处理模式不同,实时任务在等待数据时可以保持空闲状态,因此正确调整大小对于避免浪费的资源至关重要。
  • 通过优化实现目标利用率级别(例如 50%):
    • maxPartitions (对于卡夫卡)
    • spark.sql.shuffle.partitions (对于混排阶段)
  • Databricks 建议设置 maxPartitions ,以便每个任务处理多个 Kafka 分区以减少开销。
  • 调整每个工作节点的任务位置,以匹配简单单阶段作业的工作负荷。
  • 对于洗牌操作繁重的作业,请试验查找最小的洗牌分区数,以避免积压,并在此基础上进行调整。 如果计算节点没有足够的槽,作业将不会被调度。

注释

在 Databricks Runtime 16.4 LTS 及更高版本中,所有实时流程都使用检查点 v2,从而实现实时模式和微批处理模式之间的无缝切换。

优化技术

方法 默认启用
异步进度跟踪:将写入操作移动到偏移日志和提交日志中的异步线程上,从而减少两个微批处理之间的间隔时间。 这有助于减少无状态流式处理查询的延迟。
异步状态检查点:通过在前一个微批处理计算完成后立即开始处理下一个微批处理,而不等待状态检查点,有助于减少有状态流式查询的延迟。

监视和可观测性

测量查询性能对于实时工作负荷至关重要。 在实时模式下,传统的批处理持续时间指标不会反映实际延迟,因此需要替代方法。

端到端延迟特定于工作负荷,有时只能使用业务逻辑准确测量。 例如,如果源时间戳在 Kafka 中输出,则可以计算延迟,因为 Kafka 的输出时间戳与源时间戳之间的差异。

还可以使用下面所述的内置指标和 API 估算端到端延迟。

具有 "StreamingQueryProgress" 的内建指标

事件中 StreamingQueryProgress 包括以下指标,该指标会自动记录在驱动程序日志中。 还可以通过StreamingQueryListeneronQueryProgress()回调函数来访问它们。 QueryProgressEvent.json()toString() 包括额外的实时模式指标。

  1. 处理延迟(processingLatencyMs)。 从实时模式查询读取记录到查询将其写入到下一个阶段或下游所经过的时间。 对于单阶段查询,这将测量与 E2E 延迟相同的持续时间。 系统按任务报告此指标。
  2. 源队列延迟(sourceQueuingLatencyMs) 当系统将记录写入消息总线(例如,Kafka 中的日志追加时间)和实时模式查询首次读取记录时之间经过的时间。 系统按任务报告此指标。
  3. E2E 延迟(e2eLatencyMs)。 系统将记录写入消息总线和实时模式查询写入下游记录之间的时间。 系统跨所有任务处理的所有记录按批次聚合此指标。

例如:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

使用观察 API 进行自定义延迟度量

观察 API 有助于在不启动另一个作业的情况下测量延迟。 如果源时间戳近似于源数据到达时间,则可以使用观察 API 估算每个批处理的延迟。 在到达接收器之前传递时间戳:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala(编程语言)

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

在此示例中,在输出条目之前记录当前时间戳,通过计算此时间戳与记录的源时间戳之间的差异来估计延迟。 结果包含在进度报告中,并提供给听众。 下面是示例输出:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

功能支持和限制

本部分介绍实时模式支持的功能和当前限制,包括兼容环境、语言、源、接收器、运算符和特定功能的特殊注意事项。

支持的环境、语言和模式

计算类型 已支持
专用(以前:单个用户) 是的
标准(前:共享)
Lakeflow Spark 声明性管道 Classic版本
Lakeflow Spark 声明性管道无服务器
无服务器

支持的语言:

语言 已支持
Scala(编程语言) 是的
Java 是的
Python 是的

支持的执行模式:

执行模式 已支持
更新模式 是的
追加模式
完整模式

支持的源和汇器

来源:

来源 已支持
Apache Kafka 是的
AWS MSK 是的
事件中心(使用 Kafka 连接器) 是的
动动力 是(仅 EFO 模式)
Apache Pulsar

接收器:

水槽 已支持
Apache Kafka 是的
事件中心(使用 Kafka 连接器) 是的
动动力
Apache Pulsar
任意接收器(使用 forEachWriter) 是的

支持的运算符

运营商 已支持
无状态操作
选择 是的
投影 是的
UDFs(用户自定义函数)
Scala 用户定义函数 (UDF) 是(有一些限制
Python 用户定义函数 (UDF) 是(有一些限制
聚合
总和 是的
count 是的
最大值 是的
分钟 是的
平均值 是的
聚合函数 是的
窗口管理
翻转 是的
滑动 是的
会话
重复数据删除
dropDuplicates 是(状态无界)
水印范围内删除重复项
流 - 表连接
广播表 (应较小) 是的
流 - 流合并
(平)MapGroupsWithState
transformWithState 是(有一些差异
是(有一些限制
forEach 是的
forEachBatch
mapPartitions (映射分区) 否(请参阅限制

特殊注意事项

某些运算符和功能在实时模式下使用时具有特定的注意事项或差异。

在实时模式下的“transformWithState”

对于构建有状态自定义应用程序,Databricks 支持 Apache Spark 结构化流式处理中的 transformWithStateAPI。 有关 API 和代码片段的详细信息,请参阅 生成自定义有状态应用程序

但是,API 在实时模式下的行为方式和利用微批处理体系结构的传统流式处理查询之间存在一些差异。

  • 实时模式为每一行调用handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)方法。
    • inputRows迭代器返回单个值。 微批处理模式为每个密钥调用一次, inputRows 迭代器返回微批处理中某个键的所有值。
    • 编写代码时,必须注意这种差异。
  • 实时模式下不支持事件时间计时器。
  • 在实时模式下,计时器因数据到达而延迟触发:
    • 如果计时器设置为10:00:00,但没有数据到达,计时器不会立刻触发。
    • 如果数据到达 10:00:10,计时器会以 10 秒的延迟触发。
    • 如果没有数据到达并且长时间运行的批处理正在终止,计时器会在批处理终止之前触发。

实时模式下的 Python UDF

Databricks 在实时模式下支持大多数 Python 用户定义的函数(UDF):

UDF 类型 已支持
无状态 UDF
Python 标量 UDF (链接 是的
箭头标量 UDF 是的
Pandas 标量 UDF (链接 是的
箭头函数 (mapInArrow 是的
Pandas 函数(链接 是的
有状态分组 UDF (UDAF)
transformWithState (仅 Row 接口) 是的
applyInPandasWithState
非有状态分组 UDF (UDAF)
应用
applyInArrow
applyInPandas
表函数
UDTF (链接
UC UDF

在实时模式下使用 Python UDF 时,需要考虑以下几点:

  • 若要最大程度地减少延迟,请将箭头批大小 (spark.sql.execution.arrow.maxRecordsPerBatch) 配置为 1。
    • 权衡:此配置以牺牲吞吐量为代价优化延迟。 对于大多数工作负荷,建议使用此设置。
    • 仅在需要更高的吞吐量来容纳输入量时才增加批处理大小,并接受可能的延迟增加。
  • Pandas UDF 和函数在 Arrow 批处理大小为 1 时性能不佳。
    • 如果使用 pandas UDF 或函数,请将箭头批大小设置为更高的值(例如 100 或更高)。
    • 请注意,这意味着延迟较高。 Databricks 建议尽可能使用箭头 UDF 或函数。
  • 由于 pandas 的性能问题,仅 Row 接口支持 transformWithState。

局限性

源限制

对于 Kinesis,实时模式不支持轮询模式。 此外,频繁的重新分区可能会对延迟产生负面影响。

联合限制

Union 运算符存在一些限制:

  • 实时模式不支持自我联合
    • Kafka:不能使用相同的源数据帧对象与从中派生的数据帧进行合并。 解决方法:使用从同一源读取的不同数据帧。
    • Kinesis:您不能将源自同一 Kinesis 源并使用相同配置的数据帧进行合并。 解决方法:除了使用不同的数据帧,还可以为每个数据帧分配不同的“consumerName”选项。
  • 实时模式不支持在 Union 之前定义的有状态运算符(例如aggregatededuplicatetransformWithState)。
  • 实时模式不支持与批处理源联合。

MapPartitions 限制

mapPartitions 在 Scala 和类似的 Python API(mapInPandasmapInArrow)中,获取整个输入分区的迭代器,并生成整个输出的迭代器,并在输入和输出之间进行任意映射。 这些 API 可能会在实时流式模式下阻止整个输出,从而导致性能问题,并增加延迟。 这些 API 的语义功能不支持水印传播。

将标量 UDF 与 转换复杂数据类型 结合使用,或通过 filter 来实现类似的功能。

后续步骤

了解什么是实时模式以及如何对其进行配置后,请浏览以下资源以开始实现实时流式处理应用程序:

  • 实时模式入门 - 按照分步说明配置计算并运行第一个实时流式处理查询。

  • 实时模式代码示例 - 探索工作示例,包括 Kafka 源和接收器、有状态查询、聚合和自定义接收器。

  • 结构化流概念 - 了解 Databricks 上结构化流式处理的基础概念。