在查询计划包含 BroadcastNestedLoopJoin 时禁用广播

本文介绍了当查询计划的物理计划中有 BroadcastNestedLoopJoin 时如何禁用广播。

你预计在禁用广播阈值后广播会停止(通过将 spark.sql.autoBroadcastJoinThreshold 设置为 -1),但 Apache Spark 会尝试广播较大的表,并因广播错误而失败。

此行为并不是一个 bug,但可能是意外的。 我们将查看预期行为,并提供此问题的缓解选项。

创建表

首先创建两个表,一个表具有 null 值 table_withNull,另一个不包含空值 tblA_NoNull

sql("SELECT id FROM RANGE(10)").write.mode("overwrite").saveAsTable("tblA_NoNull")
sql("SELECT id FROM RANGE(50) UNION SELECT NULL").write.mode("overwrite").saveAsTable("table_withNull")

尝试禁用广播

我们尝试通过为查询设置 spark.sql.autoBroadcastJoinThreshold 来禁用广播,该查询具有包含 in 子句的子查询。

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
sql("select * from table_withNull where id not in (select id from tblA_NoNull)").explain(true)

如果查看查询计划,BroadcastNestedLoopJoin 是这种情况下最后一个可能的回退。 即使在尝试禁用广播后,它也会出现。

== Physical Plan ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftAnti, ((id#2482L = id#2483L) || isnull((id#2482L = id#2483L)))
:- *(2) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- BroadcastExchange IdentityBroadcastMode, [id=#2586]
   +- *(1) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>

如果要处理的数据足够大,则当 Spark 尝试广播表时会导致广播错误。

使用 not exists 而不是 in 重写查询

你可以通过使用 not exists 而不是 in 重写查询来解决该问题。

// It can be rewritten into a NOT EXISTS, which will become a regular join:
sql("select * from table_withNull where not exists (select 1 from tblA_NoNull where table_withNull.id = tblA_NoNull.id)").explain(true)

通过使用 not exists,查询将与 SortMergeJoin 一起运行。

== Physical Plan ==
SortMergeJoin [id#2482L], [id#2483L], LeftAnti
:- Sort [id#2482L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#2482L, 200), [id=#2653]
:     +- *(1) FileScan parquet default.table_withnull[id#2482L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/table_withnull], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
+- Sort [id#2483L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#2483L, 200), [id=#2656]
      +- *(2) Project [id#2483L]
         +- *(2) Filter isnotnull(id#2483L)
            +- *(2) FileScan parquet default.tbla_nonull[id#2483L] Batched: true, DataFilters: [isnotnull(id#2483L)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/tbla_nonull], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>

说明

Spark 不会自动执行此操作,因为 Spark 和 SQL 用于 null 处理的语义略有不同。

在 SQL 中,not in 表示如果 not in 值中有任何 null 值,则结果为空。 这就是为什么只能用 BroadcastNestedLoopJoin 来执行它的原因。 所有 not in 值都必须已知,以确保集合中没有空值。

示例笔记本

此笔记本包含一个完整的示例,展示了 Spark 不会自动将 BroadcastNestedLoopJoin 切换到 SortMergeJoin 的原因。

BroadcastNestedLoopJoin 示例笔记本

获取笔记本