可以使用 MERGE SQL 操作将源表、视图或 DataFrame 中的数据更新插入目标 Delta 表。 Delta Lake 支持 MERGE 中的插入、更新和删除,并支持超出 SQL 标准的扩展语法以辅助高级用例。
假设你有一个名为 people10mupdates 的源表或 /tmp/delta/people-10m-updates 处的源路径,其中包含名为 people10m 的目标表或 /tmp/delta/people-10m 处目标路径的新数据。 其中一些新记录可能已存在于目标数据中。 若要合并新数据,需要更新已有人员的 id 的行,并插入不存在匹配的 id 的新行。 可以运行以下查询:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
Scala(编程语言)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()
重要
仅源表中的单个行可以匹配目标表中的给定行。 在 Databricks Runtime 16.0 及更高版本中,MERGE 会评估 WHEN MATCHED 和 ON 子句中指定的条件,以确定重复匹配项。 在 Databricks Runtime 15.4 LTS 及更低版本中,MERGE 操作仅考虑 ON 子句中指定的条件。
请参阅 Delta Lake API 文档,了解 Scala 和 Python 语法详细信息。 有关 SQL 语法详细信息,请参阅 MERGE INTO
使用合并修改所有不匹配的行
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 子句对目标表中的、在源表中没有相应记录的记录执行 UPDATE 或 DELETE 操作。 Databricks 建议添加可选条件子句,以避免完全重写目标表。
以下代码示例显示了将此子句用于删除操作、使用源表的内容覆盖目标表,以及删除目标表中不匹配的记录的基本语法。 有关源更新和删除操作有时限的表的更具可缩放性模式,请参阅将 Delta 表与源逐步同步。
Python
(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
Scala(编程语言)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
以下示例将条件添加到 WHEN NOT MATCHED BY SOURCE 子句并指定要在不匹配的目标行中更新的值。
Python
(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
Scala(编程语言)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'
合并操作语义
下面是 merge 编程操作语义的详细说明。
- 可以有任意数量的 - whenMatched和- whenNotMatched子句。
- 当源行根据匹配条件与目标表行匹配时,将执行 - whenMatched子句。 这些子句具有以下语义。- whenMatched子句最多可以有 1 个- update和 1 个- delete操作。- update中的- merge操作只更新匹配目标行的指定列(类似于- update操作)。- delete操作删除匹配的行。
- 每个 - whenMatched子句都可以有一个可选条件。 如果存在此子句条件,则仅当该子句条件成立时,才对任何匹配的源-目标行对执行- update或- delete操作。
- 如果存在多个 - whenMatched子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有- whenMatched子句都必须具有条件。
- 如果对于匹配合并条件的源行和目标行对,没有任何 - whenMatched条件的计算结果为 true,则目标行保持不变。
- 若要使用源数据集的相应列更新目标 Delta 表的所有列,请使用 - whenMatched(...).updateAll()。 这等效于:- whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))- 针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。 - 注意 - 启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。 
 
- 当源行根据匹配条件与任何目标行都不匹配时,将执行 - whenNotMatched子句。 这些子句具有以下语义。- whenNotMatched子句只能具有- insert操作。 新行是基于指定的列和相应的表达式生成的。 你无需指定目标表中的所有列。 对于未指定的目标列,将插入- NULL。
- 每个 - whenNotMatched子句都可以有一个可选条件。 如果存在子句条件,则仅当源条件对该行成立时才插入该行。 否则,将忽略源列。
- 如果存在多个 - whenNotMatched子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有- whenNotMatched子句都必须具有条件。
- 若要使用源数据集的相应列插入目标 Delta 表的所有列,请使用 - whenNotMatched(...).insertAll()。 这等效于:- whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))- 针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。 - 注意 - 启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变。 
 
- 当源行根据匹配条件与任何目标行都不匹配时,将执行 - whenNotMatchedBySource子句。 这些子句具有以下语义。- 
              whenNotMatchedBySource子句可以指定delete和update操作。
- 每个 whenNotMatchedBySource子句都可以有一个可选条件。 如果存在子句条件,则仅当目标条件对该行成立时才修改该行。 否则,目标行保持不变。
- 如果存在多个 whenNotMatchedBySource子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有whenNotMatchedBySource子句都必须具有条件。
- 根据定义,whenNotMatchedBySource子句没有可从中拉取列值的源行,因此无法引用源列。 对于要修改的每个列,可以指定一个固定值或对目标列执行操作,例如SET target.deleted_count = target.deleted_count + 1。
 
- 
              
重要
- 如果源数据集的多行匹配,并且合并尝试更新目标 Delta 表的相同行,则 merge操作可能会失败。 根据合并的 SQL 语义,这种更新操作模棱两可,因为尚不清楚应使用哪个源行来更新匹配的目标行。 你可以预处理源表来消除出现多个匹配项的可能性。
- 仅当视图已定义为 MERGE时,才能对 SQL VIEW 应用 SQLCREATE VIEW viewName AS SELECT * FROM deltaTable操作。
写入 Delta 表时进行重复数据删除
一个常见的 ETL 用例是通过将日志附加到表中来将其收集到 Delta 表中。 但是,源通常可以生成重复的日志记录,因此需要下游重复数据删除步骤来处理它们。 通过 merge,你可以避免插入重复记录。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
Scala(编程语言)
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
Java
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();
注意
包含新日志的数据集需要在其内部进行重复数据删除。 根据合并的 SQL 语义,该数据集会将新数据与表中的现有数据进行匹配并删除重复数据,但如果新数据集中存在重复数据,则将插入。 因此,在合并到表之前,请对新数据进行重复数据删除。
如果你知道只在几天之内有重复记录,则可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围来进一步优化查询。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
Python
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
Scala(编程语言)
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
Java
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();
这种方法比使用前面的命令更有效,因为它仅在日志的最后 7 天而不是整个表中查找重复项。 此外,你还可以将此 insert-only merge 与结构化流式处理一起使用,以执行日志的连续重复数据删除。
- 在流式处理查询中,可以使用 foreachBatch中的 merge 操作将具有重复数据删除功能的所有流数据连续写入 Delta 表。 请参阅以下流式处理查询示例,了解有关foreachBatch的详细信息。
- 在另一个流式处理查询中,你可以从此 Delta 表中连续读取重复数据删除的数据。 这是可能的,因为 insert-only merge 仅将新数据附加到 Delta 表。
使用 Delta Lake 的缓慢变化数据 (SCD) 和变更数据捕获 (CDC)
Lakeflow 声明式管道原生支持跟踪和应用 SCD 类型 1 和类型 2。 在处理 CDC 数据流时,与 Lakeflow 声明性管道一起使用 AUTO CDC ... INTO ,以确保无序记录得到正确处理。 请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获。
将 Delta 表与源逐步同步
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 创建任意条件以自动删除和替换表的一部分。 如果所用源表的记录可能会在初始数据输入后的几天内更改或删除,但最终会稳定到最终状态,则这会特别有用。
以下查询显示使用此模式从源中选择 5 天的记录,更新目标中的匹配记录,将新记录从源插入目标,并删除目标中过去 5 天的所有不匹配记录。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
通过在源表和目标表上提供同一布尔筛选器,可以将更改从源表动态传播到目标表,包括删除。
注意
尽管可以在不使用任何条件子句的情况下使用此模式,但这将会完全重写目标表,从而可能导致较高的开销。