本页介绍水印概念,并建议在常见的有状态流式处理操作中使用水印。
流式查询会随时间累积状态数据。 水印会自动删除旧状态数据,以防止内存错误和增加处理延迟。
什么是水印?
在处理期间,结构化流式处理会跨多个微批次保留状态。 流式查询使用状态来增量更新结果,而不是在每次微批处理后重新计算全部结果。 水印控制查询停止处理状态实体时的阈值。
状态实体的常见示例包括:
- 基于时间窗口的聚合。
- 两个流之间的联接中的唯一键。
若要在流式处理数据帧上声明水印,请指定时间戳字段和延迟阈值。 当新数据到达时,状态管理器跟踪指定字段中的最新时间戳,并且仅处理延迟阈值内的记录。
查询始终会处理在阈值内到达的记录。 查询仍可能会处理超出阈值的记录,但不能保证这一点。
以下示例将 10 分钟水印阈值应用于窗口计数:
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
在本示例中:
-
event_time列用于定义 10 分钟水印和 5 分钟翻滚窗口。 - 对于每个互不重叠的 5 分钟时间窗口内观察到的每个
id,都会收集一个计数。 - 对于每个计数,状态信息会一直保留,直到窗口结束时间比最新观测到的
event_time早 10 分钟。
重要
在groupBy()和window()操作中,按名称引用列,使用"<colName>"或col("<colName>"),以确保保留事件时间标记。 在 Scala 中,也可以使用 $colName。
水印如何影响处理时间和吞吐量?
输出模式控制带有水印的查询何时将数据写入接收器。 水印对于有状态流式处理中的吞吐量控制至关重要,因为它们减少了内存中状态信息总量。 并非所有有状态操作都支持所有输出模式。 请参阅 窗口聚合的水印和输出模式。
选择水印持续时间有利弊:
- 较短的水印降低了查询延迟,因为查询存储的状态信息较少,并在每个水印持续时间完成后写入结果。 但是,短水印对后期数据的容忍度较低。
- 较长的水印对后期数据具有很高的容忍度。 但是,长时间的水印会增加查询延迟,因为查询必须存储更多的状态信息,并等待在较长的水印持续时间后写入结果。
水印和窗口聚合的输出模式
下表显示了对时间戳和水印进行聚合的查询的处理行为:
| 输出模式 | 行为 |
|---|---|
| 追加 | 该查询在水印阈值通过后将行写入目标表。 所有写入操作都会根据延迟阈值进行延迟。 阈值通过后,旧聚合状态将被删除。 |
| 更新 | 查询在计算结果时将行写入目标表,当新数据到达时,查询可以更新和覆盖行。 阈值通过后,旧聚合状态将被删除。 |
| 完成 | 不会删除聚合状态。 查询会为每个触发器重写目标表。 |
流-流联接的水印和输出模式
多个流之间的联接仅支持追加模式。 查询会为每个批次写入匹配的记录。
对于内联接,Databricks 建议您为每个流式数据源设置水印阈值,以便查询能够丢弃旧记录的状态信息。 如果没有水印,结构化流处理会尝试在每次触发时联接连接两侧的每个键,这可能会影响性能。
对于外部联接,水印是必需的。 当记录不匹配时,查询会为该键写入 null 值。 由于联接仅支持追加模式,因此在延迟阈值通过之前不会写入不匹配的记录。
使用多个水印策略控制延迟数据阈值
对于多个结构化流输入,可以设置多个水印来控制迟到数据的容忍阈值。 水印允许你控制状态信息和延迟。
流式处理查询可以有多个联合或联接在一起的输入流。 对于有状态操作,每个输入流可能都需要不同的迟到数据容忍阈值。 在每个输入流上使用 withWatermark("eventTime", delay) 指定这些阈值。 下面是包含流间联接的示例查询。
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
使用有状态操作运行查询时,结构化流单独跟踪每个输入流的最大事件时间,根据相应的延迟计算水印,并确定单个全局水印。 默认情况下,结构化流式处理将最小值用作全局水印。 如果某个流落后于其他流,则全局最小水印可防止查询意外地将数据标记为迟到。 例如,当其中一个流因上游故障而停止接收数据时,可能会发生这种情况。 全局水印可以安全地按最慢的流的速度移动,并在必要时延迟查询输出。
若要降低延迟,请将 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认值为 min),以使用最快流的水印作为全局水印。 但是,此配置会从最慢的流中删除数据。 Databricks 建议谨慎应用此配置。
将水印应用于不同的操作
该 distinct 操作跟踪状态中的每个唯一记录。 如果没有水印,状态会无限期增长,并可能导致内存问题。 将时间戳字段上的水印指定为绑定状态,并在阈值通过后删除旧记录。
以下示例将水印应用到 distinct 操作:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
在此示例中,流式查询会删除在最近观测到的 eventTime 后 1 小时内到达的重复记录。 该查询在超过阈值后会丢弃用于去重的状态信息。
重要
若要删除特定列而不是所有列,请使用 dropDuplicates() 或 dropDuplicatesWithinWatermark() 改用 distinct。 请参阅删除水印内的重复项。
删除水印内的重复项
在 Databricks Runtime 13.3 LTS 或更高版本中,可以使用唯一标识符在水印阈值内删除重复记录。
结构化流式处理保证完全一次处理,但不会删除数据源中的记录。 使用 dropDuplicatesWithinWatermark 可基于任意字段删除重复项,即使重复记录中的字段值不同,例如事件时间或到达时间。
使用 dropDuplicatesWithinWatermark 时,查询始终会对在水印阈值内到达的记录进行去重。 查询还可能会删除超出阈值的记录,但不能保证这样做。 若要保证查询删除所有重复项,请将水印阈值设置为大于重复事件之间的最大时间戳差。
必须指定水印才能使用 dropDuplicatesWithinWatermark 该方法:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
用例示例
以下示例显示了高级窗口化用例:
使用翻滚窗口计算每小时销售总额
滚动窗口具有固定大小,且各时间区间互不重叠。 每个输入行只属于一个窗口。 使用翻滚窗口计算离散时间段的聚合值,例如每小时销售总额:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
在本示例中:
-
window("timestamp", "1 hour")将订单分组到互不重叠的 1 小时区间中,例如凌晨 5 点到 6 点和早上 6 点到 7 点。 -
withWatermark("timestamp", "1 hour")将每个窗口的聚合保持为状态,直到窗口结束时间戳早于最大顺序时间戳 1 小时。
使用滑动窗口进行滚动聚合计算
滑动窗口是固定大小的,间隔可以重叠。 单个行可以属于多个窗口。 使用滑动窗口计算滚动聚合,例如滚动 6 小时周期内的销售额:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
在本示例中:
-
window("timestamp", "6 hours", slideDuration="1 hour")按 6 小时的时间段对订单进行分组,每次推进 1 小时,例如上午 5 点到 11 点,以及上午 6 点到下午 12 点。 -
withWatermark("timestamp", "1 hour")将每个窗口的聚合保持为状态,直到窗口结束时间戳早于最大顺序时间戳 1 小时。 -
slideDuration必须小于或等于windowDuration。
使用会话窗口检查用户活动
会话窗口没有固定大小。 当一行数据到达时,系统会打开一个窗口;在随后一段没有新行到达的间隔时间后,该窗口会关闭。 使用会话窗口来聚合长时间空闲间隔之间集中发生的活动,例如用户在 30 分钟内的页面浏览:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
在本示例中:
-
session_window("timestamp", gapDuration="30 minutes")当第一页视图到达时,将打开一个窗口。 每次在 30 分钟内发生的后续页面浏览都会延长该时间窗口。 如果在 30 分钟内没有新的页面浏览,当前窗口将关闭,下一次页面浏览将开启一个新窗口。 -
withWatermark("timestamp", "1 hour")将每个会话的聚合保持为状态,直到窗口结束时间戳比最大页面视图时间戳晚 1 小时。 -
window()和session_window()的timeColumn参数必须属于TimestampType或TimestampNTZType类型。 - 用于
current_timestamp()根据处理时间而不是事件时间定义窗口。 - 可以设置从微秒到几天的窗口持续时间。 不支持以月为单位及更长的时长。
- 将
complete输出模式与窗口聚合结合使用,以无限期保留所有窗口状态。 使用带有适当水印的append输出模式,以限制状态增长并避免在处理大型数据集时出现内存问题。 有关输出模式行为的更多详细信息,请参阅 用于窗口聚合的水印和输出模式。