若要有效管理存储在状态中的数据,请在 Lakeflow 声明性管道中执行有状态流处理时使用水印,包括聚合、连接和重复数据删除。 本文介绍如何在 Lakeflow 声明性管道查询中使用水印机制,并提供了推荐操作的示例。
注释
为了确保执行聚合的查询以增量方式处理,而不是在每次更新时完全重新计算,必须使用水印。
什么是水印?
在流处理中, 水印 是一项 Apache Spark 功能,可在执行有状态作(如聚合)时定义用于处理数据的基于时间的阈值。 到达的数据将得到处理,直到达到阈值,此时阈值定义的时间范围将关闭。 水印可用于避免查询处理过程中出现的问题,主要发生在处理较大的数据集或长时间运行的处理时。 这些问题可能包括由于在处理期间必须保持在状态中的数据量过大而导致的高延迟,甚至内存溢出(OOM)错误。 由于流数据本质上是无序的,因此水印从而可以支持正确计算时间窗口聚合操作。
若要详细了解如何在流处理中使用水印,请参阅 Apache Spark 结构化流式处理中的水印 和 应用水印来控制数据处理阈值。
如何定义水印?
通过指定时间戳字段和一个表示延迟数据到达时间阈值的值来定义水印。 如果数据在定义的时间阈值后到达,则会将其视为延迟数据。 例如,如果阈值定义为 10 分钟,则 10 分钟阈值后到达的记录可能会被删除。
由于在定义的阈值后到达的记录可能会被删除,因此选择满足延迟与正确性要求的阈值非常重要。 选择较小的阈值会导致更快地发出记录,但也意味着延迟记录更有可能被删除。 较大的阈值意味着更长的等待时间,但数据可能更完整。 由于状态大小越大,阈值越大,可能还需要额外的计算资源。 由于阈值取决于数据和处理要求,因此测试和监视处理对于确定最佳阈值非常重要。
在 Python 中使用 withWatermark()
函数来定义水印。 在 SQL 中,使用 WATERMARK
子句定义水印:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
将水印与流联接配合使用
对于数据流联接,您必须在联接的两侧分别定义水印,并设定时间间隔子句。 由于每个联接源的数据视图不完整,因此需要时间间隔子句来告知流式处理引擎何时无法进行进一步的匹配。 时间间隔子句必须使用用于定义水印的相同字段。
由于有时每个流都需要不同的水印阈值,因此流不需要具有相同的阈值。 为了避免丢失数据,流式处理引擎会基于最慢的流维护一个全局水印。
以下示例联接广告印象流和用户点击广告流。 在此示例中,点击必须在印象后的 3 分钟内发生。 3 分钟时间间隔过后,将删除无法再匹配的状态中的行。
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
使用水印执行开窗聚合
流数据常见的有状态操作是窗口化聚合。 开窗聚合类似于分组聚合,不同之处在于它为定义窗口中的行集返回聚合值。
窗口可以定义成特定长度,并可以对属于该窗口的所有行执行聚合操作。 Spark 流式处理支持三种类型的窗口:
- 翻转(固定)窗口:一系列固定大小、非重叠和连续时间间隔。 输入记录仅属于单个窗口。
- 滑动窗口:类似于翻转窗口,滑动窗口是固定大小的,但窗口可以重叠,记录可以落入多个窗口。
当数据到达窗口末尾加上水印长度之时,窗口将不再接受新数据,接着发出聚合结果,并清除窗口的状态。
以下示例使用固定窗口每 5 分钟计算一次印象总和。 在此示例中,select 子句使用别名 impressions_window
,然后窗口本身定义为子句的 GROUP BY
一部分。 窗口必须基于与水印相同的时间戳列,在这个例子中为 clickTimestamp
列。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Python 中的类似示例,用于计算每小时固定时段的利润:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
去重流式记录
结构化流具有完全一次性的处理保证,但不会自动从数据源取消重复记录。 例如,由于许多消息队列至少保证一次,因此从其中一个消息队列读取时,应预期重复记录。 可以使用 dropDuplicatesWithinWatermark()
函数取消任何指定字段上的重复记录,即使某些字段不同(例如事件时间或到达时间),也会从流中删除重复项。 必须指定水印才能使用该 dropDuplicatesWithinWatermark()
函数。 将删除在水印指定的时间范围内到达的所有重复数据。
有序数据很重要,因为无序数据会导致水印值错误地向前跳转。 然后,当较旧的数据到达时,它被视为迟到并被删除。
withEventTimeOrder
使用此选项根据水印中指定的时间戳按顺序处理初始快照。 该 withEventTimeOrder
选项可以在定义数据集的代码中声明,或者在 管道设置 中声明使用 spark.databricks.delta.withEventTimeOrder.enabled
。 例如:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
注释
仅 Python 支持此选项 withEventTimeOrder
。
在下面的示例中,数据将按 clickTimestamp
排序处理,到达的记录在彼此 5 秒内且包含重复的 userId
列和 clickAdId
列的,将被丢弃。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
针对有状态处理优化管道配置
为了帮助防止生产问题和过度延迟,Databricks 建议为有状态流处理启用基于 RocksDB 的状态管理,尤其是在处理需要节省大量中间状态时。
可以在部署管道之前设置以下配置来启用基于 RocksDB 的状态管理:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
若要详细了解 RocksDB 状态存储,包括 RocksDB 的配置建议,请参阅 在 Azure Databricks 上配置 RocksDB 状态存储。