应用水印来控制数据处理阈值

本页介绍水印概念,并建议在常见的有状态流式处理操作中使用水印。

流式查询会随时间累积状态数据。 水印会自动删除旧状态数据,以防止内存错误和增加处理延迟。

什么是水印?

在处理期间,结构化流式处理会跨多个微批次保留状态。 流式查询使用状态来增量更新结果,而不是在每次微批处理后重新计算全部结果。 水印控制查询停止处理状态实体时的阈值。

状态实体的常见示例包括:

  • 基于时间窗口的聚合。
  • 两个流之间的联接中的唯一键。

若要在流式处理数据帧上声明水印,请指定时间戳字段和延迟阈值。 当新数据到达时,状态管理器跟踪指定字段中的最新时间戳,并且仅处理延迟阈值内的记录。

查询始终会处理在阈值内到达的记录。 查询仍可能会处理超出阈值的记录,但不能保证这一点。

以下示例将 10 分钟水印阈值应用于窗口计数:

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

在本示例中:

  • event_time 列用于定义 10 分钟水印和 5 分钟翻滚窗口。
  • 对于每个互不重叠的 5 分钟时间窗口内观察到的每个 id,都会收集一个计数。
  • 对于每个计数,状态信息会一直保留,直到窗口结束时间比最新观测到的 event_time 早 10 分钟。

重要

groupBy()window()操作中,按名称引用列,使用"<colName>"col("<colName>"),以确保保留事件时间标记。 在 Scala 中,也可以使用 $colName

水印如何影响处理时间和吞吐量?

输出模式控制带有水印的查询何时将数据写入接收器。 水印对于有状态流式处理中的吞吐量控制至关重要,因为它们减少了内存中状态信息总量。 并非所有有状态操作都支持所有输出模式。 请参阅 窗口聚合的水印和输出模式

选择水印持续时间有利弊:

  • 较短的水印降低了查询延迟,因为查询存储的状态信息较少,并在每个水印持续时间完成后写入结果。 但是,短水印对后期数据的容忍度较低。
  • 较长的水印对后期数据具有很高的容忍度。 但是,长时间的水印会增加查询延迟,因为查询必须存储更多的状态信息,并等待在较长的水印持续时间后写入结果。

水印和窗口聚合的输出模式

下表显示了对时间戳和水印进行聚合的查询的处理行为:

输出模式 行为
追加 该查询在水印阈值通过后将行写入目标表。 所有写入操作都会根据延迟阈值进行延迟。 阈值通过后,旧聚合状态将被删除。
更新 查询在计算结果时将行写入目标表,当新数据到达时,查询可以更新和覆盖行。 阈值通过后,旧聚合状态将被删除。
完成 不会删除聚合状态。 查询会为每个触发器重写目标表。

流-流联接的水印和输出模式

多个流之间的联接仅支持追加模式。 查询会为每个批次写入匹配的记录。

对于内联接,Databricks 建议您为每个流式数据源设置水印阈值,以便查询能够丢弃旧记录的状态信息。 如果没有水印,结构化流处理会尝试在每次触发时联接连接两侧的每个键,这可能会影响性能。

对于外部联接,水印是必需的。 当记录不匹配时,查询会为该键写入 null 值。 由于联接仅支持追加模式,因此在延迟阈值通过之前不会写入不匹配的记录。

使用多个水印策略控制延迟数据阈值

对于多个结构化流输入,可以设置多个水印来控制迟到数据的容忍阈值。 水印允许你控制状态信息和延迟。

流式处理查询可以有多个联合或联接在一起的输入流。 对于有状态操作,每个输入流可能都需要不同的迟到数据容忍阈值。 在每个输入流上使用 withWatermark("eventTime", delay) 指定这些阈值。 下面是包含流间联接的示例查询。

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

使用有状态操作运行查询时,结构化流单独跟踪每个输入流的最大事件时间,根据相应的延迟计算水印,并确定单个全局水印。 默认情况下,结构化流式处理将最小值用作全局水印。 如果某个流落后于其他流,则全局最小水印可防止查询意外地将数据标记为迟到。 例如,当其中一个流因上游故障而停止接收数据时,可能会发生这种情况。 全局水印可以安全地按最慢的流的速度移动,并在必要时延迟查询输出。

