结构化流式处理的生产注意事项

本页提供在 Azure Databricks 上使用作业计划结构化流式处理作业工作负载的建议。

Databricks 建议始终配置以下内容:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 请勿使用通用计算能力来运行结构化流式处理工作负载。 始终使用作业计算功能将流流程作为任务进行调度。
  • 使用 Continuous 模式计划作业。 这指的是Azure Databricks作业计划功能,而不是结构化流式处理trigger 间隔
  • 不要为结构化流式处理作业启用计算自动缩放。

某些工作负载会受益于以下功能:

Azure Databricks引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道

注意

计算资源自动缩放在缩小结构化流式处理工作负载的群集规模方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率

注意

在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()

对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。

请参阅 流式处理限制

设计流媒体任务时应预期失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)要求将结构化流式处理工作负载配置为自动重试。 请参阅将结构化流式处理作业配置为在故障时重启流查询

某些操作(例如 foreachBatch)提供至少一次而非恰好一次的保证。 对于这些操作,请确保处理管道是幂等的。 请参阅使用 foreachBatch 将数据写入任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果作业因内存不足错误而失败,或者由于微批处理过大而手动取消了作业,则可能需要增加计算资源才能成功处理微批处理。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业何时重试?

可以将多个任务安排为Azure Databricks作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可以选择以下任意一种方法在单个作业中调度多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略 多个任务 多重查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的计算资源。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择将查询分配给 调度池
如何处理重试? 必须在所有任务失败之后才能进行作业重试。 如果任何查询失败,任务将会重试。

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议使用连续触发器配置所有流式数据处理工作负载。 请参阅连续运行作业

默认情况下,连续触发器具有以下行为:

  • 防止作业出现多个并发运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

Databricks 建议不要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 请参阅 何时使用 awaitTermination()

何时使用 awaitTermination()

streamingQuery.awaitTermination()spark.streams.awaitAnyTermination() 阻止当前线程,直到流式查询终止。 是否使用这些函数取决于执行环境。

对于 Databricks Jobs,请勿使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 这些函数并不是必需的,因为作业服务会在流式查询处于活动状态时自动阻止运行完成。 这两个函数都阻止笔记本单元格完成执行,并阻止 Jobs 服务跟踪流式处理查询,这会影响积压指标和作业通知。

在以下情况下使用 awaitTermination()

用例 Behavior
用于全用途计算的交互式笔记本 awaitTermination() 使单元格保持运行状态,使你能够观察查询状态,并确保笔记本输出中的故障浮出水面。
本地和开发环境 在本地运行 Spark 程序时,当主线程完成时,进程将退出。 调用 awaitTermination() 以使程序保持活动状态,直到流式处理查询完成或失败。
故障蔓延至驱动程序 如果没有 awaitTermination(),那么在非作业上下文中的流式查询失败可能不会传播到调用线程。 查询可能会以无提示方式失败,从而使故障更难检测和诊断。 调用 awaitTermination() 会重新引发驱动程序上的查询异常。

使用调度程序池来处理多个流式查询

可以配置调度池,以便在从同一源代码运行多个流式查询时,将计算能力分配给查询。

默认情况下,笔记本中启动的所有查询都在同一公平调度池中运行。 Apache Spark 作业是由笔记本中所有流式查询的触发器生成的,它们会按照“先进先出 (FIFO)”的顺序一个接一个地运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。

调度池允许您声明哪些结构化流处理查询共享计算资源。

以下示例将query1分配到专用池,而query2query3共享一个计划程序池。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关 Apache 公平调度器池的详细信息,请参阅 Apache 公平调度器文档