结构化流式处理检查点

检查点和预写日志协同工作,为结构化流式处理工作负载提供处理保证。 检查点用于跟踪可标识查询的信息,包括状态信息和已处理的记录。 删除检查点目录中的文件或更改到新的检查点位置时,查询的下一次运行将开始刷新。

检查点目录包含以下项:

  • 偏移量:每个微批处理中处理的源偏移量。 这样,查询就可以从完全离开的位置恢复,而无需重新处理数据。
  • 提交:记录哪些微批处理已提交到接收端,从而启用精确一次语义。
  • 状态:对于有状态查询(聚合、流联接、重复数据删除和自定义有状态运算符,例如 transformWithState),检查点存储有关有状态运算符、状态架构和由状态存储提供程序管理的检查点状态存储内容的元数据。
  • 元数据:用于标识查询的唯一查询 ID。 配置设置作为偏移日志的一部分存储。

每个查询必须具有不同的检查点位置。 多个查询不应使用相同的位置。

注意

本文介绍结构化流查询的检查点。 有关将 DataFrame.checkpoint() 用于 Unity Catalog 卷以截断非流式处理数据帧的执行计划的信息,请参阅 卷中的数据帧检查点

为结构化流式处理查询启用检查点

在运行流式处理查询之前必须指定 checkpointLocation 选项,如以下示例所示:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala(编程语言)

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

注意

如果省略此选项,某些汇聚点(例如笔记本中 display() 的输出和 memory 汇聚点)会自动生成临时检查点位置。 临时检查点位置不能确保任何容错,也不能保证数据一致性,并且可能无法进行正常的清理。 Databricks 建议始终为这些接收器指定检查点位置。

在结构化流式处理查询发生更改后恢复

在从同一检查点位置进行的各次重启之间,对于流式处理查询中允许哪些更改存在限制。

通常需要新检查点的更改包括输入源的数量或类型、订阅的 Kafka 主题或自动加载程序路径、有状态操作类型、状态架构和输出接收器类型。

通常安全的更改包括添加或删除筛选器、更改速率限制、触发间隔和更新用户定义函数逻辑( mapGroupsWithState 尽管语义可能更改)。

以下部分介绍不允许的更改,或者更改的效果定义不明确,其中:

  • “允许”一词意味着你可以执行指定的更改,但其效果的语义是否明确取决于查询和更改。
  • “不允许”一词意味着不应执行指定的更改,因为重启的查询可能会失败并出现不可预知的错误。
  • sdf 表示通过 sparkSession.readStream 生成的流式处理数据帧/数据集。

结构化流查询中的更改类型

  • 输入源的数量或类型的更改:默认情况下不允许这样做,因为结构化流式处理按源在查询计划中的位置来标识源。 如果启用源命名,可以重新排序现有源并添加新源,而无需从新的检查点开始。 请参阅使用源演变更改流式数据源

  • 输入源参数的更改:是否允许以及更改的语义是否定义良好,取决于源和查询,包括诸如maxFilesPerTriggermaxOffsetsPerTrigger之类的准入控制。 以下是一些示例:

    • 允许添加、删除和修改速率限制:
    spark.readStream.format("kafka").option("subscribe", "article")
    

    spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
    

    有关详细信息,请参阅 在 Azure Databricks 上配置结构化流批大小

    • 通常不允许对已订阅的文章和文件进行更改,因为结果不可预知: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")
  • 触发器间隔更改:可以在增量批处理和时间间隔之间更改触发器。 请参阅 “更改运行之间的触发器间隔”。

  • 输出接收器类型的更改:允许在几个特定的接收器组合之间进行更改。 这需要根据具体情况进行验证。 以下是一些示例。

    • 允许文件接收器与 Kafka 接收器之间的转换。 Kafka 只会看到新数据。
    • 不允许配置 Kafka 汇聚点为文件汇聚点。
    • 允许将 Kafka 数据接收端更改为 foreach,或反之亦然。
  • 输出接收器的参数中的更改:是否允许这样做,以及更改的语义是否明确取决于接收器和查询。 以下是一些示例。

    • 不允许更改文件接收器的输出目录:从 sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 对输出主题的更改是允许的:允许从 sdf.writeStream.format("kafka").option("topic", "topic1") 更改为 sdf.writeStream.format("kafka").option("topic", "topic2")
    • 允许更改用户定义的 foreach 接收器(即 ForeachWriter 代码),但更改的语义取决于代码。
  • 投影/筛选器/映射类操作中的更改:某些情况是允许的。 例如:

    • 允许添加/删除筛选器:允许将 sdf.selectExpr("a") 修改为 sdf.where(...).selectExpr("a").filter(...)
    • 允许对具有相同输出架构的投影进行更改:允许将 sdf.selectExpr("stringColumn AS json").writeStream 更改为 sdf.select(to_json(...).as("json")).writeStream
    • 当输出接收器允许架构从 sdf.selectExpr("a").writeStream 更改为 sdf.selectExpr("b").writeStream 时,具有不同输出架构的投影中的更改是有条件允许的:只有从 "a""b" 的更改才被允许。
  • 有状态操作的更改:流式处理查询中的某些操作需要保留状态数据才能持续更新结果。 结构化流式处理会自动检查状态数据到容错存储(例如 DBFS、Azure Blob 存储),并在重启后还原它。 但是,这假设状态数据的架构在重启后保持不变。 这意味着,在两次重启之间不允许对流式处理查询的有状态操作进行任何更改(即,添加、删除或架构修改)。 下面列出了不应在两次重启之间更改其架构以确保状态恢复的有状态操作:

    • 流式处理聚合:例如,sdf.groupBy("a").agg(...)。 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流式处理去重:例如,sdf.dropDuplicates("a")。 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流间联接:例如 sdf1.join(sdf2, ...)(即,两个输入都是通过 sparkSession.readStream 生成的)。 不允许对架构或同等联接列进行更改。 不允许对联接类型(外部或内部)进行更改。 联接条件中的其他更改定义不清。
    • 任意有状态的操作:例如,sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)。 不允许对用户定义状态的架构和超时类型进行任何更改。 允许在用户定义的状态映射函数中进行任何更改,但更改的语义效果取决于用户定义的逻辑。 如果你确实想要支持状态架构更改,则可以使用支持架构迁移的编码/解码方案,将复杂的状态数据结构显式编码/解码为字节。 例如,如果你将状态另存为 Avro 编码的字节,则可以在两次查询重启之间更改 Avro 状态架构,因为这会还原二进制状态。

