使用水印优化 Delta Live Tables 中的有状态处理

如要有效管理保持状态的数据,请在 Delta Live Tables 中执行有状态流处理(包括聚合、联接和重复数据删除)时使用水印。 本文介绍如何在 Delta Live Tables 查询中使用水印,并包含建议的操作示例。

注意

为了确保以增量方式处理执行聚合的查询,并且不会在每次更新时完全重新计算,你必须使用水印。

什么是水印?

在流处理中,水印是一项 Apache Spark 功能,可在执行有状态操作(如聚合)时定义用于处理数据的基于时间的阈值。 到达的数据将得到处理,直到达到阈值,此时由阈值定义的时间窗口关闭。 水印可用于避免查询处理过程中出现的问题,这些问题多发生在处理较大的数据集时或长时间运行的处理中。 这些问题可能包括生成结果的延迟较高,以及由于处理过程中保持状态的数据量而导致内存不足 (OOM) 错误。 由于流数据本质上是无序的,水印还支持正确计算时间窗口聚合等操作。

如要详细了解如何在流处理中使用水印,请参阅 Apache Spark 结构化流式处理中的水印和应用水印来控制数据处理阈值

如何定义水印?

通过指定时间戳字段和表示延迟数据到达的时间阈值的值来定义水印。 如果数据在定义的时间阈值后到达,则将其视为延迟数据。 例如,如果阈值定义为 10 分钟,则到达时间晚于 10 分钟阈值的记录可能会被删除。

由于在定义的阈值后到达的记录可能会被删除,因此选择满足延迟与正确性要求的阈值非常重要。 选择较小的阈值会使系统更快地发出记录,但也意味着延迟记录更有可能被删除。 较大的阈值意味着更长的等待时间,但数据可能更完整。 由于状态大小越大,阈值越大,而较大的阈值可能还需要额外的计算资源。 由于阈值值取决于数据和处理要求,因此测试和监视处理对于确定最佳阈值非常重要。

在 Python 中使用 withWatermark() 函数来定义水印。 在 SQL 中,使用 WATERMARK 子句定义水印:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

将水印与流间联接配合使用

对于流间联接,必须在联接的两侧定义水印和时间间隔子句。 由于每个联接源的数据视图不完整,因此需要时间间隔子句来告知流式处理引擎何时无法进行进一步的匹配。 时间间隔子句必须使用用于定义水印的相同字段。

由于有时每个流都需要不同的水印阈值,因此流不需要具有相同的阈值。 为了避免丢失数据,流式处理引擎会以最慢的流为标准,维护一个全局水印。

以下示例联接了广告展示流和用户单击广告的流。 在此示例中,点击必须在展示后的 3 分钟内发生。 3 分钟时间间隔过后,将删除无法再匹配的状态中的行。

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

使用水印执行窗口聚合

窗口聚合是流数据的常见有状态操作。 窗口聚合类似于分组聚合,不同之处在于返回属于定义窗口一部分的行集的聚合值。

窗口可以定义为特定长度,可以对属于该窗口的所有行执行聚合操作。 Spark 流支持三种类型的窗口:

  • 翻转(固定)窗口:一系列大小固定、非重叠且连续的时间间隔。 输入记录仅属于单个窗口。
  • 滑动窗口:类似于翻转窗口,滑动窗口大小固定,但窗口可以重叠,记录可以应用于多个窗口。

当数据到达窗口末尾加上水印的长度时,窗口不再接受新数据,发出聚合结果,并且窗口状态会被弃用。

以下示例使用固定窗口每 5 分钟计算一次展示总和。 在此示例中,select 子句使用别名 impressions_window,然后窗口本身定义为子句的 GROUP BY 一部分。 窗口必须参照与水印相同的时间戳列,即本示例中的 clickTimestamp 列。

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python 中的类似示例,用于计算每小时固定窗口的利润:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

重复数据删除流记录

结构化流式处理具备“恰好一次”处理保证,但不会自动从数据源中删除重复记录。 例如,由于许多消息队列具有“至少一次”保证,因此从其中一个消息队列读取时,应预期出现重复记录。 可以使用 dropDuplicatesWithinWatermark() 函数删除任何指定字段上的重复记录,这样便能够从流中删除重复项,即使某些字段不同(例如事件时间或到达时间)也是如此。 如要使用 dropDuplicatesWithinWatermark() 函数,必须指定水印。 将删除在水印指定的时间范围内到达的所有重复数据。

有序数据很重要,因为无序数据会导致水印值错误地向前跳转。 然后,当较旧的数据到达时,其会被视为晚到并被删除。 使用 withEventTimeOrder 选项根据水印中指定的时间戳按顺序处理初始快照。 withEventTimeOrder 选项可以在定义数据集的代码中声明,也可以在使用 spark.databricks.delta.withEventTimeOrder.enabled管道设置中声明。 例如:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

注意

仅 Python 支持 withEventTimeOrder 选项。

在下面的示例中,将按 clickTimestamp 顺序处理数据,并且在 5 秒内到达且包含重复 userIdclickAdId 列的记录将被删除。

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table(rawClicks)
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

针对有状态处理优化管道配置

为了帮助防止出现生产问题和过度延迟,Databricks 建议为有状态流处理启用基于 RocksDB 的状态管理,在处理需要保存大量中间状态时,更要如此。 若要启用 RocksDB 状态存储,请参阅为 Delta Live Tables 启用 RocksDB 状态存储