Important
Lakeflow Spark 声明性管道中的实时模式在预览频道上的 Databricks Runtime 18.1.3 上 处于公共预览 状态。
实时模式支持超低延迟数据处理,端到端延迟低至 5 毫秒。 对需要立即响应流数据(例如欺诈检测和实时个性化)的操作工作负荷使用实时模式。
实时模式也可以在管道外部的结构化流中直接使用。 请参阅 结构化流式处理中的实时模式。
实时模式如何实现低延迟
实时模式在三个关键方面不同于标准连续处理:
- 长时间运行的批处理:系统在长时间运行的批处理中在源中可用时处理数据(默认值为 5 分钟)。
- 同时调度各阶段:所有查询阶段都会同时调度。 计算资源必须有足够的可用任务槽位来同时涵盖所有阶段。 请参阅 计算资源规格。
- 流式处理混排:数据在生成阶段后立即传递,而不是等待上游阶段完成,然后再启动下游阶段。
检查点间隔(通过 pipelines.trigger.interval 配置)控制状态和源偏移量被持久化到持久存储的频率。 较长的间隔可减少检查点开销,但会增加故障后的恢复时间,并延迟指标报告。 较短的间隔可提高持久性,但会增加开销。
实时模式和连续管道
实时模式是一种专用类型的连续触发器。 仍需要连续模式 - 实时模式在顶部添加了流级延迟优化。 若要使用实时模式,管道必须首先以连续模式运行。 随后,实时模式会在流级别进一步进行优化,以实现标准连续处理无法达到的亚秒级延迟。
启用实时模式需要三个配置步骤:
- 将管道设置为连续模式。
- 在管道级别启用实时模式。
- 定义实时更新流。
Requirements
| 要求 | 价值 |
|---|---|
| Databricks Runtime | SDP 预览通道中的 18.1.3 |
| 计算类型 | 经典计算或无服务器 |
配置实时模式
步骤 1:将管道设置为连续模式
在管道设置中,将 管道模式 设置为 “连续”,或在管道 JSON 中设置它:
{
"continuous": true
}
步骤 2:在管道级别启用实时模式
在管道设置中,在 高级 > Spark 配置下的 Spark 配置中添加以下键:
spark.databricks.streaming.realTimeMode.enabled = true
还可以在管道 JSON 中设置此值:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
步骤 3:定义实时更新流
实时模式需要更新流。 使用 dp.create_sink() 定义输出目标,然后使用 @dp.update_flow 修饰器,并将 pipelines.trigger 设置为 "RealTime",同时将 target 指向接收器。
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
流级配置参数:
| 参数 | 必需 | Default | 说明 |
|---|---|---|---|
pipelines.trigger |
是的 | — | 设置为 "RealTime" 为此流启用实时模式。 |
pipelines.trigger.interval |
否 | "5 minutes" |
检查点间隔。 控制提交状态和偏移的频率。 较短的值可提高可恢复性;更长的值可降低开销。 |
代码示例
Kafka 到 Kafka
从 Kafka 主题读取数据并将其写入 Kafka 输出目标:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
使用广播联接进行扩充
将 Kafka 流与静态查找表联接。 仅支持广播(流-静态)连接。 实时模式不支持流与流之间的联接。
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
聚合
使用有状态的 groupBy 按键对事件计数。 将 spark.sql.shuffle.partitions 设置为与有状态操作的输入分区数相匹配:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
支持的源和汇器
| 连接器 | 作为源 | 作为接收器 | 笔记 |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | 使用 Kafka 兼容的接口。 |
| Azure 事件中心 (Kafka 连接器) | ✓ | ✓ | 使用 Kafka 兼容的接口。 |
| Amazon Kinesis | ✓ | 不支持 | 仅用于 EFO(增强型扇出)模式。 |
| Delta | 不支持 | 不支持 | — |
计算大小调整
如果计算有足够的任务槽,则可以为每个计算资源运行一个实时管道。 可用的任务槽必须涵盖所有查询阶段的所有任务。
| 管道类型 | Configuration | 所需的任务槽位 |
|---|---|---|
| 单阶段无状态 (Kafka 源 + 接收器) |
maxPartitions = 8 |
8 |
| 两阶段有状态 (Kafka 源 + 随机) |
maxPartitions = 8,随机分区 = 20 |
28 (8 + 20) |
| 三阶段(Kafka 源 + 两次 Shuffle) |
maxPartitions = 8,两个随机阶段,每个阶段为 20 个 |
48 (8 + 20 + 20) |
如果未设置 maxPartitions,请使用 Kafka 主题中的分区数。
操作员支持
| Category | Operator | 支持 |
|---|---|---|
| 无状态 | 选择、投影 | ✓ |
| UDFs | Scala 用户定义函数 (UDF) | • (有限制) |
| UDFs | Python 用户定义函数 (UDF) | • (有限制) |
| 聚合 | sum、count、max、min、avg | ✓ |
| Windowing | 翻滚、滑动 | ✓ |
| Windowing | Session | 不支持 |
| 去重 | dropDuplicates |
• (未绑定状态) |
| 去重 | dropDuplicatesWithinWatermark |
不支持 |
| 联接 | 广播表连接 | ✓ |
| 联接 | 流与流连接 | 不支持 |
| 自定义 | transformWithState |
✓(存在行为差异) |
| 自定义 | union |
• (有限制) |
| 自定义 | forEach |
不支持 |
| 自定义 | flatMapGroupsWithState |
不支持 |
| 自定义 | mapPartitions |
不支持 |
| 自定义 | forEachBatch |
不支持 |
transformWithState 在实时模式下
transformWithState 在实时模式下受支持,但与微批处理存在以下差异:
-
handleInputRows每行调用一次,而不是每批每个键调用一次。inputRows迭代器根据调用生成单个值。 - 不支持事件时间计时器。 如果一直没有数据到达,处理时间计时器会在长时间运行的批处理作业终止时触发。
- 不支持
transformWithStateInPandas。
实时模式下的 Pandas UDF
若要最大程度地减少 pandas UDF 的延迟,请设置为 spark.sql.execution.arrow.maxRecordsPerBatch1. 这以牺牲吞吐量为代价优化延迟。 如果吞吐量也很重要,请将此值设置为 100 或更高。
监视实时模式性能
实时模式会在StreamingQueryProgress中显示延迟指标,位于latencies字段下。 通过 StreamingQueryListener 或检查流式查询上的 lastProgress 属性来访问这些指标。
| Metric | 说明 |
|---|---|
processingLatencyMs |
记录被流读取时到被流完全处理时之间的时间 |
sourceQueuingLatencyMs |
记录成功写入消息总线(例如,Kafka 中的日志追加时间)与流首次读取记录之间的时间 |
e2eLatencyMs |
从源生成记录到流完全处理记录时的总端到端延迟 |
每个指标都报告为 p50、p90、p95 和 p99 百分位。
局限性
建议每个管道使用一个实时流。 允许多个流,但跨流的任务槽争用会增加延迟。
有关操作员和源限制的完整列表,请参阅 实时模式限制。