Delta 表上的 Low Shuffle Merge 优化

Delta Lake MERGE 命令允许用户使用高级条件更新 Delta 表。 可以使用 MERGE 命令将源表、视图或 DataFrame 中的数据更新到目标表。 但是,当前算法未针对处理未修改的行进行完全优化。 通过 Low Shuffle Merge 优化,未修改的行将从更新匹配行所需的昂贵的随机操作中排除。

为什么我们需要 Low Shuffle Merge

目前 MERGE 操作是通过两个 Join 执行完成的。 第一个联接是使用整个目标表和源数据,查找目标表中包含任何匹配行的已接触文件列表。 之后,它会执行第二个联接,仅读取那些已接触的文件和源数据,以执行实际的表更新。 尽管第一个联接是为了减少第二个联接的数据量,但已接触的文件中仍有大量未修改的行。 第一个联接查询较轻,因为它仅读取给定匹配条件中的列。 第二个用于表更新的联接需要加载所有列,这会导致昂贵的随机过程。

使用 Low Shuffle Merge 优化时,Delta 会暂时保留第一个联接中的匹配行结果,并将其用于第二个联接。 根据结果,它会从繁重的随机过程中排除未修改的行。 匹配的行和未修改的行会有两个单独的写入作业,因此,输出文件数可能是之前行为的 2 倍。 但是,预期的性能提升盖过了可能的小文件问题。

可用性

注意

  • Low Shuffle Merge 作为预览功能提供。

它适用于 Synapse Pools for Apache Spark 版本 3.2 和 3.3。

版本 可用性 默认
Delta 0.6 / Spark 2.4 -
Delta 1.2 / Spark 3.2 false
Delta 2.2 / Spark 3.3

Low Shuffle Merge 的优势

  • 已接触文件中未修改的行将单独处理,并且不会执行实际的 MERGE 操作。 它可以节省整体 MERGE 执行的时间和计算资源。 复制多个行并且仅更新几行时,增益会更大。
  • 未修改的行的行顺序会保留。 因此,如果文件已排序或按 Z-ORDERED 排序,则未修改的行的输出文件仍然可以有效地跳过数据。
  • 即使在最坏的情况下,MERGE 条件与已接触文件中的所有行匹配,开销也是很小的。

如何启用和禁用 Low Shuffle Merge

为池或会话设置配置后,所有 Spark 写入模式都将使用该功能。

若要使用 Low Shuffle Merge 优化,请使用以下配置启用它:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled` = true

若要检查当前配置值,请使用如下所示的命令:

  1. Scala 和 PySpark
spark.conf.get("spark.microsoft.delta.merge.lowShuffle.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled`

要禁用此功能,请更改以下配置,如下所示:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.merge.lowShuffle.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.merge.lowShuffle.enabled` = false