使用 Delta Lake 来选择性覆盖数据

Delta Lake 具有以下不同的选择性覆盖选项:

选项 用例 支持的计算类型 最低版本
REPLACE WHERE 以原子方式覆盖与谓词匹配的行。 用于具有固定匹配条件的替换项,例如 colA = 5int_col IN (1, 2, 3) 所有计算类型。 Databricks Runtime 12.2 LTS 及更高版本中的 SQL。 Databricks Runtime 9.1 LTS 及更高版本中的 Python 和 Scala。
REPLACE USING 动态数据覆写。 根据提供的数据集中列值的相等比较替换与指定列匹配的所有行。 所有计算类型。 Databricks Runtime 16.3 及更高版本中的 SQL。 Databricks Runtime 18.2 及更高版本中的 Python 和 Scala。
REPLACE ON 通过布尔表达式覆盖动态数据。 用于具有复杂或 NULL 安全匹配条件的替换,例如 s.colA <=> t.colA AND s.colB <=> t.colB 所有计算类型。 Databricks Runtime 17.1 及更高版本中的 SQL。 Databricks Runtime 18.2 及更高版本中的 Python 和 Scala。
partitionOverwriteMode 旧动态分区覆盖,这会覆盖写入将提交新数据的每个分区中的所有现有数据。 不建议用于新工作负载。 SQL 仅支持经典计算。 Python和 Scala 支持所有计算类型。 Databricks Runtime 11.3 LTS 及更高版本中的 SQL、Python 和 Scala。

对于大多数用例,Databricks 建议使用 REPLACE USINGREPLACE WHERE。 仅当用例需要复杂或 NULL 安全匹配条件时使用 REPLACE ON

有关每个选项的替换行为的详细信息,请参阅 INSERT。 有关 Delta Lake 选项的完整列表,请参阅 DataFrameWriterDelta Lake 和 Apache Iceberg

在 Scala 和 Python 中,不能将 replaceOnreplaceUsingreplaceWherepartitionOverwriteModeoverwriteSchema 结合使用。

当源查询为空时,REPLACE USINGREPLACE ON 都不会删除数据;但是,REPLACE WHERE 可能会删除数据。

重要

如果数据被意外覆盖,可以使用还原来撤消更改。

REPLACE WHERE

可以选择性地只覆盖与任意表达式 REPLACE WHERE匹配的数据。

若要在目标表中以原子方式用 replace_data 中的数据替换 1 月的事件(该表按 start_date 分区):

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出谓词范围,则此操作默认失败,并出现错误。

在经典计算模式下,若要将此行为更改为:对谓词范围内的值使用 overwrite,对指定范围之外的记录使用 insert,请将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 以移除约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

注意

REPLACE WHERE 接受的 boolean_expression 有一些限制。 请参阅 INSERT SQL 语言参考。

对于空源查询, REPLACE WHERE 可能会删除表行。

遗留行为

旧版 replaceWhere 仅在经典计算上可用。 请参阅 经典计算概述

如果使用旧行为 replaceWhere,则查询将覆盖仅与分区列上的谓词匹配的数据。 以下命令会以原子方式替换目标表中按 date 分区的一月数据为 df

Python
(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)
Scala
df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

若要使用旧版行为,请将 spark.databricks.delta.replaceWhere.dataColumns.enabled 设置为 false

Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态数据覆写

动态数据覆盖会选择性替换匹配指定键列或满足布尔表达式的数据,其余所有数据保持不变。 支持分区表、未分区表和具有液体聚类分析的表。

动态分区覆盖是动态数据覆盖行为的子集。 动态分区覆盖会替换每个将写入新数据的分区中的所有现有数据,而其他所有分区则保持不变。 仅支持分区表。

REPLACE USING

Databricks Runtime 16.3 及更高版本支持 SQL。 Databricks Runtime 18.2 及更高版本中支持的 Python 和 Scala。 有关 Databricks Runtime 16.3 到 17.1 中的行为差异,请参阅 旧版行为

REPLACE USING 启用计算无关的原子覆盖行为,该行为适用于 Databricks SQL 仓库、无服务器计算和经典计算。 REPLACE USING 不需要设置 Spark 会话配置。

REPLACE USING 在指定列按相等关系比较结果相等时替换行。 所有其他数据保持不变。

使用 REPLACE USING 进行动态数据覆盖:

Python

(sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

对于空源查询, REPLACE USING 不会删除任何表行。

对于复杂或 NULL 安全匹配条件,请改用 REPLACE ON 。 请参阅 REPLACE ON

请参阅 INSERT SQL 语言参考。

遗留行为

在 Databricks Runtime 16.3 到 17.1 中, REPLACE USING 使用旧行为,只允许动态分区覆盖,而 Databricks Runtime 17.2 及更高版本允许动态数据覆盖。

对于旧行为, REPLACE USING 请记住以下约束和行为:

  • 必须在USING子句中指定表的分区列全部集合。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。

REPLACE ON

SQL 在 Databricks Runtime 17.1 及以上版本中受支持。 Databricks Runtime 18.2 及更高版本中支持的 Python 和 Scala。

REPLACE ON 会在行匹配用户定义的条件时替换这些行,而 REPLACE USING 则会在指定列按相等性比较结果相等时替换这些行。 需要匹配不支持的REPLACE ON逻辑(例如将值视为REPLACE USING相等)时使用NULL

(可选)使用 targetAlias 选项指定目标表的别名和 .as().alias() API 来指定源数据的别名。

有关 SQL 语法,请参阅 INSERT

Python

(sourceDataDF.alias("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.as("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events AS t
  REPLACE ON (s.event_id <=> t.event_id AND s.start_date <=> t.start_date)
  (SELECT * FROM source_data) AS s

对于空源查询, REPLACE ON 不会删除任何表行。

使用 partitionOverwriteMode 的动态分区覆盖(旧版)

重要

此功能目前以公共预览版提供。

Databricks Runtime 11.3 LTS 及更高版本支持使用覆盖模式对已分区表进行动态分区覆盖: INSERT OVERWRITE 在 SQL 中,或者使用 DataFrame 写入 df.write.mode("overwrite")。 这种类型的覆盖仅适用于经典计算,而不适用于 Databricks SQL 仓库或无服务器计算。

Warning

如果可能,请使用 INSERT REPLACE USING 而不是分区覆盖 INSERT OVERWRITE PARTITIONspark.sql.sources.partitionOverwriteMode=dynamic。 分区重写在分区更改时可能会引用过时的数据。

若要使用动态分区覆盖模式,请将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic。 或者,可以将选项DataFrameWriter设置为 partitionOverwriteModedynamic。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 spark.sql.sources.partitionOverwriteMode 的默认值为 static

下面的示例使用 partitionOverwriteMode

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

请牢记以下关于 partitionOverwriteMode 的约束和行为:

  • 无法将 overwriteSchema 设置为 true.
  • 不能在同一partitionOverwriteMode操作中同时指定replaceWhereDataFrameWriter
  • 如果通过选项指定 replaceWhere 条件DataFrameWriter ,Delta Lake 将应用该条件来控制被覆盖的数据。 此选项优先于 partitionOverwriteMode 会话级配置。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。