使用合并以更新插入的方式插入到 Delta Lake 表中

可以使用 MERGE SQL 操作将源表、视图或 DataFrame 中的数据更新插入目标 Delta 表。 Delta Lake 支持 MERGE 中的插入、更新和删除,并支持超出 SQL 标准的扩展语法以辅助高级用例。

假设你有一个名为 people10mupdates 的源表或 /tmp/delta/people-10m-updates 处的源路径,其中包含名为 people10m 的目标表或 /tmp/delta/people-10m 处目标路径的新数据。 其中一些新记录可能已存在于目标数据中。 若要合并新数据,需要更新已有人员的 id 的行,并插入不存在匹配的 id 的新行。 可以运行以下查询:

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

重要

仅源表中的单个行可以匹配目标表中的给定行。 在 Databricks Runtime 16.0 及更高版本中,MERGE 会评估 WHEN MATCHEDON 子句中指定的条件,以确定重复匹配项。 在 Databricks Runtime 15.4 LTS 及更低版本中,MERGE 操作仅考虑 ON 子句中指定的条件。

请参阅 Delta Lake API 文档,了解 Scala 和 Python 语法详细信息。 有关 SQL 语法详细信息,请参阅 MERGE INTO

使用合并修改所有不匹配的行

在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 子句对目标表中的、在源表中没有相应记录的记录执行 UPDATEDELETE 操作。 Databricks 建议添加可选条件子句,以避免完全重写目标表。

以下代码示例显示了将此子句用于删除操作、使用源表的内容覆盖目标表,以及删除目标表中不匹配的记录的基本语法。 有关源更新和删除操作有时限的表的更具可缩放性模式,请参阅将 Delta 表与源逐步同步

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

以下示例将条件添加到 WHEN NOT MATCHED BY SOURCE 子句并指定要在不匹配的目标行中更新的值。

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

合并操作语义

下面是 merge 编程操作语义的详细说明。

  • 可以有任意数量的 whenMatchedwhenNotMatched 子句。

  • 当源行根据匹配条件与目标表行匹配时,将执行 whenMatched 子句。 这些子句具有以下语义。

    • whenMatched 子句最多可以有 1 个 update 和 1 个 delete 操作。 merge 中的 update 操作只更新匹配目标行的指定列(类似于 update 操作)。 delete 操作删除匹配的行。

    • 每个 whenMatched 子句都可以有一个可选条件。 如果存在此子句条件,则仅当该子句条件成立时,才对任何匹配的源-目标行对执行 updatedelete 操作。

    • 如果存在多个 whenMatched 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenMatched 子句都必须具有条件。

    • 如果对于匹配合并条件的源行和目标行对,没有任何 whenMatched 条件的计算结果为 true,则目标行保持不变。

    • 若要使用源数据集的相应列更新目标 Delta 表的所有列,请使用 whenMatched(...).updateAll()。 这等效于:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。

      注意

      启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变

  • 当源行根据匹配条件与任何目标行都不匹配时,将执行 whenNotMatched 子句。 这些子句具有以下语义。

    • whenNotMatched 子句只能具有 insert 操作。 新行是基于指定的列和相应的表达式生成的。 你无需指定目标表中的所有列。 对于未指定的目标列,将插入 NULL

    • 每个 whenNotMatched 子句都可以有一个可选条件。 如果存在子句条件,则仅当源条件对该行成立时才插入该行。 否则,将忽略源列。

    • 如果存在多个 whenNotMatched 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenNotMatched 子句都必须具有条件。

    • 若要使用源数据集的相应列插入目标 Delta 表的所有列,请使用 whenNotMatched(...).insertAll()。 这等效于:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      针对目标 Delta 表的所有列。 因此,此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。

      注意

      启用自动架构迁移后,此行为将发生变化。 有关详细信息,请参阅自动架构演变

  • 当源行根据匹配条件与任何目标行都不匹配时,将执行 whenNotMatchedBySource 子句。 这些子句具有以下语义。

    • whenNotMatchedBySource 子句可以指定 deleteupdate 操作。
    • 每个 whenNotMatchedBySource 子句都可以有一个可选条件。 如果存在子句条件,则仅当目标条件对该行成立时才修改该行。 否则,目标行保持不变。
    • 如果存在多个 whenNotMatchedBySource 子句,则会按照它们的指定顺序对其进行求值。 除最后一个之外,所有 whenNotMatchedBySource 子句都必须具有条件。
    • 根据定义,whenNotMatchedBySource 子句没有可从中拉取列值的源行,因此无法引用源列。 对于要修改的每一列,可以指定文本或对目标列执行操作,例如 SET target.deleted_count = target.deleted_count + 1