若要降低延迟,请将 spark.sql.streaming.multipleWatermarkPolicy 设置为 max(默认值为 min),以使用最快流的水印作为全局水印。 但是,此配置会从最慢的流中删除数据。 Databricks 建议谨慎应用此配置。

将水印应用于不同的操作

distinct 操作跟踪状态中的每个唯一记录。 如果没有水印,状态会无限期增长,并可能导致内存问题。 将时间戳字段上的水印指定为绑定状态,并在阈值通过后删除旧记录。

以下示例将水印应用到 distinct 操作:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

在此示例中,流式查询会删除在最近观测到的 eventTime 后 1 小时内到达的重复记录。 该查询在超过阈值后会丢弃用于去重的状态信息。

重要

若要删除特定列而不是所有列,请使用 dropDuplicates()dropDuplicatesWithinWatermark() 改用 distinct。 请参阅删除水印内的重复项

删除水印内的重复项

在 Databricks Runtime 13.3 LTS 或更高版本中,可以使用唯一标识符在水印阈值内删除重复记录。

结构化流式处理保证完全一次处理,但不会删除数据源中的记录。 使用 dropDuplicatesWithinWatermark 可基于任意字段删除重复项,即使重复记录中的字段值不同,例如事件时间或到达时间。

使用 dropDuplicatesWithinWatermark 时,查询始终会对在水印阈值内到达的记录进行去重。 查询还可能会删除超出阈值的记录,但不能保证这样做。 若要保证查询删除所有重复项,请将水印阈值设置为大于重复事件之间的最大时间戳差。

必须指定水印才能使用 dropDuplicatesWithinWatermark 该方法:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

用例示例

以下示例显示了高级窗口化用例:

使用翻滚窗口计算每小时销售总额

滚动窗口具有固定大小,且各时间区间互不重叠。 每个输入行只属于一个窗口。 使用翻滚窗口计算离散时间段的聚合值,例如每小时销售总额:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

在本示例中:

  • window("timestamp", "1 hour") 将订单分组到互不重叠的 1 小时区间中,例如凌晨 5 点到 6 点和早上 6 点到 7 点。
  • withWatermark("timestamp", "1 hour") 将每个窗口的聚合保持为状态,直到窗口结束时间戳早于最大顺序时间戳 1 小时。

使用滑动窗口进行滚动聚合计算

滑动窗口是固定大小的,间隔可以重叠。 单个行可以属于多个窗口。 使用滑动窗口计算滚动聚合,例如滚动 6 小时周期内的销售额:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

在本示例中:

  • window("timestamp", "6 hours", slideDuration="1 hour") 按 6 小时的时间段对订单进行分组,每次推进 1 小时,例如上午 5 点到 11 点,以及上午 6 点到下午 12 点。
  • withWatermark("timestamp", "1 hour") 将每个窗口的聚合保持为状态,直到窗口结束时间戳早于最大顺序时间戳 1 小时。
  • slideDuration 必须小于或等于 windowDuration

使用会话窗口检查用户活动

会话窗口没有固定大小。 当一行数据到达时,系统会打开一个窗口;在随后一段没有新行到达的间隔时间后,该窗口会关闭。 使用会话窗口来聚合长时间空闲间隔之间集中发生的活动,例如用户在 30 分钟内的页面浏览:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

在本示例中:

  • session_window("timestamp", gapDuration="30 minutes") 当第一页视图到达时,将打开一个窗口。 每次在 30 分钟内发生的后续页面浏览都会延长该时间窗口。 如果在 30 分钟内没有新的页面浏览,当前窗口将关闭,下一次页面浏览将开启一个新窗口。
  • withWatermark("timestamp", "1 hour") 将每个会话的聚合保持为状态,直到窗口结束时间戳比最大页面视图时间戳晚 1 小时。
  • window()session_window()timeColumn 参数必须属于 TimestampTypeTimestampNTZType 类型。
  • 用于 current_timestamp() 根据处理时间而不是事件时间定义窗口。
  • 可以设置从微秒到几天的窗口持续时间。 不支持以月为单位及更长的时长。
  • complete 输出模式与窗口聚合结合使用,以无限期保留所有窗口状态。 使用带有适当水印的 append 输出模式,以限制状态增长并避免在处理大型数据集时出现内存问题。 有关输出模式行为的更多详细信息,请参阅 用于窗口聚合的水印和输出模式