Table deletes, updates, and merges

Delta Lake supports several statements to facilitate deleting data from and updating data in Delta tables.

Delete from a table

You can remove data that matches a predicate from a Delta table. For instance, to delete all events from before 2017, you can run the following:

SQL

DELETE FROM events WHERE date < '2017-01-01'

DELETE FROM delta.`/data/events/` WHERE date < '2017-01-01'

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        # predicate using SQL formatted string

deltaTable.delete(col("date") < "2017-01-01")   # predicate using Spark SQL functions

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")        // predicate using SQL formatted string

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2017-01-01")       // predicate using Spark SQL functions and implicits

Java

Note

The Java API is available in Databricks Runtime 6.0 and above.

import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.delete("date < '2017-01-01'");            // predicate using SQL formatted string

deltaTable.delete(functions.col("date").lt(functions.lit("2017-01-01")));   // predicate using Spark SQL functions

See the API reference for details.

Important

delete removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for details.

Tip

When possible, provide predicates on the partition columns of a partitioned Delta table, as such predicates can significantly speed up the operation.

Update a table

You can update data that matches a predicate in a Delta table. For example, to fix a spelling mistake in the eventType, you can run the following:

SQL

UPDATE events SET eventType = 'click' WHERE eventType = 'clck'

UPDATE delta.`/data/events/` SET eventType = 'click' WHERE eventType = 'clck'

Python

Note

The Python API is available in Databricks Runtime 6.1 and above.

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )   # predicate using SQL formatted string

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )   # predicate using Spark SQL functions

Scala

Note

The Scala API is available in Databricks Runtime 6.0 and above.

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  Map("eventType" -> "'click'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // predicate using Spark SQL functions and implicits
  col("eventType") === "clck",
  Map("eventType" -> lit("click")));

Java

Note

The Java API is available in Databricks Runtime 6.0 and above.

import io.delta.tables.*;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

deltaTable.updateExpr(            // predicate and update expressions using SQL formatted string
  "eventType = 'clck'",
  new HashMap<String, String>() {{
    put("eventType", "'click'");
  }}
);

deltaTable.update(                // predicate using Spark SQL functions
  functions.col("eventType").equalTo("clck"),       // predicate using Spark SQL functions
  new HashMap<String, Column>() {{
    put("eventType", functions.lit("click"));
  }}
);

See the API reference for details.

Tip

Similar to delete, update operations can get a significant speedup with predicates on partitions.

Upsert into a table using merge

You can upsert data from a source table, view, or DataFrame into a target Delta table using the merge operation. This operation is similar to the SQL MERGE INTO command but has additional support for deletes and extra conditions in updates, inserts, and deletes.

Suppose you have a Spark DataFrame that contains new data for events with eventId. Some of these events may already be present in the events table. To merge the new data into the events table, you want to update the matching rows (that is, eventId already present) and insert the new rows (that is, eventId not present). You can run the following:

SQL

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

See the MERGE INTO SQL command for syntax details.

Python

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched
  .updateExpr(
    Map("data" -> "updates.data"))
  .whenNotMatched
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()

Java

import io.delta.tables.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import java.util.HashMap;

Dataset<Row> updatesDF = ...  // define the updates DataFrame[date, eventId, data]

DeltaTable.forPath(spark, "/data/events/")
  .as("events")
  .merge(
    updatesDF.as("updates"),
    "events.eventId = updates.eventId")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("data", "events.data");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("date", "updates.date");
      put("eventId", "updates.eventId");
      put("data", "updates.data");
    }})
  .execute();

See the API reference for Scala, Java, and Python syntax details.

Operation semantics

Here is a detailed description of the merge programmatic operation.

  • There can be any number of whenMatched and whenNotMatched clauses.

    Note

    In Databricks Runtime 7.2 and below, merge can have at most 2 whenMatched clauses and at most 1 whenNotMatched clause.

  • whenMatched clauses are executed when a source row matches a target table row based on the match condition. These clauses have the following semantics.

    • whenMatched clauses can have at most one update and one delete action. The update action in merge only updates the specified columns (similar to the update operation) of the matched target row. The delete action deletes the matched row.

    • Each whenMatched clause can have an optional condition. If this clause condition exists, the update or delete action is executed for any matching source-target row pair only when the clause condition is true.

    • If there are multiple whenMatched clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenMatched clauses, except the last one, must have conditions (see the sketch after this list for an example that combines conditional clauses).

    • If two whenMatched clauses both have conditions and neither condition is true for a matching source-target row pair, the matched target row is left unchanged.

    • To update all the columns of the target Delta table with the corresponding columns of the source dataset, use whenMatched(...).updateAll(). This is equivalent to:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.

      Note

      This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.

  • whenNotMatched clauses are executed when a source row does not match any target row based on the match condition. These clauses have the following semantics.

    • whenNotMatched clauses can have only the insert action. The new row is generated based on the specified columns and corresponding expressions. You do not need to specify all the columns in the target table. For unspecified target columns, NULL is inserted.

      Note

      In Databricks Runtime 6.5 and below, you must provide all the columns in the target table for the INSERT action.

    • Each whenNotMatched clause can have an optional condition. If the clause condition is present, a source row is inserted only if that condition is true for that row. Otherwise, the source row is ignored.

    • If there are multiple whenNotMatched clauses, they are evaluated in the order they are specified (that is, the order of the clauses matters). All whenNotMatched clauses, except the last one, must have conditions.

    • To insert all the columns of the target Delta table with the corresponding columns of the source dataset, use whenNotMatched(...).insertAll(). This is equivalent to:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      for all the columns of the target Delta table. Therefore, this action assumes that the source table has the same columns as those in the target table; otherwise the query throws an analysis error.

      Note

      This behavior changes when automatic schema migration is enabled. See Automatic schema evolution for details.

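As an illustration of clause ordering and optional conditions, here is a minimal Python sketch against the events table from the earlier examples. The deleted flag on the source updates DataFrame is a hypothetical column used only for this sketch.

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/events/")

# "deleted" below is an assumed flag column on updatesDF, not part of the original example.
deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \
  .whenMatchedDelete(condition = "updates.deleted = true") \
  .whenMatchedUpdate(set = { "data": "updates.data" }) \
  .whenNotMatchedInsert(
    condition = "updates.deleted = false",
    values = {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }) \
  .execute()

Because the delete clause is evaluated first, a matched row with updates.deleted = true is removed; any other matched row falls through to the unconditional update clause.
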
Important

A merge operation can fail if multiple rows of the source dataset match and attempt to update the same rows of the target Delta table. According to the SQL semantics of merge, such an update operation is ambiguous, as it is unclear which source row should be used to update the matched target row. You can preprocess the source table to eliminate the possibility of multiple matches, as in the sketch below. See the Change data capture example, which preprocesses the change dataset (that is, the source dataset) to retain only the latest change for each key before applying that change into the target Delta table.

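A minimal Python sketch of such preprocessing, assuming a change dataset changesDF with key and timestamp columns (both names are illustrative):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the most recent change per key so that each key matches at most one source row.
latestChangesDF = (changesDF
  .withColumn("_rank", F.row_number().over(
    Window.partitionBy("key").orderBy(F.col("timestamp").desc())))
  .where("_rank = 1")
  .drop("_rank"))
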
Note

In Databricks Runtime 7.3, multiple matches are allowed when matches are unconditionally deleted (since an unconditional delete is not ambiguous even if there are multiple matches).

Schema validation

merge automatically validates that the schema of the data generated by insert and update expressions is compatible with the schema of the table. It uses the following rules to determine whether the merge operation is compatible:

  • For update and insert actions, the specified target columns must exist in the target Delta table.
  • For updateAll and insertAll actions, the source dataset must have all the columns of the target Delta table. The source dataset can have extra columns and they are ignored.
  • For all actions, if the data type generated by the expressions producing the target columns differs from the corresponding columns in the target Delta table, merge tries to cast them to the types in the table.

Automatic schema evolution

Note

Schema evolution in merge is available in Databricks Runtime 6.6 and above.

By default, updateAll and insertAll assign all the columns in the target Delta table with columns of the same name from the source dataset. Any columns in the source dataset that don’t match columns in the target table are ignored. However, in some use cases, it is desirable to automatically add source columns to the target Delta table. To automatically update the table schema during a merge operation with updateAll and insertAll (at least one of them), you can set the Spark session configuration spark.databricks.delta.schema.autoMerge.enabled to true before running the merge operation.

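For example, a minimal Python sketch that reuses the deltaTable and updatesDF names from the merge examples above and assumes key as the join column:

# Allow merge to add new source columns to the target table schema.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

deltaTable.alias("t").merge(
    updatesDF.alias("s"),
    "t.key = s.key") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
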
Note

  • Schema evolution occurs only when there is either an updateAll or an insertAll action, or both.
  • Only top-level columns (that is, not nested fields) are altered during schema evolution in merge.
  • update and insert actions cannot explicitly refer to target columns that do not already exist in the target table (even if updateAll or insertAll is one of the clauses). See the examples below.

Here are a few examples of the effects of a merge operation with and without schema evolution.

Each example below lists the target and source columns, the query (in Scala), the behavior without schema evolution (the default), and the behavior with schema evolution.

Target columns: key, value
Source columns: key, value, newValue
Query (in Scala):

  targetDeltaTable.alias("t")
    .merge(
      sourceDataFrame.alias("s"),
      "t.key = s.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()

Behavior without schema evolution (default): The table schema remains unchanged; only columns key, value are updated/inserted.
Behavior with schema evolution: The table schema is changed to (key, value, newValue). updateAll updates columns value and newValue, and insertAll inserts rows (key, value, newValue).

Target columns: key, oldValue
Source columns: key, newValue
Query (in Scala):

  targetDeltaTable.alias("t")
    .merge(
      sourceDataFrame.alias("s"),
      "t.key = s.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()

Behavior without schema evolution (default): updateAll and insertAll actions throw an error because the target column oldValue is not in the source.
Behavior with schema evolution: The table schema is changed to (key, oldValue, newValue). updateAll updates columns key and newValue leaving oldValue unchanged, and insertAll inserts rows (key, NULL, newValue) (that is, oldValue is inserted as NULL).

Target columns: key, oldValue
Source columns: key, newValue
Query (in Scala):

  targetDeltaTable.alias("t")
    .merge(
      sourceDataFrame.alias("s"),
      "t.key = s.key")
    .whenMatched().update(Map(
      "newValue" -> col("s.newValue")))
    .whenNotMatched().insertAll()
    .execute()

Behavior without schema evolution (default): update throws an error because column newValue does not exist in the target table.
Behavior with schema evolution: update still throws an error because column newValue does not exist in the target table.

Target columns: key, oldValue
Source columns: key, newValue
Query (in Scala):

  targetDeltaTable.alias("t")
    .merge(
      sourceDataFrame.alias("s"),
      "t.key = s.key")
    .whenMatched().updateAll()
    .whenNotMatched().insert(Map(
      "key" -> col("s.key"),
      "newValue" -> col("s.newValue")))
    .execute()

Behavior without schema evolution (default): insert throws an error because column newValue does not exist in the target table.
Behavior with schema evolution: insert still throws an error as column newValue does not exist in the target table.

Performance tuning

You can reduce the time taken by merge using the following approaches (a combined sketch follows this list):

  • Reduce the search space for matches: By default, the merge operation searches the entire Delta table to find matches in the source table. One way to speed up merge is to reduce the search space by adding known constraints in the match condition. For example, suppose you have a table that is partitioned by country and date and you want to use merge to update information for the last day and a specific country. Adding the condition

    events.date = current_date() AND events.country = 'USA'

    will make the query faster as it looks for matches only in the relevant partitions. Furthermore, it will also reduce the chances of conflicts with other concurrent operations. See Concurrency control for more details.

  • Compact files: If the data is stored in many small files, reading the data to search for matches can become slow. You can compact small files into larger files to improve read throughput. See Compact files for details.

  • Control the shuffle partitions for writes: The merge operation shuffles data multiple times to compute and write the updated data. The number of tasks used to shuffle is controlled by the Spark session configuration spark.sql.shuffle.partitions. Setting this parameter not only controls the parallelism but also determines the number of output files. Increasing the value increases parallelism but also generates a larger number of smaller data files.

  • Enable optimized writes: For partitioned tables, merge can produce a much larger number of small files than the number of shuffle partitions. This is because every shuffle task can write multiple files in multiple partitions, and this can become a performance bottleneck. You can optimize this by enabling Optimized Writes.

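Here is a minimal Python sketch that combines two of these approaches: it narrows the match to the relevant partitions (assuming the events table is partitioned by country and date, as in the example above) and tunes the shuffle parallelism. The value 200 is only a placeholder.

# Fewer shuffle partitions produce fewer, larger output files; more increase parallelism.
spark.conf.set("spark.sql.shuffle.partitions", "200")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId AND " +
    "events.date = current_date() AND events.country = 'USA'") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
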
Merge examples

Here are a few examples of how to use merge in different scenarios.

In this section:

  • Data deduplication when writing into Delta tables
  • Slowly changing data (SCD) Type 2 operation into Delta tables
  • Write change data into a Delta table
  • Upsert from streaming queries using foreachBatch

Data deduplication when writing into Delta tables

A common ETL use case is to collect logs into a Delta table by appending them to the table. However, the sources can often generate duplicate log records, and downstream deduplication steps are needed to take care of them. With merge, you can avoid inserting the duplicate records.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

Note

The dataset containing the new logs needs to be deduplicated within itself. By the SQL semantics of merge, it matches and deduplicates the new data with the existing data in the table, but if there is duplicate data within the new dataset, it is inserted. Hence, deduplicate the new data before merging into the table.

If you know that you may get duplicate records only for a few days, you can optimize your query further by partitioning the table by date, and then specifying the date range of the target table to match on.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

This is more efficient than the previous command as it looks for duplicates only in the last 7 days of logs, not the entire table. Furthermore, you can use this insert-only merge with Structured Streaming to perform continuous deduplication of the logs.

  • In a streaming query, you can use the merge operation in foreachBatch to continuously write any streaming data to a Delta table with deduplication. See the following streaming example for more information on foreachBatch.
  • In another streaming query, you can continuously read deduplicated data from this Delta table. This is possible because an insert-only merge only appends new data to the Delta table.

Note

Insert-only merge is optimized to only append data in Databricks Runtime 6.2 and above. In Databricks Runtime 6.1 and below, writes from insert-only merge operations cannot be read as a stream.

Slowly changing data (SCD) Type 2 operation into Delta tables

Another common operation is SCD Type 2, which maintains the history of all changes made to each key in a dimensional table. Such operations require updating existing rows to mark previous values of keys as old, and inserting the new rows as the latest values. Given a source table with updates and the target table with the dimensional data, SCD Type 2 can be expressed with merge.

Here is a concrete example of maintaining the history of addresses for a customer along with the active date range of each address. When a customer’s address needs to be updated, you have to mark the previous address as not the current one, update its active date range, and add the new address as the current one. A simplified sketch follows; the linked notebook shows the full pattern.

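This Python sketch stages each update twice: once under a NULL merge key so it can only be inserted as the new current address, and once under the real key so it can close out the existing current address. The table path and the column names (customerId, address, current, effectiveDate, endDate) are assumptions for illustration.

from delta.tables import DeltaTable

customersTable = DeltaTable.forPath(spark, "/data/customers/")   # assumed target table
updatesDF = ...  # new addresses: customerId, address, effectiveDate

# Updates whose address actually changed; these rows will only be inserted as the new current address.
addressChangesDF = (updatesDF.alias("updates")
  .join(customersTable.toDF().alias("customers"), "customerId")
  .where("customers.current = true AND updates.address <> customers.address")
  .selectExpr("NULL as mergeKey", "updates.*"))

# All updates keep their real key so existing current rows can be matched and closed out.
stagedUpdatesDF = addressChangesDF.union(
  updatesDF.selectExpr("customerId as mergeKey", "*"))

customersTable.alias("customers").merge(
    stagedUpdatesDF.alias("staged"),
    "customers.customerId = staged.mergeKey") \
  .whenMatchedUpdate(
    condition = "customers.current = true AND customers.address <> staged.address",
    set = { "current": "false", "endDate": "staged.effectiveDate" }) \
  .whenNotMatchedInsert(
    values = {
      "customerId": "staged.customerId",
      "address": "staged.address",
      "current": "true",
      "effectiveDate": "staged.effectiveDate",
      "endDate": "null"
    }) \
  .execute()
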
SCD Type 2 using merge notebook

Get notebook

Write change data into a Delta table

Similar to SCD, another common use case, often called change data capture (CDC), is to apply all data changes generated from an external database into a Delta table. In other words, a set of updates, deletes, and inserts applied to an external table needs to be applied to a Delta table. You can do this using merge as follows.

Write change data using MERGE notebook

Get notebook

Upsert from streaming queries using foreachBatch

You can use a combination of merge and foreachBatch (see foreachBatch for more information) to write complex upserts from a streaming query into a Delta table; a minimal sketch of the first pattern follows the list below. For example:

  • Write streaming aggregates in Update Mode: This is much more efficient than Complete Mode.
  • Write a stream of database changes into a Delta table: The merge query for writing change data can be used in foreachBatch to continuously apply a stream of changes to a Delta table.
  • Write streaming data into a Delta table with deduplication: The insert-only merge query for deduplication can be used in foreachBatch to continuously write data (with duplicates) to a Delta table with automatic deduplication.

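A minimal Python sketch of the first pattern, upserting streaming aggregates into a Delta table from foreachBatch; the paths, the key column, and the aggregatesDF streaming DataFrame are assumptions for illustration:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/data/aggregates/")   # assumed target table

def upsertToDelta(microBatchDF, batchId):
    # Upsert one micro-batch; re-running the same batch overwrites the same keys,
    # which keeps the write idempotent across query restarts.
    deltaTable.alias("t").merge(
        microBatchDF.alias("s"),
        "t.key = s.key") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()

(aggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .option("checkpointLocation", "/data/aggregates/_checkpoints/")   # assumed path
  .start())
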
Note

  • Make sure that your merge statement inside foreachBatch is idempotent, as restarts of the streaming query can apply the operation on the same batch of data multiple times.
  • When merge is used in foreachBatch, the input data rate of the streaming query (reported through StreamingQueryProgress and visible in the notebook rate graph) may be reported as a multiple of the actual rate at which data is generated at the source. This is because merge reads the input data multiple times, causing the input metrics to be multiplied. If this is a bottleneck, you can cache the batch DataFrame before merge and then uncache it after merge.

Write streaming aggregates in update mode using merge and foreachBatch notebook

Get notebook