重要

  • 如果源数据集的多行匹配,并且合并尝试更新目标 Delta 表的相同行,则 merge 操作可能会失败。 根据合并的 SQL 语义,这种更新操作模棱两可,因为尚不清楚应使用哪个源行来更新匹配的目标行。 你可以预处理源表来消除出现多个匹配项的可能性。
  • 仅当视图已定义为 CREATE VIEW viewName AS SELECT * FROM deltaTable 时,才能对 SQL VIEW 应用 SQL MERGE 操作。

写入 Delta 表时进行重复数据删除

一个常见的 ETL 用例是通过将日志附加到表中来将其收集到 Delta 表中。 但是,源通常可以生成重复的日志记录,因此需要下游重复数据删除步骤来处理它们。 通过 merge,你可以避免插入重复记录。

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

注意

包含新日志的数据集需要在其内部进行重复数据删除。 根据合并的 SQL 语义,该数据集会将新数据与表中的现有数据进行匹配并删除重复数据,但如果新数据集中存在重复数据,则将插入。 因此,在合并到表之前,请对新数据进行重复数据删除。

如果你知道只在几天之内有重复记录,则可以通过按日期对表进行分区,然后指定要匹配的目标表的日期范围来进一步优化查询。

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

这种方法比使用前面的命令更有效,因为它仅在日志的最后 7 天而不是整个表中查找重复项。 此外,你还可以将此 insert-only merge 与结构化流式处理一起使用,以执行日志的连续重复数据删除。

  • 在流式处理查询中,可以使用 foreachBatch 中的 merge 操作将具有重复数据删除功能的所有流数据连续写入 Delta 表。 请参阅以下流式处理查询示例,了解有关 foreachBatch 的详细信息。
  • 在另一个流式处理查询中,你可以从此 Delta 表中连续读取重复数据删除的数据。 这是可能的,因为 insert-only merge 仅将新数据附加到 Delta 表。

使用 Delta Lake 的缓慢变化数据 (SCD) 和变更数据捕获 (CDC)

Delta Live Tables 原生支持跟踪和应用 SCD 类型 1 和类型 2。 将 APPLY CHANGES INTO 与 Delta Live Tables 配合使用可确保在处理 CDC 源时正确处理无序记录。 请参阅APPLY CHANGES API:使用增量实时表简化变更数据捕获

将 Delta 表与源逐步同步

在 Databricks SQL 和 Databricks Runtime 12.2 LTS 及更高版本中,可以使用 WHEN NOT MATCHED BY SOURCE 创建任意条件以自动删除和替换表的一部分。 如果所用源表的记录可能会在初始数据输入后的几天内更改或删除,但最终会稳定到最终状态,则这会特别有用。

以下查询显示使用此模式从源中选择 5 天的记录,更新目标中的匹配记录,将新记录从源插入目标,并删除目标中过去 5 天的所有不匹配记录。

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

通过在源表和目标表上提供同一布尔筛选器,可以将更改从源表动态传播到目标表,包括删除。

注意

尽管可以在不使用任何条件子句的情况下使用此模式,但这将会完全重写目标表,从而可能导致较高的开销。