使用 Delta Lake 选择性地覆盖数据

Azure Databricks 利用 Delta Lake 功能支持两种不同的选择性覆盖选项:

  • replaceWhere 选项以原子方式替换与给定谓词匹配的所有记录。
  • 可以根据表的分区方式使用动态分区覆盖来替换数据目录。

对于大多数操作,Databricks 建议使用 replaceWhere 来指定要覆盖的数据。

重要

如果意外覆盖数据,可以使用还原来撤消更改。

使用 replaceWhere 进行的任意选择性覆盖

可以有选择性地只覆盖与任意表达式匹配的数据。 此功能适用于 Databricks Runtime 9.1 LTS 和更高版本中的数据帧,在 Databricks Runtime 12.0(不受支持)及更高版本的 SQL 中受支持。

以下命令以原子方式替换目标表中一月的事件,该表按照 start_date 分区,数据位于 replace_data 中:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_data >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。

可以将此行为更改为谓词范围内的 overwrite 值和指定范围外的 insert 记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

在 Databricks Runtime 9.0 和更低版本中,replaceWhere 只覆盖分区列上与谓词匹配的数据。 以下命令以原子方式替换目标表中的一月,该表按照 date 分区,数据位于 df 中:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

在 Databricks Runtime 9.1 及更高版本中,如果要回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态分区覆盖

重要

此功能目前以公共预览版提供。

Databricks Runtime 11.1 及更高版本支持分区表的“动态”分区覆盖模式。 对于具有多个分区的表,Databricks Runtime 12.0 及更低版本仅在所有分区列的数据类型相同时才支持动态分区覆盖。

在动态分区覆盖模式下,操作会覆盖每个逻辑分区中的所有现有数据,写入操作会为这些分区提交新数据。 写入不包含数据的任何现有逻辑分区都保持不变。 仅当以覆盖模式写入数据时,此模式才适用:在 SQL 中为 INSERT OVERWRITE,或者 DataFrame 使用 df.write.mode("overwrite") 写入。

通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 来配置动态分区覆盖模式。 也可通过将 DataFrameWriter 选项 partitionOverwriteMode 设置为 dynamic 来启用此模式。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 partitionOverwriteMode 的默认值是 static

重要

验证使用动态分区覆盖写入的数据是否仅触及预期分区。 错误分区中的单个行可能会导致意外覆盖整个分区。

以下示例演示如何使用动态分区覆盖:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

注意

  • 动态分区覆盖与分区表的选项 replaceWhere 冲突。
    • 如果 Spark 会话配置中启用了动态分区覆盖,并且 replaceWhere 作为 DataFrameWriter 选项提供,则 Delta Lake 会根据 replaceWhere 表达式覆盖数据(查询特定选项覆盖会话配置)。
    • 如果 DataFrameWriter 选项同时启用动态分区覆盖和 replaceWhere,则会收到错误。
  • 使用动态分区覆盖时,不能将 overwriteSchema 指定为 true