使用 Delta Lake 来选择性覆盖数据

Delta Lake 具有以下不同的选择性覆盖选项:

  • replaceWhere 选项以原子方式替换与给定谓词匹配的所有记录。
  • 可以根据表的分区方式使用动态分区覆盖来替换数据目录。

对于大多数操作,Databricks 建议使用 replaceWhere 来指定要覆盖的数据。

重要

如果数据被意外覆盖,可以使用还原来撤消更改。

使用replaceWhere进行任意选择性覆盖

可以有选择性地只覆盖与任意表达式匹配的数据。

注意

SQL 需要 Databricks Runtime 12.2 LTS 或更高版本。

例如,将分区表start_date在一月份的事件以原子方式替换为replace_data中的数据。

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。

可以将此行为更改为谓词范围内的 overwrite 值和指定范围外的 insert 记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

注意

REPLACE WHERE 接受的 boolean_expression 有一些限制。 请参阅 INSERT SQL 语言参考。

遗留行为

如果使用旧行为 replaceWhere,则查询将覆盖仅与分区列上的谓词匹配的数据。 以下命令会以原子方式替换目标表中按 date 分区的一月数据为 df

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

若要使用旧行为,请将 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志设置为 false

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态分区覆盖

动态分区覆盖仅更新写入提交新数据的分区,并使其他分区保持不变。

Azure Databricks建议使用REPLACE USING来触发动态分区覆盖。 此模式适用于所有计算类型,包括 Databricks SQL 仓库、无服务器计算和经典计算,无需设置 Spark 会话配置。 请参阅具有REPLACE USING的动态分区覆盖。

partitionOverwriteMode 是动态分区覆盖的旧模式,需要使用经典计算并设置 Spark 会话配置。 Databricks SQL 或无服务器计算不支持它。 请参阅动态分区覆盖(partitionOverwriteMode旧版)

以下各节演示如何使用每个模式。

动态分区重写 REPLACE USING

Databricks Runtime 16.3 及更高版本支持使用 REPLACE USING 表的动态分区覆盖。 你可以选择性地覆盖所有计算类型的数据,而无需设置 Spark 会话配置。 REPLACE USING 启用计算无关的原子覆盖行为,该行为适用于 Databricks SQL 仓库、无服务器计算和经典计算。

REPLACE USING 仅覆盖传入数据所针对的分区。 所有其他分区保持不变。

REPLACE USING 仅在 SQL 中受支持。 有关详细信息,请参阅 INSERT SQL 语言参考。

以下示例使用 REPLACE USING

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

对于动态分区覆盖,请记住以下约束和行为:

  • 必须在USING子句中指定表的分区列全部集合。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。

如果需要比REPLACE USING支持的更加可自定义的匹配逻辑,例如视NULL值为相等,请改用互补的REPLACE ON。 有关详细信息,请参阅 INSERT

动态分区覆盖( partitionOverwriteMode 旧版)

重要

此功能目前以公共预览版提供。

Databricks Runtime 11.3 LTS 及更高版本支持使用覆盖模式对已分区表进行动态分区覆盖: INSERT OVERWRITE 在 SQL 中,或者使用 DataFrame 写入 df.write.mode("overwrite")。 这种类型的覆盖仅适用于经典计算,而不适用于 Databricks SQL 仓库或无服务器计算。

Warning

如果可能,请使用 INSERT OVERWRITE REPLACE USING 而不是分区覆盖 INSERT OVERWRITE PARTITIONspark.sql.sources.partitionOverwriteMode=dynamic。 分区重写在分区更改时可能会引用过时的数据。

若要使用动态分区覆盖模式,请将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic。 或者,可以将选项DataFrameWriter设置为 partitionOverwriteModedynamic。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 spark.sql.sources.partitionOverwriteMode 的默认值为 static

下面的示例使用 partitionOverwriteMode

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

请牢记以下关于 partitionOverwriteMode 的约束和行为:

  • 无法将 overwriteSchema 设置为 true.
  • 不能在同一partitionOverwriteMode操作中同时指定replaceWhereDataFrameWriter
  • 如果通过选项指定 replaceWhere 条件DataFrameWriter ,Delta Lake 将应用该条件来控制被覆盖的数据。 此选项优先于 partitionOverwriteMode 会话级配置。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。