自适应查询执行

自适应查询执行 (AQE) 是在查询执行期间发生的查询重新优化。

运行时重新优化的推动因素是 Azure Databricks 在随机和广播交换(在 AQE 中称为查询阶段)结束时具有最新的准确统计信息。 因此,Azure Databricks 可以选择更好的物理策略、选择最佳的随机后分区大小和数目,或执行以前需要提示的优化(例如倾斜联接处理)。

这在未启用统计信息收集功能或统计信息过时的情况下会非常有用。 在静态派生的统计信息不准确的情况下(例如在复杂查询的过程中或在发生数据倾斜之后),也很有用。

功能

AQE 在默认情况下处于启用状态。 它有 4 个主要功能:

  • 将排序合并联接动态更改为广播哈希联接。
  • 在随机交换后将分区进行动态联合(将小分区合并为大小合理的分区)。 非常小的任务具有较差的 I/O 吞吐量,并且往往会产生更多计划开销和任务设置开销。 合并小型任务可节省资源并提高群集吞吐量。
  • 动态处理排序合并联接和随机哈希联接中的倾斜,方法是将倾斜的任务拆分(如果需要,还要进行复制)为大小大致相等的任务。
  • 动态检测并传播空关系。

应用程序

AQE 适用于以下所有查询:

  • 非流式处理
  • 包含至少一个交换(通常在有联接、聚合或窗口时)、一个子查询或同时包含两者。

并非所有应用 AQE 的查询都需要重新优化。 重新优化产生的查询计划可能会与静态编译的查询计划不同,也可能不会。 若要确定 AQE 是否更改了查询的计划,请参阅以下部分:查询计划

查询计划

本部分讨论如何以不同方式检查查询计划。

本节内容:

Spark UI

AdaptiveSparkPlan 节点

应用了 AQE 的查询包含一个或多个 AdaptiveSparkPlan 节点,通常作为每个主查询或子查询的根节点。 在查询运行之前或运行时,相应的 AdaptiveSparkPlan 节点的 isFinalPlan 标志将显示为 false;查询执行完成后,isFinalPlan 标志变为 true.

不断发展的计划

查询计划图随着执行的进展而变化,并反映正在执行的最新计划。 已执行的节点(其中的指标可用)将不会变化,但未执行的节点可能会因重新优化而随时间的推移发生变化。

下面是查询计划图示例:

Query plan diagram

DataFrame.explain()

AdaptiveSparkPlan 节点

应用了 AQE 的查询包含一个或多个 AdaptiveSparkPlan 节点,通常作为每个主查询或子查询的根节点。 在查询运行之前或运行时,相应的 AdaptiveSparkPlan 节点的 isFinalPlan 标志将显示为 false;查询执行完成后,isFinalPlan 标志变为 true

当前和初始计划

在每个 AdaptiveSparkPlan 节点下,将同时出现初始计划(应用任何 AQE 优化之前的计划)和当前或最终计划,具体取决于执行是否已完成。 当前计划将随执行的进展而不断变化。

运行时统计信息

每个随机和广播阶段都包含数据统计信息。

在此阶段运行之前或运行时,统计信息是编译时估计值,标志 isRuntimefalse,例如 Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

阶段执行完成后,统计信息则是在运行时收集的,标志 isRuntime 将变成 true,例如 Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

下面是一个 DataFrame.explain 示例:

  • 执行之前

    Before execution

  • 执行期间

    During execution

  • 执行之后

    After execution

SQL EXPLAIN

AdaptiveSparkPlan 节点

应用 AQE 的查询包含一个或多个 AdaptiveSparkPlan 节点,通常作为每个主查询或子查询的根节点。

无当前计划

由于 SQL EXPLAIN 不执行查询,当前计划始终与初始计划相同,并且不反映 AQE 最终将执行什么计划。

下面是一个 SQL 说明示例:

SQL explain

有效性

如果一个或多个 AQE 优化生效,查询计划将更改。 当前和最终计划与初始计划之间的差异以及当前和最终计划中的特定计划节点,反映了这些 AQE 优化的效果。

  • 将排序合并联接动态更改为广播哈希联接:当前/最终计划与初始计划之间的不同物理联接节点

    Join strategy string

  • 动态联合分区:带有 Coalesced 属性的 CustomShuffleReader 节点

    Custom shuffle reader

    Custom shuffle reader string

  • 动态处理倾斜联接:isSkew 字段为 true 的 SortMergeJoin 节点。

    Skew join plan

    Skew join string

  • 动态检测并传播空关系:部分(或整个)计划被节点 LocalTableScan 替换,其关系字段为空。

    Local table scan

    Local table scan string

