通过作业从结构化流式处理查询失败中恢复

结构化流式处理为流式处理查询提供容错和数据一致性;使用 Azure Databricks 作业,可以轻松地将结构化流式处理查询配置为在失败时自动重启。 通过为流式处理查询启用检查点,可以在失败后重启查询。 重启的查询将从失败查询停止之处继续。

为结构化流式处理查询启用检查点

Databricks 建议始终在启动查询之前将 checkpointLocation 选项指定为云存储路径。 例如:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

此检查点位置保留可用于标识查询的所有基本信息。 每个查询必须具有不同的检查点位置。 多个查询不应具有相同的位置。 有关更多详细信息,请参阅结构化流式处理编程指南

注意

尽管 checkpointLocation 是大多数类型的输出接收器所必需的,但当你未提供 checkpointLocation 时,某些接收器(例如内存接收器)可能会在 DBFS 上自动生成一个临时检查点位置。 临时检查点位置不能确保任何容错,也不能保证数据一致性,并且可能无法进行正常的清理。 通过始终指定 checkpointLocation 来避免潜在的陷阱。

将结构化流式处理作业配置为在失败时重启流式处理查询

你可以使用包含流式处理查询的笔记本或 JAR 创建 Azure Databricks 作业,并将其配置为:

  • 始终使用新群集。
  • 始终在失败时重试。

使用架构演变配置流式处理工作负荷时,在作业失败时自动重启尤其重要。 架构演变应用于 Azure Databricks 的方式是在检测到架构更改时引发预期错误,然后在作业重启时使用新架构正确处理数据。 Databricks 建议始终配置包含带架构演变的查询的流式处理任务,以便在 Databricks 作业中自动重启。

作业与结构化流式处理 API 紧密集成,并且可以监视运行中处于活动状态的所有流式处理查询。 此配置可确保如果查询的任何部分失败,作业会自动终止此运行(以及所有其他查询),并在新群集中启动新的运行。 这将重新执行笔记本或 JAR 代码,并再次重启所有查询。 这是恢复良好状态的最安全方法。

注意

  • 任何活动的流式处理查询中的失败都会导致活动的运行失败,并终止所有其他流式处理查询。
  • 你无需在笔记本的末尾使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 当流式处理查询处于活动状态时,作业会自动防止运行完成。
  • Databricks 建议在协调结构化流式处理笔记本时使用作业而不是 %rundbutils.notebook.run()。 请参阅从一个 Databricks 笔记本运行另一个笔记本

以下是推荐的作业配置示例。

  • 群集:将此项始终设置为使用新群集并使用最新的 Spark 版本(或至少使用版本 2.1)。 在查询和 Spark 版本升级之后,在 Spark 2.1 及更高版本中启动的查询是可恢复的查询。
  • 通知:如果希望在失败时收到电子邮件通知,请设置此项。
  • 计划不设置计划
  • 超时请勿设置超时。流式处理查询会运行无限长的时间。
  • 最大并发运行数:设置为 1。 每个查询只能有一个并发的活动实例。
  • 重试:设置为“不受限制”。

请参阅创建和运行 Azure Databricks 作业,以了解这些配置。

在结构化流式处理查询发生更改后恢复

在从同一检查点位置进行的各次重启之间,对于流式处理查询中允许哪些更改存在限制。 下面的几种更改是不允许的,或者是更改效果未明确的。 对于它们:

  • “允许”一词意味着你可以执行指定的更改,但其效果的语义是否明确取决于查询和更改。
  • “不允许”一词意味着不应执行指定的更改,因为重启的查询可能会失败并出现不可预知的错误。
  • sdf 表示通过 sparkSession.readStream 生成的流式处理数据帧/数据集。

结构化流式处理查询中的更改类型

  • 输入源的数量或类型(即不同源)的更改:这是不允许的。
  • 输入源的参数中的更改:是否允许这样做,以及更改的语义是否明确取决于源和查询。 以下是一些示例。
    • 允许添加、删除和修改速率限制:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • 通常不允许更改已订阅的文章和文件,因为结果不可预知:不允许将 spark.readStream.format("kafka").option("subscribe", "article") 更改为 spark.readStream.format("kafka").option("subscribe", "newarticle")

  • 触发器间隔更改:可以在增量批处理和时间间隔之间更改触发器。 请参阅更改运行之间的触发器间隔
  • 输出接收器类型的更改:允许在几个特定的接收器组合之间进行更改。 这需要根据具体情况进行验证。 以下是一些示例。
    • 允许从文件接收器更改为 Kafka 接收器。 Kafka 只会看到新数据。
    • 不允许从 Kafka 接收器更改为文件接收器。
    • 允许从 Kafka 接收器更改为 foreach,反之亦然。
  • 输出接收器的参数中的更改:是否允许这样做,以及更改的语义是否明确取决于接收器和查询。 以下是一些示例。
    • 不允许更改文件接收器的输出目录:不允许从 sdf.writeStream.format("parquet").option("path", "/somePath") 更改为 sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • 对输出主题的更改是允许的:允许从 sdf.writeStream.format("kafka").option("topic", "topic1") 更改为 sdf.writeStream.format("kafka").option("topic", "topic2")
    • 允许更改用户定义的 foreach 接收器(即 ForeachWriter 代码),但更改的语义取决于代码。
  • 投影/筛选器/映射类操作中的更改:允许某些案例。 例如:
    • 允许添加/删除筛选器:允许将 sdf.selectExpr("a") 修改为 sdf.where(...).selectExpr("a").filter(...)
    • 允许对具有相同输出架构的投影进行更改:允许将 sdf.selectExpr("stringColumn AS json").writeStream 更改为 sdf.select(to_json(...).as("json")).writeStream
    • 具有不同输出架构的投影中的更改是有条件允许的:只有当输出接收器允许架构从 "a" 更改为 "b" 时,才允许从 sdf.selectExpr("a").writeStreamsdf.selectExpr("b").writeStream 的更改。
  • 有状态操作的更改:流式处理查询中的某些操作需要保留状态数据才能持续更新结果。 结构化流式处理自动为容错存储(例如,DBFS、Azure Blob 存储)的状态数据设置检查点并在重启后对其进行还原。 但是,这假设状态数据的架构在重启后保持不变。 这意味着,在两次重启之间不允许对流式处理查询的有状态操作进行任何更改(即,添加、删除或架构修改)。 下面列出了不应在两次重启之间更改其架构以确保状态恢复的有状态操作:
    • 流式处理聚合:例如,sdf.groupBy("a").agg(...)。 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流式处理去重:例如,sdf.dropDuplicates("a")。 不允许对分组键或聚合的数量或类型进行任何更改。
    • 流间联接:例如 sdf1.join(sdf2, ...)(即,两个输入都是通过 sparkSession.readStream 生成的)。 不允许对架构或同等联接列进行更改。 不允许对联接类型(外部或内部)进行更改。 联接条件中的其他更改被视为错误定义。
    • 任意监控状态的操作:例如,sdf.groupByKey(...).mapGroupsWithState(...)sdf.groupByKey(...).flatMapGroupsWithState(...)。 不允许对用户定义状态的架构和超时类型进行任何更改。 允许在用户定义的状态映射函数中进行任何更改,但更改的语义效果取决于用户定义的逻辑。 如果你确实想要支持状态架构更改,则可以使用支持架构迁移的编码/解码方案,将复杂的状态数据结构显式编码/解码为字节。 例如,如果你将状态另存为 Avro 编码的字节,则可以在两次查询重启之间更改 Avro 状态架构,因为这会还原二进制状态。