使用合并以更新插入的方式插入到 Delta Lake 表中
可以使用 MERGE
SQL 操作将源表、视图或 DataFrame 中的数据更新插入目标 Delta 表。 Delta Lake 支持 MERGE
中的插入、更新和删除,并支持超出 SQL 标准的扩展语法以辅助高级用例。
假设你有一个名为 people10mupdates
的源表或 /tmp/delta/people-10m-updates
处的源路径,其中包含名为 people10m
的目标表或 /tmp/delta/people-10m
处目标路径的新数据。 其中一些新记录可能已存在于目标数据中。 若要合并新数据,需要更新已有人员的 id
的行,并插入不存在匹配的 id
的新行。 可以运行以下查询:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
重要
仅源表中的单个行可以匹配目标表中的给定行。 在 Databricks Runtime 16.0 及更高版本中,MERGE
会评估 WHEN MATCHED
和 ON
子句中指定的条件,以确定重复匹配项。 在 Databricks Runtime 15.4 LTS 及更低版本中,MERGE
操作仅考虑 ON
子句中指定的条件。
请参阅 Delta Lake API 文档,了解 Scala 和 Python 语法详细信息。 有关 SQL 语法详细信息,请参阅 MERGE INTO
使用合并修改所有不匹配的行
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE
子句对目标表中的、在源表中没有相应记录的记录执行 UPDATE
或 DELETE
操作。 Databricks 建议添加可选条件子句,以避免完全重写目标表。
以下代码示例显示了将此子句用于删除操作、使用源表的内容覆盖目标表,以及删除目标表中不匹配的记录的基本语法。 有关源更新和删除操作有时限的表的更具可缩放性模式,请参阅将 Delta 表与源逐步同步。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
以下示例将条件添加到 WHEN NOT MATCHED BY SOURCE
子句并指定要在不匹配的目标行中更新的值。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
合并操作语义
下面是 merge
编程操作语义的详细说明。
可以有任意数量的
whenMatched
和whenNotMatched
子句。当源行根据匹配条件与目标表行匹配时,将执行
whenMatched
子句。 这些子句具有以下语义。whenMatched
子句最多可以有 1 个update
和 1 个delete
操作。merge
中的update
操作只更新匹配目标行的指定列(类似于update
操作)。delete
操作删除匹配的行。每个
whenMatched
子句都可以有一个可选条件。 如果存在此子句条件,则仅当该子句条件成立时,才对任何匹配的源-目标行对执行update
或delete
操作。如果存在多个
whenMatched
子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有whenMatched
子句都必须具有条件。如果对于匹配合并条件的源行和目标行对,没有任何
whenMatched
条件的计算结果为 true,则目标行保持不变。若要使用源数据集的相应列更新目标 Delta 表的所有列,请使用
whenMatched(...).updateAll()
。 这等效于:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。
注意
启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。
当源行根据匹配条件与任何目标行都不匹配时,将执行
whenNotMatched
子句。 这些子句具有以下语义。whenNotMatched
子句只能具有insert
操作。 新行是基于指定的列和相应的表达式生成的。 你无需指定目标表中的所有列。 对于未指定的目标列,将插入NULL
。每个
whenNotMatched
子句都可以有一个可选条件。 如果存在子句条件,则仅当源条件对该行成立时才插入该行。 否则,将忽略源列。如果存在多个
whenNotMatched
子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有whenNotMatched
子句都必须具有条件。若要使用源数据集的相应列插入目标 Delta 表的所有列,请使用
whenNotMatched(...).insertAll()
。 这等效于:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。
注意
启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。
当源行根据匹配条件与任何目标行都不匹配时,将执行
whenNotMatchedBySource
子句。 这些子句具有以下语义。whenNotMatchedBySource
子句可以指定delete
和update
操作。- 每个
whenNotMatchedBySource
子句都可以有一个可选条件。 如果存在子句条件,则仅当目标条件对该行成立时才修改该行。 否则,目标行保持不变。 - 如果存在多个
whenNotMatchedBySource
子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有whenNotMatchedBySource
子句都必须具有条件。 - 根据定义,
whenNotMatchedBySource
子句没有可从中拉取列值的源行,因此无法引用源列。 对于要修改的每一列,可以指定文本或对目标列执行操作,例如SET target.deleted_count = target.deleted_count + 1
。
重要
- 如果源数据集的多行匹配,并且合并尝试更新目标 Delta 表的相同行,则
merge
操作可能会失败。 根据合并的 SQL 语义,这种更新操作模棱两可,因为尚不清楚应使用哪个源行来更新匹配的目标行。 你可以预处理源表来消除出现多个匹配项的可能性。 - 仅当视图已定义为
CREATE VIEW viewName AS SELECT * FROM deltaTable
时,才能对 SQL VIEW 应用 SQLMERGE
操作。
写入 Delta 表时进行重复数据删除
一个常见的 ETL 用例是通过将日志附加到表中来将其收集到 Delta 表中。 但是,源通常可以生成重复的日志记录,因此需要下游重复数据删除步骤来处理它们。 通过 merge
,你可以避免插入重复记录。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注意
包含新日志的数据集需要在其内部进行重复数据删除。 根据合并的 SQL 语义,该数据集会将新数据与表中的现有数据进行匹配并删除重复数据,但如果新数据集中存在重复数据,则将插入。 因此,在合并到表之前,请对新数据进行重复数据删除。
如果你知道只在几天之内有重复记录,则可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围来进一步优化查询。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
这种方法比使用前面的命令更有效,因为它仅在日志的最后 7 天而不是整个表中查找重复项。 此外,你还可以将此 insert-only merge 与结构化流式处理一起使用,以执行日志的连续重复数据删除。
- 在流式处理查询中,可以使用
foreachBatch
中的 merge 操作将具有重复数据删除功能的所有流数据连续写入 Delta 表。 请参阅以下流式处理查询示例,了解有关foreachBatch
的详细信息。 - 在另一个流式处理查询中,你可以从此 Delta 表中连续读取重复数据删除的数据。 这是可能的,因为 insert-only merge 仅将新数据附加到 Delta 表。
使用 Delta Lake 的缓慢变化数据 (SCD) 和变更数据捕获 (CDC)
Delta Live Tables 原生支持跟踪和应用 SCD 类型 1 和类型 2。 将 APPLY CHANGES INTO
与 Delta Live Tables 配合使用可确保在处理 CDC 源时正确处理无序记录。 请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获。
将 Delta 表与源逐步同步
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE
创建任意条件以自动删除和替换表的一部分。 如果所用源表的记录可能会在初始数据输入后的几天内更改或删除,但最终会稳定到最终状态,则这会特别有用。
以下查询显示使用此模式从源中选择 5 天的记录,更新目标中的匹配记录,将新记录从源插入目标,并删除目标中过去 5 天的所有不匹配记录。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
通过在源表和目标表上提供同一布尔筛选器,可以将更改从源表动态传播到目标表,包括删除。
注意
尽管可以在不使用任何条件子句的情况下使用此模式,但这将会完全重写目标表,从而可能导致较高的开销。