重要

有状态运算符 dropDuplicates()dropDuplicatesWithinWatermark() 可能由于在更改计算访问模式时的状态架构兼容性检查而无法重新启动。

允许在专用访问模式和无隔离访问模式之间进行更改。 允许在标准访问模式和无服务器访问模式之间进行更改。 不要尝试在其他访问模式组合之间更改。

为了避免此错误,请不要更改包含这些运算符的流式处理查询的计算访问模式。

通过源演变更改流数据源

默认情况下,结构化流式处理按源在查询计划中的位置(例如 012等)来标识源。对输入源的数量或顺序所做的任何更改都中断了检查点兼容性,并且需要新的检查点。 通过源演进,你可以为每个流式源分配稳定的用户自定义名称,这样即使对查询中的源重新排序、添加或删除,也不会丢失检查点状态。

源演变需要 Databricks Runtime 18.2 及更高版本。

必需配置

若要启用源演变,请设置两个 Spark 配置:

  • spark.sql.streaming.queryEvolution.enableSourceEvolution:当 true 时,查询中的所有流数据源都必须使用 .name() API 显式命名。 默认值为 false
  • spark.sql.streaming.offsetLog.formatVersion:必须设置为 2,才能使用基于名称的偏移量跟踪格式。 默认值为 1

在定义流式处理查询之前设置这两种配置:

spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
spark.conf.set("spark.sql.streaming.offsetLog.formatVersion", "2")

命名规则

  • 名称必须仅包含字母数字字符和下划线([a-zA-Z0-9_]+)。
  • 每个源名称在查询中必须是唯一的。
  • 启用源演变后,每个流式源都必须有名称。 未命名的源会导致错误 UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT

重新排序、添加和删除源

在查询使用同一检查点重新启动的情况下,以下更改是安全的:

  • 重新排序源:使用不同的源顺序重启查询。 每个源根据其名称从上次提交的偏移量恢复,并且不会更改检查点状态。
  • 添加新源:使用新源重启查询。 新的源从头开始处理,现有源则从其上次的偏移量继续处理。
  • 删除源:在没有源的情况下重启查询。 源已从检查点中被永久移除。 无法再次添加具有相同名称的已删除源。

Example

在调用.name()DataStreamReader之前,在.load()上使用.table()

Python

orders_us = (spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")
)

orders_eu = (spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")
)

all_orders = orders_us.union(orders_eu)

Scala(编程语言)

val ordersUS = spark.readStream
  .name("orders_us")
  .table("catalog.schema.orders_us")

val ordersEU = spark.readStream
  .name("orders_eu")
  .table("catalog.schema.orders_eu")

val allOrders = ordersUS.union(ordersEU)

局限性

  • 源命名需要新的检查点。 无法使用采用 V1 偏移日志格式的现有检查点启用源演变。
  • 升级到偏移日志格式 V2 后,无法降级回 V1。 请参阅 “所需配置”。
  • 源名称是永久性的。 若要重命名源,请将其删除,然后使用新名称添加它。 重命名后的源将从头开始处理。