使用 Delta Lake 选择性地覆盖数据
Azure Databricks 利用 Delta Lake 功能支持两种不同的选择性覆盖选项:
replaceWhere
选项以原子方式替换与给定谓词匹配的所有记录。- 可以根据表的分区方式使用动态分区覆盖来替换数据目录。
对于大多数操作,Databricks 建议使用 replaceWhere
来指定要覆盖的数据。
重要
如果意外覆盖数据,可以使用还原来撤消更改。
使用 replaceWhere
进行的任意选择性覆盖
可以有选择性地只覆盖与任意表达式匹配的数据。
注意
SQL 需要 Databricks Runtime 12.2 LTS 或更高版本。
以下命令以原子方式替换目标表中一月的事件,该表按照 start_date
分区,数据位于 replace_data
中:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
此示例代码在 replace_data
中写出数据,验证所有行是否与谓词匹配,并使用 overwrite
语义执行原子替换。 如果操作中的任何值都超出约束范围,则此操作默认失败,并显示错误。
可以将此行为更改为谓词范围内的 overwrite
值和指定范围外的 insert
记录。 为此,请使用以下设置之一将 spark.databricks.delta.replaceWhere.constraintCheck.enabled
设置为 false 来禁用约束检查:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
旧行为
在旧的默认行为中,replaceWhere
仅覆盖与分区列上的谓词匹配的数据。 在此旧模型中,以下命令以原子方式替换目标表中的一月,该表按照 date
分区,数据位于 df
中:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
如果要回退到旧行为,可以禁用 spark.databricks.delta.replaceWhere.dataColumns.enabled
标志:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
动态分区覆盖
重要
此功能目前以公共预览版提供。
Databricks Runtime 11.3 LTS 及更高版本支持分区表的“动态”分区覆盖模式。 对于具有多个分区的表,Databricks Runtime 11.3 LTS 及更低版本仅在所有分区列的数据类型相同时才支持动态分区覆盖。
在动态分区覆盖模式下,操作会覆盖每个逻辑分区中的所有现有数据,写入操作会为这些分区提交新数据。 写入不包含数据的任何现有逻辑分区都保持不变。 仅当以覆盖模式写入数据时,此模式才适用:在 SQL 中为 INSERT OVERWRITE
,或者 DataFrame 使用 df.write.mode("overwrite")
写入。
通过将 Spark 会话配置 spark.sql.sources.partitionOverwriteMode
设置为 dynamic
来配置动态分区覆盖模式。 也可通过将 DataFrameWriter
选项 partitionOverwriteMode
设置为 dynamic
来启用此模式。 如果存在,查询特定选项将覆盖会话配置中定义的模式。 partitionOverwriteMode
的默认值是 static
。
重要
验证使用动态分区覆盖写入的数据是否仅触及预期分区。 错误分区中的单个行可能会导致意外覆盖整个分区。
以下示例演示如何使用动态分区覆盖:
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
注意
- 动态分区覆盖与分区表的选项
replaceWhere
冲突。- 如果 Spark 会话配置中启用了动态分区覆盖,并且
replaceWhere
作为DataFrameWriter
选项提供,则 Delta Lake 会根据replaceWhere
表达式覆盖数据(查询特定选项覆盖会话配置)。 - 如果
DataFrameWriter
选项同时启用动态分区覆盖和replaceWhere
,则会收到错误。
- 如果 Spark 会话配置中启用了动态分区覆盖,并且
- 使用动态分区覆盖时,不能将
overwriteSchema
指定为true
。