重要
此功能目前以公共预览版提供。
本页介绍实时模式,这是一种用于结构化流的触发器类型,能够实现超低延迟的数据处理,端到端延迟可低至 5 毫秒。 此模式专为需要立即响应流数据的作工作负荷而设计。
Databricks Runtime 16.4 LTS 及更高版本中提供了实时模式。
操作负荷
流式处理工作负载通常可划分为分析工作负载和操作工作负载。
- 分析工作负荷使用数据引入和转换,通常遵循奖牌体系结构(例如,将数据引入铜表、银表和黄金表)。
- 操作工作负载使用实时数据、应用业务逻辑并触发下游行动和决策。
一些操作负荷的示例包括:
- 如果欺诈分数超过阈值,则实时阻止或标记信用卡交易,具体取决于异常位置、大型交易大小或快速支出模式等因素。
- 点击流数据显示用户已经浏览牛仔裤五分钟时传递促销消息,如果在接下来的 15 分钟内购买,则提供 25% 折扣。
一般情况下,操作工作负载的特点是需要亚秒级的端到端延迟。 这可以通过 Apache Spark 结构化流式处理中的实时模式来实现。
实时模式如何实现低延迟
实时模式通过以下方式改进执行体系结构:
- 执行长时间运行的批处理(默认值为 5 分钟),在源中可用时处理数据。
- 查询的所有阶段同时安排。 这要求可用任务槽数等于或大于批处理中所有阶段的任务数。
- 一旦使用流式随机处理生成数据,数据就会在阶段之间传递。
在每批处理结束时、下一批开始之前,结构化流式处理进行检查点以记录进度,并提供最后一批的指标。 如果批处理时间更长,则这些活动频率可能较低,导致在指标可用性出现失败和延迟时重播时间较长。 另一方面,如果批较小,这些活动会变得更加频繁,这可能会影响延迟。 Databricks 建议根据目标工作负荷和要求对实时模式进行基准测试,以查找适当的触发器间隔。
群集配置
若要在结构化流中使用实时模式,您必须配置经典的 Lakeflow 作业:
在 Azure Databricks 工作区中,单击左上角的“ 新建 ”。 选择 “更多 ”并单击“ 群集”。
清除 光子加速。
清除 “启用自动缩放”。
在 “高级性能”下,清除 “使用现成实例”。
在“高级”和“访问”模式下,单击“手动”并选择“专用”(以前为:单用户)。
在 Spark 下,在 Spark 配置下输入以下内容:
spark.databricks.streaming.realTimeMode.enabled true
单击 “创建” 。
群集大小要求
如果群集有足够的任务槽,则可以为每个群集运行一个实时作业。
若要在低延迟模式下运行,可用任务槽总数必须大于或等于所有查询阶段的任务数。
槽计算示例
单阶段无状态管道(Kafka 源 + 汇):
如果 maxPartitions = 8,则至少需要 8 个槽。 如果未设置 maxPartitions,请使用 Kafka 主题分区数。
两阶段有状态管道(Kafka 源 + 随机):
如果 maxPartitions = 8,随机分区 = 20,则需要 8 + 20 = 28 个槽。
三阶段管道(Kafka 源 + 随机 + 重新分区):
maxPartitions = 8 和两个洗牌阶段(每个阶段为 20 个)需要 8 + 20 + 20 = 48 个槽。
关键注意事项
配置群集时,请考虑到以下事项:
- 与微批处理模式不同,实时任务在等待数据时可以保持空闲状态,因此正确调整大小对于避免浪费的资源至关重要。
- 通过优化实现目标利用率级别(例如 50%):
-
maxPartitions
(对于卡夫卡) -
spark.sql.shuffle.partitions
(对于混排阶段)
-
- Databricks 建议设置 maxPartitions,以便每个任务处理多个 Kafka 分区以减少开销。
- 调整每个工作节点的任务位置,以匹配简单单阶段作业的工作负荷。
- 对于洗牌操作繁重的作业,请试验查找最小的洗牌分区数,以避免积压,并在此基础上进行调整。 如果群集没有足够的槽,则不会调度作业。
注释
在 Databricks Runtime 16.4 LTS 及更高版本中,所有实时流程都使用检查点 v2,从而实现实时模式和微批处理模式之间的无缝切换。
查询配置
必须启用实时触发器,以指定查询应使用低延迟模式运行。 此外,实时触发器仅在更新模式下受支持。 例如:
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
.start()
RealTimeTrigger 还可以接受指定检查点间隔的参数。 例如,此代码指示检查点间隔为 5 分钟:
.trigger(RealTimeTrigger.apply("5 minutes"))
可观察性
以前,端到端查询延迟与批处理持续时间密切相关,使批处理持续时间成为查询延迟的良好指标。 但是,此方法不再以实时模式应用,需要用于测量延迟的替代方法。 端到端延迟特定于工作负荷,有时只能使用业务逻辑准确测量。 例如,如果在 Kafka 中输出源时间戳,则可以将延迟计算为 Kafka 的输出时间戳与源时间戳之间的差异。
可以根据流式处理过程中收集的部分信息,以多种方式估算端到端延迟。
使用 StreamingQueryProgress
事件中 StreamingQueryProgress
包括以下指标,该指标会自动记录在驱动程序日志中。 还可以通过StreamingQueryListener
的onQueryProgress()
回调函数来访问它们。
QueryProgressEvent.json()
或 toString()
包括额外的实时模式指标。
处理延迟(processingLatencyMs)。 实时模式查询读取记录后到写入下一阶段或下游之前所经过的时间。 对于单阶段查询,这将测量与 E2E 延迟相同的持续时间。 此指标按任务报告。
源队列延迟(sourceQueuingLatencyMs) 成功将记录写入消息总线(例如 Kafka 中的日志追加时间)之后,到记录首次通过实时模式查询读取之前所经过的时间量。 此指标按任务报告。
E2E 延迟(e2eLatencyMs)。 记录成功写入消息总线与通过实时模式查询将记录写入下游之间的时间间隔。 此指标按每批聚合,涵盖所有任务处理的所有记录。
例如:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
在作业中使用观察 API
观察 API 有助于在不启动另一个作业的情况下测量延迟。 如果有一个源时间戳,该时间戳近似于源数据到达时间,并在到达接收器之前传递,或者如果可以找到传递时间戳的方法,则可以使用观察 API 估算每个批处理的延迟:
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
在此示例中,在输出条目之前记录当前时间戳,通过计算此时间戳与记录的源时间戳之间的差异来估计延迟。 结果包含在进度报告中,并提供给听众。 下面是示例输出:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
支持哪些内容?
环境
群集类型 | 已支持 |
---|---|
专用(以前:单个用户) | 是的 |
标准(前:共享) | 否 |
Lakeflow 声明式管道经典版 | 否 |
Lakeflow 声明性管道无服务器 | 否 |
无服务器 | 否 |
语言
语言 | 已支持 |
---|---|
Scala(编程语言) | 是的 |
Java | 是的 |
Python | 是的 |
执行模式
执行模式 | 已支持 |
---|---|
更新模式 | 是的 |
追加模式 | 否 |
完整模式 | 否 |
来源
来源 | 已支持 |
---|---|
Apache Kafka | 是的 |
AWS MSK | 是的 |
Eventhub (使用 Kafka 连接器) | 是的 |
动动力 | 是(仅 EFO 模式) |
Apache Pulsar | 否 |
水槽
水槽 | 已支持 |
---|---|
Apache Kafka | 是的 |
Eventhub (使用 Kafka 连接器) | 是的 |
动动力 | 否 |
Apache Pulsar | 否 |
任意接收器(使用 forEachWriter) | 是的 |
运营商
运营商 | 已支持 |
---|---|
无状态操作 | |
|
是的 |
|
是的 |
UDF | |
|
是的 |
|
是(有一些限制) |
集合体 | |
|
是的 |
|
是的 |
|
是的 |
|
是的 |
|
是的 |
聚合函数 | 是的 |
窗口 | |
|
是的 |
|
是的 |
|
否 |
去重 | |
|
是(状态无界) |
|
否 |
流 - 表联接 | |
|
是的 |
流 - 流合并 | 否 |
(平)MapGroupsWithState | 否 |
transformWithState | 是(有一些差异) |
并 | 是(有一些限制) |
forEach | 是的 |
forEachBatch | 否 |
mapPartitions (映射分区) | 是的 |
在实时模式下使用 transformWithState
对于构建有状态自定义应用程序,Databricks 支持 Apache Spark 结构化流式处理中的 transformWithState
API。 有关 API 和代码片段的详细信息,请参阅 生成自定义有状态应用程序 。
但是,API 在实时模式下的行为方式和利用微批处理体系结构的传统流式处理查询之间存在一些差异。
- 对每行调用实时模式下
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)
的方法。-
inputRows
迭代器返回单个值。 在微批处理模式下,它为每个键调用一次,inputRows
迭代器返回微批中某个键的所有值。 - 编写代码时,必须了解这种差异。
-
- 实时模式下不支持事件时间计时器。
- 在实时模式下,计时器的触发会根据数据的到达情况而延迟。 否则,如果没有数据,则会在长时间运行的批处理结束时触发它。 例如,如果计时器应在 10:00:00 触发,并且同时没有数据到达,则不会触发它。 相反,如果数据到达 10:00:10,则会触发计时器,延迟为 10 秒。 或者,如果没有数据到达并且长时间运行的批处理正在终止,则会在终止长时间运行的批处理之前运行计时器。
Python UDF
Databricks 在实时模式下支持大多数 Python 用户定义的函数(UDF):
UDF 类型 | 已支持 |
---|---|
无状态 UDF | |
|
是的 |
|
是的 |
|
是的 |
|
是的 |
|
是的 |
有状态分组 UDF (UDAF) | |
|
是的 |
|
否 |
非有状态分组 UDF (UDAF) | |
|
否 |
|
否 |
|
否 |
表函数 | |
|
否 |
UC UDF | 否 |
在实时模式下使用 Python UDF 时,需要考虑以下几点:
- 若要最大程度地减少延迟,请将箭头批大小(spark.sql.execution.arrow.maxRecordsPerBatch)配置为 1。
- 权衡:此配置以牺牲吞吐量为代价优化延迟。 对于大多数工作负荷,建议使用此设置。
- 仅当需要更高的吞吐量来容纳输入卷时,才增加批大小,从而接受延迟的潜在增加。
- Pandas UDF 和函数的性能不佳,箭头批大小为 1。
- 如果使用 pandas UDF 或函数,请将箭头批大小设置为更高的值(例如 100 或更高)。
- 请注意,这意味着延迟较高。 Databricks 建议尽可能使用箭头 UDF 或函数。
- 由于 pandas 的性能问题,因此仅接口支持
Row
transformWithState。
局限性
源限制
对于 Kinesis,不支持轮询模式。 此外,频繁的重新分区可能会对延迟产生负面影响。
联合体限制
对于 Union,存在一些限制:
- 不支持自联合:
- Kafka:不能使用相同的源数据帧对象来合并从中派生的数据帧。 解决方法:使用从同一源读取的不同数据帧。
- Kinesis:不能将从同一 Kinesis 源派生且配置相同的数据帧进行合并。 解决方法:除了使用不同的数据帧,还可以为每个数据帧分配不同的“consumerName”选项。
- 不支持在联合之前定义的有状态运算符(例如,
aggregate
、deduplicate
、transformWithState
)。 - 不支持与批量源合并。
例子
以下示例显示了支持的查询。
无状态查询
支持任何单阶段或多阶段无状态查询。
Kafka 源到 Kafka 接收器
在此示例中,从 Kafka 源读取并写入 Kafka 接收端。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
重新分区
在此示例中,从 Kafka 源读取数据,将数据重新分区到 20 个分区,并写入 Kafka 接收器。
由于当前的实现限制,请在使用重新分区之前将 Spark 配置 spark.sql.execution.sortBeforeRepartition
设置为 false
。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
流快照连接(仅限广播)
在此示例中,从 Kafka 读取数据,将数据与静态表联接,并写入 Kafka 接收器。 请注意,仅支持广播静态表的流静态联接,这意味着静态表应适合内存中。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Kafka 接收器的 Kinesis 源
在此示例中,从 Kinesis 数据源读取并写入 Kafka 数据接收端。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option(REGION_KEY, regionName)
.option(AWS_ACCESS_ID_KEY, awsAccessKeyId)
.option(AWS_SECRET_KEY, awsSecretAccessKey)
.option(CONSUMER_MODE_KEY, CONSUMER_MODE_EFO)
.option(CONSUMER_NAME_KEY, kinesisSourceStream.consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
联盟
在此示例中,你将从两个不同的主题中联合两个 Kafka 数据帧并写入 Kafka 接收器。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
有状态查询
去重
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 40)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
集合体
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
与聚合的联合
在此示例中,首先将两个不同主题中的两个 Kafka DataFrame 合并,然后执行聚合。 最后,将数据写入 Kafka sink。
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
注释
在结构化流式处理中,实时模式与其他执行模式在执行StatefulProcessor
transformWithState
时存在差异。 请参阅 在实时模式下使用 transformWithState
TransformWithState (PySpark,行接口)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
spark.conf.set("spark.sql.shuffle.partitions", "20")
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", "/your/checkpoint/location")
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
注释
结构化流式处理中的实时模式与其他执行模式的运行 StatefulProcessor
方式 transformWithState
之间存在差异。 请参阅 在实时模式下使用 transformWithState
水槽
使用 foreachSink 写入 Postgres
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
显示
重要
此功能在 Databricks Runtime 17.1 及更高版本中提供。
显示速率源
在此示例中,您从速率源读取数据,并在笔记本中显示流式数据帧。
Scala(编程语言)
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime("30 seconds"), outputMode=OutputMode.Update())
Python
inputDF = spark \
.readStream \
.format("rate") \
.option("numPartitions", 2) \
.option("rowsPerSecond", 1) \
.load()
display(inputDF, realTime="30 seconds", outputMode="update")