配置结构化流式处理触发器间隔
Apache Spark 结构化流式处理以增量方式处理数据;通过控制批处理的触发器间隔,可以将结构化流式处理用于工作负载,包括近实时处理、每隔 5 分钟或每小时刷新一次,数据库或批处理一天或一周的所有新数据。
由于 Databricks 自动加载程序使用结构化流式处理来加载数据,因此理解触发器的工作原理使你能够非常灵活地控制成本,同时以所需的频率引入数据。
指定基于时间的触发器间隔
结构化流式处理将基于时间的触发器间隔称为“固定间隔微批处理”。 使用 processingTime
关键字,将持续时间指定为字符串,例如 .trigger(processingTime='10 seconds')
。
如果指定的 trigger
间隔太小(小于数十秒),则系统可能会执行不必要的检查来查看新数据是否已到达。 配置处理时间以平衡延迟要求和数据到达源的速率。
配置增量批处理
重要
在 Databricks Runtime 11.3 LTS 及更高版本中,Trigger.Once
设置被弃用。 Databricks 建议对所有增量式批处理工作负载使用 Trigger.AvailableNow
。
“立即可用”触发器选项将所有可用记录用作一个增量批,并且让你能够使用 maxBytesPerTrigger
等选项配置批大小(大小选项因数据源而异)。
Azure Databricks 支持使用 Trigger.AvailableNow
从许多结构化流式处理源进行增量批处理。 下表包含每个数据源所需的最低受支持 Databricks Runtime 版本:
源 | 最低 Databricks Runtime 版本 |
---|---|
文件源(JSON、Parquet 等) | 9.1 LTS |
Delta Lake | 10.4 LTS |
自动加载程序 | 10.4 LTS |
Apache Kafka | 10.4 LTS |
Kinesis | 13.1 |
什么是默认触发器间隔?
结构化流式处理默认为 500 毫秒的固定间隔微批处理。 Databricks 建议始终指定一个定制的 trigger
,以最大程度地降低与检查新数据是否已到达和处理规模过小的批处理相关的成本。
更改运行之间的触发器间隔
可以使用同一检查点更改运行之间的触发器间隔。
如果结构化流作业在处理微批次时停止,则该微批次必须在新的触发间隔应用之前完成。 因此,在更改触发间隔后,可能会观察到具有先前指定设置的微批处理。
从基于时间的间隔转向使用 AvailableNow
时,这可能会导致在将所有可用记录作为增量批次处理之前进行微批处理。
从 AvailableNow
转换为基于时间的间隔时,这可能会导致继续处理上次触发 AvailableNow
作业时可用的所有记录。 这是预期的行为。
注意
如果尝试从与增量批处理相关的查询失败中恢复,则更改触发间隔并不能解决此问题,因为批处理仍必须完成。 Databricks 建议扩大用于处理批次的计算容量,以尝试解决问题。 在极少数情况下,可能需要使用新的检查点重新启动流。
什么是连续处理模式?
Apache Spark 支持称为连续处理的附加触发器间隔。 自 Spark 2.3 起,此模式已被归类为试验模式;请咨询 Azure Databricks 帐户团队,确保理解此处理模型的利弊。
请注意,此连续处理模式与增量实时表中应用的连续处理完全无关。