在 Lakeflow Spark 声明性管道中使用实时模式

Important

Lakeflow Spark 声明性管道中的实时模式在预览频道上的 Databricks Runtime 18.1.3 上 处于公共预览 状态。

实时模式支持超低延迟数据处理,端到端延迟低至 5 毫秒。 对需要立即响应流数据(例如欺诈检测和实时个性化)的操作工作负荷使用实时模式。

实时模式也可以在管道外部的结构化流中直接使用。 请参阅 结构化流式处理中的实时模式

实时模式如何实现低延迟

实时模式在三个关键方面不同于标准连续处理:

  • 长时间运行的批处理:系统在长时间运行的批处理中在源中可用时处理数据(默认值为 5 分钟)。
  • 同时调度各阶段:所有查询阶段都会同时调度。 计算资源必须有足够的可用任务槽位来同时涵盖所有阶段。 请参阅 计算资源规格
  • 流式处理混排:数据在生成阶段后立即传递,而不是等待上游阶段完成,然后再启动下游阶段。

检查点间隔(通过 pipelines.trigger.interval 配置)控制状态和源偏移量被持久化到持久存储的频率。 较长的间隔可减少检查点开销,但会增加故障后的恢复时间,并延迟指标报告。 较短的间隔可提高持久性,但会增加开销。

实时模式和连续管道

实时模式是一种专用类型的连续触发器。 仍需要连续模式 - 实时模式在顶部添加了流级延迟优化。 若要使用实时模式,管道必须首先以连续模式运行。 随后,实时模式会在流级别进一步进行优化,以实现标准连续处理无法达到的亚秒级延迟。

启用实时模式需要三个配置步骤:

  1. 将管道设置为连续模式。
  2. 在管道级别启用实时模式。
  3. 定义实时更新流。

Requirements

要求 价值
Databricks Runtime SDP 预览通道中的 18.1.3
计算类型 经典计算或无服务器

配置实时模式

步骤 1:将管道设置为连续模式

在管道设置中,将 管道模式 设置为 “连续”,或在管道 JSON 中设置它:

{
  "continuous": true
}

步骤 2:在管道级别启用实时模式

在管道设置中,在 高级 > Spark 配置下的 Spark 配置中添加以下键:

spark.databricks.streaming.realTimeMode.enabled = true

还可以在管道 JSON 中设置此值:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

步骤 3:定义实时更新流

实时模式需要更新流。 使用 dp.create_sink() 定义输出目标,然后使用 @dp.update_flow 修饰器,并将 pipelines.trigger 设置为 "RealTime",同时将 target 指向接收器。

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

流级配置参数:

参数 必需 Default 说明
pipelines.trigger 是的 设置为 "RealTime" 为此流启用实时模式。
pipelines.trigger.interval "5 minutes" 检查点间隔。 控制提交状态和偏移的频率。 较短的值可提高可恢复性;更长的值可降低开销。

代码示例

Kafka 到 Kafka

从 Kafka 主题读取数据并将其写入 Kafka 输出目标:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

使用广播联接进行扩充

将 Kafka 流与静态查找表联接。 仅支持广播(流-静态)连接。 实时模式不支持流与流之间的联接。

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

聚合

使用有状态的 groupBy 按键对事件计数。 将 spark.sql.shuffle.partitions 设置为与有状态操作的输入分区数相匹配:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

支持的源和汇器

连接器 作为源 作为接收器 笔记
Apache Kafka
AWS MSK 使用 Kafka 兼容的接口。
Azure 事件中心 (Kafka 连接器) 使用 Kafka 兼容的接口。
Amazon Kinesis 不支持 仅用于 EFO(增强型扇出)模式。
Delta 不支持 不支持

计算大小调整

如果计算有足够的任务槽,则可以为每个计算资源运行一个实时管道。 可用的任务槽必须涵盖所有查询阶段的所有任务。

管道类型 Configuration 所需的任务槽位
单阶段无状态 (Kafka 源 + 接收器) maxPartitions = 8 8
两阶段有状态 (Kafka 源 + 随机) maxPartitions = 8,随机分区 = 20 28 (8 + 20)
三阶段(Kafka 源 + 两次 Shuffle) maxPartitions = 8,两个随机阶段,每个阶段为 20 个 48 (8 + 20 + 20)

如果未设置 maxPartitions,请使用 Kafka 主题中的分区数。

操作员支持

Category Operator 支持
无状态 选择、投影
UDFs Scala 用户定义函数 (UDF) • (有限制)
UDFs Python 用户定义函数 (UDF) • (有限制)
聚合 sum、count、max、min、avg
Windowing 翻滚、滑动
Windowing Session 不支持
去重 dropDuplicates • (未绑定状态)
去重 dropDuplicatesWithinWatermark 不支持
联接 广播表连接
联接 流与流连接 不支持
自定义 transformWithState ✓(存在行为差异)
自定义 union • (有限制)
自定义 forEach 不支持
自定义 flatMapGroupsWithState 不支持
自定义 mapPartitions 不支持
自定义 forEachBatch 不支持

transformWithState 在实时模式下

transformWithState 在实时模式下受支持,但与微批处理存在以下差异:

  • handleInputRows 每行调用一次,而不是每批每个键调用一次。 inputRows迭代器根据调用生成单个值。
  • 不支持事件时间计时器。 如果一直没有数据到达,处理时间计时器会在长时间运行的批处理作业终止时触发。
  • 不支持 transformWithStateInPandas

实时模式下的 Pandas UDF

若要最大程度地减少 pandas UDF 的延迟,请设置为 spark.sql.execution.arrow.maxRecordsPerBatch1. 这以牺牲吞吐量为代价优化延迟。 如果吞吐量也很重要,请将此值设置为 100 或更高。

监视实时模式性能

实时模式会在StreamingQueryProgress中显示延迟指标,位于latencies字段下。 通过 StreamingQueryListener 或检查流式查询上的 lastProgress 属性来访问这些指标。

Metric 说明
processingLatencyMs 记录被流读取时到被流完全处理时之间的时间
sourceQueuingLatencyMs 记录成功写入消息总线(例如,Kafka 中的日志追加时间)与流首次读取记录之间的时间
e2eLatencyMs 从源生成记录到流完全处理记录时的总端到端延迟

每个指标都报告为 p50、p90、p95 和 p99 百分位。

局限性

建议每个管道使用一个实时流。 允许多个流,但跨流的任务槽争用会增加延迟。

有关操作员和源限制的完整列表,请参阅 实时模式限制

其他资源