结构化流式处理的生产注意事项

本页提供在 Azure Databricks 上使用作业计划结构化流式处理作业工作负载的建议。

Databricks 建议始终执行以下操作:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 请勿使用通用计算能力来运行结构化流式处理工作负载。 始终使用作业计算功能将流流程作为任务进行调度。
  • 使用 Continuous 模式计划作业。 这指的是Azure Databricks作业计划功能,而不是结构化流式处理trigger 间隔
  • 不要为结构化流式处理作业启用计算自动缩放。

某些工作负载会受益于以下功能:

Azure Databricks引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道

注意

计算资源自动缩放在缩小结构化流式处理工作负载的群集规模方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率

注意

在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()

对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。

请参阅 流式处理限制

设计流媒体任务时应预期失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)假定结构化流式处理工作负载配置为自动重试。 请参阅将结构化流式处理作业配置为在故障时重启流查询

某些操作(例如 foreachBatch)提供至少一次而非恰好一次的保证。 对于这些操作,应确保处理管道是幂等的。 请参阅使用 foreachBatch 将数据写入任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果作业因内存不足错误而失败,或者由于微批处理过大而手动取消了作业,则可能需要增加计算资源才能成功处理微批处理。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业何时重试?

可以将多个任务安排为Azure Databricks作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可以选择以下任意一种方法在单个作业中调度多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略: 多个任务 多重查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的计算资源。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择性地将查询分配给调度器池
如何处理重试? 必须在所有任务失败之后才能进行作业重试。 如果任何查询失败,任务将会重试。

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议使用连续触发器配置所有流式数据处理工作负载。 请参阅连续运行作业

连续触发器默认提供以下行为:

  • 防止作业出现多个并发运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

Databricks 建议不要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 这些函数不需要包含在代码中,因为 Jobs 服务会在流式查询处于活动状态时自动阻止运行完成。 这两个函数都阻止笔记本单元完成并阻止作业服务跟踪流式处理查询。 此外,流处理积压指标和通知不发送到作业服务,这会中断作业通知。

使用调度程序池来处理多个流式查询

可以通过配置调度程序池,在从同一源代码运行多个流式查询时,将计算容量分配给查询。

默认情况下,笔记本中启动的所有查询都在同一公平调度池中运行。 Apache Spark 作业是由笔记本中所有流式查询的触发器生成的,它们会按照“先进先出 (FIFO)”的顺序一个接一个地运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。

调度池允许您声明哪些结构化流处理查询共享计算资源。

以下示例将 query1 分配给专用池,而 query2query3 共享一个计划程序池。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关更多详细信息,请参阅 Apache 公平计划程序文档