应用水印来控制数据处理阈值

本文介绍水印的基本概念,并提供有关在常见有状态流式处理操作中使用水印的建议。 必须对有状态流式处理操作应用水印,以避免无限扩展状态中保留的数据量,否则可能会在长时间运行的流式处理操作期间造成内存问题并增大处理延迟。

什么是水印?

结构化流式处理使用水印来控制继续处理给定状态实体更新的持续时间阈值。 状态实体的常见示例包括:

  • 基于时间窗口的聚合。
  • 两个流之间的联接中的唯一键。

声明水印时,可以在流式处理数据帧上指定时间戳字段和水印阈值。 当新数据到达时,状态管理器将跟踪指定字段中的最新时间戳,并处理延迟阈值范围内的所有记录。

以下示例将 10 分钟水印阈值应用于窗口计数:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

在本示例中:

  • event_time 列用于定义 10 分钟水印和 5 分钟翻转窗口。
  • 对于每个非重叠 5 分钟窗口观察到的每个 id,将收集一个计数。
  • 为每个计数保留状态信息,直到窗口结束时间比观察到的最新 event_time 时间早 10 分钟。

重要

水印阈值保证在指定的阈值内到达的记录是根据所定义查询的语义来处理的。 在指定的阈值外到达的迟到记录可能仍可能使用查询指标进行处理,但这不能保证。

水印如何影响处理时间和吞吐量?

水印与输出模式交互,以控制何时将数据写入接收器。 由于水印减少了要处理的状态信息总量,因此有效使用水印对于高效的有状态流吞吐量至关重要。

注意

并非所有有状态操作都支持所有输出模式。

窗口聚合的水印和输出模式

下表详细说明了对已定义水印的时间戳进行聚合的查询处理:

输出模式 行为
追加 超过水印阈值后,将行写入目标表。 所有写入根据延迟阈值进行延迟。 超过阈值后,将删除旧聚合状态。
更新 计算结果时会将行写入目标表,并可以在新数据到达时更新和覆盖行。 超过阈值后,将删除旧聚合状态。
完成 不会删除聚合状态。 使用每个触发器重写目标表。

流-流联接的水印和输出

多个流之间的联接仅支持追加模式,匹配的记录将写入到发现它们的每个批中。 对于内部联接,Databricks 建议为每个流式处理数据源设置水印阈值。 这样就可以丢弃旧记录的状态信息。 如果没有水印,结构化流式处理会尝试将联接两端的每个键与每个触发器相联接。

结构化流式处理具有支持外部联接的特殊语义。 水印对于外部联接是必需的,因为它指示在出现不匹配情况后,何时必须使用 null 值写入键。 请注意,虽然外部联接可用于记录在数据处理期间永不匹配的记录,但由于联接仅作为追加操作写入到表,只有在超过延迟阈值后才会记录此缺失数据。

在结构化流式处理中使用多水印策略控制延迟数据阈值

使用多个结构化流式处理输入时,可以设置多个水印来控制延迟到达数据的容差阈值。 通过配置水印,可以控制状态信息并影响延迟。

流式处理查询可以有多个联合或联接在一起的输入流。 对于有状态操作,每个输入流可以有不同的需要容忍的延迟数据阈值。 可以使用 withWatermarks("eventTime", delay) 在每个输入流上指定这些阈值。 下面是包含流间联接的示例查询。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

当运行查询时,结构化流式处理会单独跟踪每个输入流中显示的最大事件时间,根据相应的延迟计算水印,并选择一个与它们一起用于有状态操作的全局水印。 默认情况下,最小值将被选为全局水印,因为它可确保在其中一个流落后于其他流的情况下(例如,其中一个流由于上游故障而停止接收数据),数据不会因为太迟而被意外丢弃。 换句话说,全局水印以最慢流的速度安全地移动,并且查询输出会相应地延迟。

如果想获得更快的结果,可以通过将 SQL 配置 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认为 min)来设置多水印策略以选择最大值作为全局水印。 这允许全局水印以最快流的速度移动。 但是,此配置会从最慢的流中删除数据。 因此,Databricks 建议慎用此配置。

删除水印内的重复项

在 Databricks Runtime 13.1 及更高版本中,可以使用唯一标识符删除水印阈值内的重复记录。

结构化流式处理提供“恰好一次处理”保证,但不会自动从数据源中删除重复记录。 可以使用 dropDuplicatesWithinWatermark 删除任何指定字段上的重复记录,这样便能够从流中删除重复项,即使某些字段不同(例如事件时间或到达时间)也是如此。

在指定水印内到达的重复记录必定会删除。 此保证仅在一个方向上很严格,而在指定的阈值之外到达的重复记录可能也会删除。 必须将水印的延迟阈值设置为大于重复事件之间最大的时间戳差值,才能删除所有重复项。

必须指定水印才能使用 dropDuplicatesWithinWatermark 方法,如以下示例所示:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")