Adaptive query execution

Adaptive query execution (AQE) is query re-optimization that occurs during query execution.

The motivation for runtime re-optimization is that Azure Databricks has the most up-to-date, accurate statistics at the end of a shuffle or broadcast exchange (referred to as a query stage in AQE). As a result, Azure Databricks can opt for a better physical strategy, pick an optimal post-shuffle partition size and number, or apply optimizations that previously required hints, such as skew join handling.

This can be very useful when statistics collection is not turned on or when statistics are stale. It is also useful in places where statically derived statistics are inaccurate, such as in the middle of a complicated query, or after the occurrence of data skew.

Capabilities

In Databricks Runtime 7.3, AQE is enabled by default. It has four major features:

  • Dynamically changes sort merge join into broadcast hash join.
  • Dynamically coalesces partitions (combining small partitions into reasonably sized partitions) after shuffle exchange. Very small tasks have worse I/O throughput and tend to suffer more from scheduling overhead and task setup overhead. Combining small tasks saves resources and improves cluster throughput.
  • Dynamically handles skew in sort merge join and shuffle hash join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks.
  • Dynamically detects and propagates empty relations.

Application

AQE applies to all queries that are:

  • Non-streaming
  • Contain at least one exchange (usually when there's a join, aggregate, or window), one sub-query, or both.

Not all AQE-applied queries are necessarily re-optimized. The re-optimization might or might not produce a different query plan from the one statically compiled. Refer to the next section for how to determine whether a query's plan has been changed by AQE.

Query plans

This section discusses how you can examine query plans in different ways.


Spark UI

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or when it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.

Evolving plan

The query plan diagram evolves as the execution progresses and reflects the most current plan that is being executed. Nodes that have already been executed (in which metrics are available) will not change, but those that haven't can change over time as the result of re-optimizations.

The following is a query plan diagram example:

[Image: query plan diagram]

DataFrame.explain()

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query. Before the query runs or when it is running, the isFinalPlan flag of the corresponding AdaptiveSparkPlan node shows as false; after the query execution completes, the isFinalPlan flag changes to true.

Current and initial plan

Under each AdaptiveSparkPlan node there will be both the initial plan (the plan before applying any AQE optimizations) and the current or the final plan, depending on whether the execution has completed. The current plan will evolve as the execution progresses.

Runtime statistics

Each shuffle and broadcast stage contains data statistics.

Before the stage runs or when the stage is running, the statistics are compile-time estimates, and the flag isRuntime is false, for example: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false).

After the stage execution completes, the statistics are those collected at runtime, and the flag isRuntime becomes true, for example: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true).

The following is a DataFrame.explain example:

  • Before the execution

    [Image: explain output before execution]

  • During the execution

    [Image: explain output during execution]

  • After the execution

    [Image: explain output after execution]
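As a rough illustration of the workflow behind these screenshots, here is a minimal PySpark sketch. The table names (t1, t2) and the use of the "cost" explain mode to surface the Statistics(...) lines are assumptions for illustration, not part of this article.

```python
# Minimal sketch: inspect an AQE plan with DataFrame.explain before and after
# execution. Table names and the "cost" explain mode are illustrative assumptions.
df = (
    spark.table("t1")
    .join(spark.table("t2"), "key")
    .groupBy("key")
    .count()
)

df.explain(mode="cost")   # before execution: isFinalPlan=false, compile-time estimates (isRuntime=false)
df.collect()              # run the query so each stage produces runtime statistics
df.explain(mode="cost")   # after execution: isFinalPlan=true, runtime statistics (isRuntime=true)
```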

SQL EXPLAIN

AdaptiveSparkPlan node

AQE-applied queries contain one or more AdaptiveSparkPlan nodes, usually as the root node of each main query or sub-query.

No current plan

As SQL EXPLAIN does not execute the query, the current plan is always the same as the initial plan and does not reflect what would eventually get executed by AQE.

The following is a SQL explain example:

[Image: SQL explain output]
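For reference, a minimal sketch of issuing SQL EXPLAIN from PySpark follows; the query and table names are illustrative assumptions.

```python
# Minimal sketch: SQL EXPLAIN never executes the query, so the AdaptiveSparkPlan
# it prints always has isFinalPlan=false. Query and table names are assumptions.
plan = spark.sql("""
    EXPLAIN FORMATTED
    SELECT t1.key, count(*) AS cnt
    FROM t1 JOIN t2 ON t1.key = t2.key
    GROUP BY t1.key
""")
plan.show(truncate=False)
```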

Effectiveness

The query plan will change if one or more AQE optimizations take effect. The effect of these optimizations is demonstrated by the difference between the current or final plan and the initial plan, and by specific plan nodes in the current or final plan.

  • Dynamically change sort merge join into broadcast hash join: different physical join nodes between the current/final plan and the initial plan

    [Image: join strategy string]

  • Dynamically coalesce partitions: node CustomShuffleReader with property Coalesced

    [Image: custom shuffle reader plan]

    [Image: custom shuffle reader string]

  • Dynamically handle skew join: node SortMergeJoin with the field isSkew set to true

    [Image: skew join plan]

    [Image: skew join string]

  • Dynamically detect and propagate empty relations: part of (or the entire) plan is replaced by a LocalTableScan node with an empty relation field

    [Image: local table scan plan]

    [Image: local table scan string]

Configuration

In the properties in this section, replace <prefix> with spark.sql.adaptive.


Enable and disable adaptive query execution

Property | Default | Description
<prefix>.enabled | true | Whether to enable or disable adaptive query execution.
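A minimal sketch of toggling this property for the current Spark session (setting it at session scope is an illustrative choice; it can also be set in the cluster configuration):

```python
# Minimal sketch: enable or disable AQE for the current Spark session.
spark.conf.set("spark.sql.adaptive.enabled", "true")     # enable
# spark.conf.set("spark.sql.adaptive.enabled", "false")  # disable
print(spark.conf.get("spark.sql.adaptive.enabled"))
```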

Dynamically change sort merge join into broadcast hash join

Property | Default | Description
<prefix>.autoBroadcastJoinThreshold | 30MB | The threshold to trigger switching to broadcast join at runtime.
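A minimal sketch of adjusting this threshold; the 100MB value is an illustrative assumption, not a recommendation:

```python
# Minimal sketch: raise the runtime broadcast-join threshold. 100MB is illustrative.
spark.conf.set("spark.sql.adaptive.autoBroadcastJoinThreshold", "100MB")
```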

Dynamically coalesce partitions

Property | Default | Description
<prefix>.coalescePartitions.enabled | true | Whether to enable or disable partition coalescing.
<prefix>.advisoryPartitionSizeInBytes | 64MB | The target size after coalescing. The coalesced partition sizes will be close to but no bigger than this target size.
<prefix>.coalescePartitions.minPartitionSize | 1MB | The minimum size of partitions after coalescing. The coalesced partition sizes will be no smaller than this size.
<prefix>.coalescePartitions.minPartitionNum | 2x the number of cluster cores | The minimum number of partitions after coalescing. Not recommended, because setting it explicitly overrides <prefix>.coalescePartitions.minPartitionSize.
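A minimal sketch of setting the coalescing properties above; the values simply repeat the documented defaults for illustration:

```python
# Minimal sketch: partition-coalescing settings with the defaults listed above.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")
```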

Dynamically handle skew join

Property | Default | Description
<prefix>.skewJoin.enabled | true | Set true/false to enable/disable skew join handling.
<prefix>.skewJoin.skewedPartitionFactor | 5 | A factor that, when multiplied by the median partition size, contributes to determining whether a partition is skewed.
<prefix>.skewJoin.skewedPartitionThresholdInBytes | 256MB | A threshold that contributes to determining whether a partition is skewed.

A partition is considered skewed when both (partition size > skewedPartitionFactor * median partition size) and (partition size > skewedPartitionThresholdInBytes) are true.
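A minimal sketch of that skew test, using illustrative partition sizes:

```python
# Minimal sketch of the skew condition above; all sizes are illustrative assumptions.
factor = 5                                  # <prefix>.skewJoin.skewedPartitionFactor
threshold = 256 * 1024 * 1024               # <prefix>.skewJoin.skewedPartitionThresholdInBytes (256MB)
median_partition_size = 64 * 1024 * 1024    # assumed median shuffle partition size (64MB)
partition_size = 400 * 1024 * 1024          # assumed size of the partition under test (400MB)

is_skewed = (partition_size > factor * median_partition_size) and (partition_size > threshold)
print(is_skewed)  # True: 400MB > 5 * 64MB = 320MB and 400MB > 256MB
```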

Dynamically detect and propagate empty relations

Property | Default | Description
<prefix>.emptyRelationPropagation.enabled | true | Whether to enable or disable dynamic empty relation propagation.

Frequently asked questions (FAQs)


Why didn't AQE change the shuffle partition number even though partition coalescing is enabled?

AQE does not change the initial partition number. It is recommended that you set a reasonably high value for the shuffle partition number and let AQE coalesce small partitions based on the output data size at each stage of the query.

If you see spilling in your jobs, you can try the following (see the sketch after this list):

  • Increasing the shuffle partition number config: spark.sql.shuffle.partitions
  • Enabling auto optimized shuffle by setting <prefix>.autoOptimizeShuffle.enabled to true
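A minimal sketch of both options; the 2048 value is an illustrative assumption, and <prefix> expands to spark.sql.adaptive as described in the Configuration section:

```python
# Minimal sketch: mitigate spilling by raising the initial shuffle partition count
# or enabling auto optimized shuffle. The 2048 value is illustrative.
spark.conf.set("spark.sql.shuffle.partitions", "2048")
spark.conf.set("spark.sql.adaptive.autoOptimizeShuffle.enabled", "true")
```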

Why didn't AQE broadcast a small join table?

If the size of the relation expected to be broadcast falls under the broadcast threshold but it is still not broadcast:

  • Check the join type. Broadcast is not supported for certain join types; for example, the left relation of a LEFT OUTER JOIN cannot be broadcast.
  • It can also be that the relation contains a large number of empty partitions, in which case the majority of the tasks can finish quickly with sort merge join, or the join can potentially be optimized with skew join handling. AQE avoids changing such sort merge joins to broadcast hash joins if the percentage of non-empty partitions is lower than <prefix>.nonEmptyPartitionRatioForBroadcastJoin.

Should I still use a broadcast join strategy hint with AQE enabled?

Yes. A statically planned broadcast join is usually more performant than one dynamically planned by AQE, because AQE might not switch to broadcast join until after performing the shuffle for both sides of the join (by which time the actual relation sizes are obtained). So using a broadcast hint can still be a good choice if you know your query well. AQE respects query hints the same way static optimization does, but can still apply dynamic optimizations that are not affected by the hints.
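A minimal sketch of a broadcast strategy hint in PySpark; the table names are illustrative assumptions:

```python
# Minimal sketch: force a broadcast of the smaller side with a hint, instead of
# waiting for AQE to decide at runtime. Table names are illustrative.
from pyspark.sql.functions import broadcast

facts = spark.table("facts")
small_dim = spark.table("small_dim")

joined = facts.join(broadcast(small_dim), "key")
joined.explain()  # the plan should show a broadcast hash join in the initial plan
```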

What is the difference between the skew join hint and AQE skew join optimization? Which one should I use?

It is recommended to rely on AQE skew join handling rather than use the skew join hint, because AQE skew join is completely automatic and in general performs better than the hint counterpart.

Why didn't AQE adjust my join ordering automatically?

Dynamic join reordering is not part of AQE as of Databricks Runtime 7.3.

Why didn't AQE detect my data skew?

There are two size conditions that must be satisfied for AQE to detect a partition as a skewed partition:

  • The partition size is larger than <prefix>.skewJoin.skewedPartitionThresholdInBytes (default 256MB)
  • The partition size is larger than the median size of all partitions times the skewed partition factor <prefix>.skewJoin.skewedPartitionFactor (default 5)

In addition, skew handling support is limited for certain join types; for example, in LEFT OUTER JOIN, only skew on the left side can be optimized.

Legacy

The term "Adaptive Execution" has existed since Spark 1.6, but the new AQE in Spark 3.0 is fundamentally different. In terms of functionality, Spark 1.6 does only the "dynamically coalesce partitions" part. In terms of technical architecture, the new AQE is a framework of dynamic planning and replanning of queries based on runtime stats, which supports a variety of optimizations such as the ones described in this article and can be extended to enable more potential optimizations.