配置

本节内容:

启用和禁用自适应查询执行

属性
spark.databricks.optimizer.adaptive.enabled

类型:Boolean

是启用还是禁用自适应查询执行。

默认值:30true

启用自动优化的随机

属性
spark.sql.shuffle.partitions

类型:Integer

为联接或聚合混洗数据时使用的默认分区数。 设置值 auto 会启用自动优化的随机,它会根据查询计划和查询输入数据大小自动确定此数字。

注意:对于结构化流式处理,无法在从同一检查点位置重启查询的两次操作之间更改此配置。

默认值:200

将排序合并联接动态更改为广播哈希联接

属性
spark.databricks.adaptive.autoBroadcastJoinThreshold

类型:Byte String

运行时切换操作(切换到广播联接)的触发阈值。

默认值:3030MB

动态联合分区

属性
spark.sql.adaptive.coalescePartitions.enabled

类型:Boolean

是启用还是禁用分区联合。

默认值:30true
spark.sql.adaptive.advisoryPartitionSizeInBytes

类型:Byte String

联合后的目标大小。 联合后的分区大小将接近但不大于此目标大小。

默认值:3064MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

类型:Byte String

联合后的最小分区大小。 联合后的分区大小将不小于此大小。

默认值:301MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

类型:Integer

联合后的最小分区数。 不建议,因为显式设置会进行覆盖
spark.sql.adaptive.coalescePartitions.minPartitionSize

默认值:2x 群集核心数

动态处理倾斜联接

属性
spark.sql.adaptive.skewJoin.enabled

类型:Boolean

是启用还是禁用倾斜联接处理。

默认值:30true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

类型:Integer

一个系数,乘以分区大小中值时有助于确定分区是否倾斜。

默认值:305
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

类型:Byte String

有助于确定分区是否倾斜的阈值。

默认值:30256MB

(partition size > skewedPartitionFactor * median partition size)(partition size > skewedPartitionThresholdInBytes) 均为 true 时,可认为分区倾斜。

动态检测并传播空关系

属性
spark.databricks.adaptive.emptyRelationPropagation.enabled

类型:Boolean

是启用还是禁用动态空关系传播。

默认值:30true

常见问题解答 (FAQ)

本节内容:

为什么 AQE 未广播小型联接表?

如果预期要广播的关系的大小确实低于此阈值,但仍未广播:

  • 检查联接类型。 某些联接类型不支持广播,例如,LEFT OUTER JOIN 的左关系无法广播。
  • 也可能是因为该关系包含大量空分区,在这种情况下,大部分任务都可使用排序合并联接来快速完成,或者可使用倾斜联接处理进行优化。 如果非空分区的百分比低于 spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin,AQE 会避免将此类排序合并联接更改为广播哈希联接。

我是否仍应使用已启用 AQE 的广播联接策略提示?

是的。 静态计划的广播联接的性能通常比由 AQE 动态计划的广播联接的性能要好,因为在同时对联接的两侧执行随机之前(实际的关系大小在这段时间内获取),AQE 可能不会切换到广播联接。 因此,如果你非常了解查询,使用广播提示仍是一个不错的选择。 AQE 将像静态优化那样遵循查询提示,但仍可应用不受提示影响的动态优化。

倾斜联接提示与 AQE 倾斜联接优化之间有何区别? 应使用哪种方法?

建议依赖 AQE 倾斜联接处理而不使用倾斜联接提示,因为 AQE 倾斜联接完全是自动的,并且其性能通常比提示的性能要好。

为什么 AQE 没有自动调整我的联接顺序?

动态联接重新排序不是 AQE 的一部分。

为什么 AQE 没有检测到数据倾斜?

要使 AQE 将分区检测为倾斜分区,必须满足两个有关大小的条件:

  • 分区大小大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes(默认为 256MB)
  • 分区大小大于所有分区大小的中值与倾斜分区系数 spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认值为 5)的乘积

此外,对倾斜处理的支持对于某些联接类型是有限的,例如在 LEFT OUTER JOIN 中,只能优化左侧的倾斜。

旧的

术语“自适应执行”自 Spark 1.6 以来便已存在,但 Spark 3.0 中的新 AQE 是完全不同的一个概念。 就功能而言,Spark 1.6 只执行“动态联合分区”部分。 就技术体系结构而言,新 AQE 是一种基于运行时统计信息的动态计划和重新计划查询的框架,它支持多种优化(如本文中所述的那些优化),并可进行扩展以实现更多可能的优化。