다음을 통해 공유

结构化流式处理的生产注意事项

本文包含有关在 Azure Databricks 上使用作业来计划结构化流式处理工作负载的建议。

Databricks 建议始终执行以下操作:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 请勿使用通用计算能力来运行结构化流式处理工作负载。 始终使用作业计算功能将流流程作为任务进行调度。
  • 使用 Continuous 模式计划作业。
  • 不要为结构化流式处理作业启用计算自动缩放。

某些工作负载会受益于以下功能:

Azure Databricks 引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道

注意

计算资源自动缩放在缩小结构化流式处理工作负载的群集规模方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率

设计流媒体任务时应预期失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)假定结构化流式处理工作负载已配置为自动重试。 请参阅将结构化流式处理作业配置为在故障时重启流查询

某些操作(例如 foreachBatch)提供至少一次而非恰好一次的保证。 对于这些操作,应确保您的处理管道是幂等的。 请参阅使用 foreachBatch 将数据写入任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果作业因内存不足错误而失败,或者由于微批处理过大而手动取消了作业,则可能需要增加计算资源才能成功处理微批处理。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业何时重试?

可以计划多个任务作为 Azure Databricks 作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可以选择以下任意一种方法在单个作业中调度多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略: 多个任务 多重查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的计算资源。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择性地将查询分配给调度器池
如何处理重试? 必须在所有任务失败之后才能进行作业重试。 如果任何查询失败,任务将会重试。

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议使用连续触发器配置所有流式数据处理工作负载。 请参阅连续运行作业

连续触发器默认提供以下行为:

  • 防止作业出现多个并发运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

不需要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 当流式处理查询处于活动状态时,作业会自动防止运行完成。

使用调度程序池来处理多个流式查询

可以通过配置调度程序池,在从同一源代码运行多个流式查询时,将计算容量分配给查询。

默认情况下,笔记本中启动的所有查询都在同一公平调度池中运行。 Apache Spark 作业是由笔记本中所有流式查询的触发器生成的,它们会按照“先进先出 (FIFO)”的顺序一个接一个地运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。

调度池允许您声明哪些结构化流处理查询共享计算资源。

以下示例将 query1 分配给专用池,而 query2query3 共享一个计划程序池。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关更多详细信息,请参阅 Apache 公平计划程序文档