使用 Delta Lake 选择性地覆盖数据

Azure Databricks 利用 Delta Lake 功能支持两种不同的选择性覆盖选项:

  • replaceWhere 选项以原子方式替换与给定谓词匹配的所有记录。
  • 可以根据表的分区方式使用动态分区覆盖来替换数据目录。

对于大多数操作,Databricks 建议使用 replaceWhere 来指定要覆盖的数据。

重要

如果意外覆盖数据,可以使用还原来撤消更改。

使用 replaceWhere 进行的任意选择性覆盖

可以有选择性地只覆盖与任意表达式匹配的数据。

注意

SQL 需要 Databricks Runtime 12.2 LTS 或更高版本。

以下命令以原子方式替换目标表中一月的事件,该表按照 start_date 分区,数据位于 replace_data 中:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .table("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

此示例代码在 replace_data中写出数据,验证所有行是否与谓词匹配,并使用 overwrite 语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。

可以将此行为更改为谓词范围内的 overwrite 值和指定范围外的 insert 记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled 设置为 false 来禁用约束检查:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

旧行为

在旧的默认行为中,replaceWhere 仅覆盖与分区列上的谓词匹配的数据。 在此旧模型中,以下命令以原子方式替换目标表中的一月,该表按照 date 分区,数据位于 df 中:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .table("people10m")

如果要回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled 标志:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

动态分区覆盖

动态分区覆盖仅更新那些提交了新数据的分区。 它会覆盖这些分区中的所有现有数据,并使其他人保持不变。

Azure Databricks 支持两种方法:

  • REPLACE USING (建议) - 适用于所有计算类型,包括 Databricks SQL 仓库、无服务器计算和经典计算。 不需要设置 Spark 会话配置。
  • partitionOverwriteMode (旧版) - 需要经典计算和设置 Spark 会话配置。 在 Databricks SQL 或无服务器计算环境中不支持此功能。

以下各节演示如何使用每个方法。

动态分区重写 REPLACE USING

重要

此功能目前以公共预览版提供。

Databricks Runtime 16.3 及更高版本支持对已分区表使用 REPLACE USING动态分区覆盖。 此方法允许你有选择地覆写所有计算资源的数据,而无需设置 Spark 会话配置。 REPLACE USING 启用计算无关的原子覆盖行为,该行为适用于 Databricks SQL 仓库、无服务器计算和经典计算。

REPLACE USING 仅覆盖传入数据所针对的分区。 所有其他分区保持不变。

以下示例演示如何使用动态分区覆盖与 REPLACE USING。 目前,只能使用 SQL,不能使用 Python 或 Scala。 有关详细信息,请参阅 SQL 语言参考中的 INSERT

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

对于动态分区覆盖,请记住以下约束和行为:

  • 必须在USING子句中指定表的分区列全部集合。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。

如果需要比支持更多的REPLACE USING可自定义匹配逻辑,例如将值视为NULL相等,请改用互补。REPLACE ON 有关详细信息,请参阅 INSERT

动态分区覆盖( partitionOverwriteMode 旧版)

重要

此功能目前以公共预览版提供。

Databricks Runtime 11.3 LTS 及更高版本支持使用覆盖模式对已分区表进行动态分区覆盖: INSERT OVERWRITE 在 SQL 中,或者使用 DataFrame 写入 df.write.mode("overwrite")。 这种类型的覆盖仅适用于经典计算,而不适用于 Databricks SQL 仓库或无服务器计算。

通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 来配置动态分区覆盖模式。 或者,可以将选项DataFrameWriter设置为 partitionOverwriteModedynamic。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 spark.sql.sources.partitionOverwriteMode 的默认值是 static

以下示例演示如何使用 partitionOverwriteMode

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

请牢记以下关于 partitionOverwriteMode 的约束和行为:

  • 无法将 overwriteSchema 设置为 true.
  • 不能在同一partitionOverwriteMode操作中同时指定replaceWhereDataFrameWriter
  • 如果通过选项指定 replaceWhere 条件DataFrameWriter ,Delta Lake 将应用该条件来控制被覆盖的数据。 此选项优先于 partitionOverwriteMode 会话级配置。
  • 始终验证写入的数据是否仅涉及预期的分区。 错误分区中的单个行可能会无意中覆盖整个分区。