Concurrency control

Delta Lake provides ACID transaction guarantees between reads and writes. This means that:

  • Multiple writers across multiple clusters can simultaneously modify a table partition and see a consistent snapshot view of the table, and there will be a serial order for these writes.
  • Readers continue to see the consistent snapshot view of the table that the Azure Databricks job started with, even when the table is modified during the job.

Optimistic concurrency control

Delta Lake uses optimistic concurrency control to provide transactional guarantees between writes. Under this mechanism, writes operate in three stages:

  1. Read: Reads (if needed) the latest available version of the table to identify which files need to be modified (that is, rewritten).
  2. Write: Stages all the changes by writing new data files.
  3. Validate and commit: Before committing the changes, checks whether the proposed changes conflict with any other changes that may have been concurrently committed since the snapshot that was read. If there are no conflicts, all the staged changes are committed as a new versioned snapshot and the write operation succeeds. However, if there are conflicts, the write operation fails with a concurrent modification exception rather than corrupting the table, as would happen with a write operation on a Parquet table.
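
The three stages above can be sketched with a toy commit log. This is illustrative only, not Delta Lake's implementation; for simplicity it treats any concurrent commit as a conflict, whereas Delta Lake only fails when the concurrently committed changes actually overlap with what the writer read.

```scala
import java.util.ConcurrentModificationException

// A snapshot is a version number plus the set of data files it contains.
final case class Snapshot(version: Long, files: Set[String])

class OptimisticLog {
  private var current = Snapshot(0L, Set.empty[String])

  // Stage 1 (read): return the latest committed snapshot.
  def read(): Snapshot = synchronized { current }

  // Stage 2 (write) happens outside the log: the writer stages new data
  // files, then passes their names to commit.
  // Stage 3 (validate and commit): fail if the table has moved past the
  // version this writer read.
  def commit(readVersion: Long, staged: Set[String]): Long = synchronized {
    if (current.version != readVersion)
      throw new ConcurrentModificationException(
        s"read version $readVersion is stale; table is now at ${current.version}")
    current = Snapshot(current.version + 1, current.files ++ staged)
    current.version
  }
}
```

A second writer that read version 0 and tries to commit after the first writer has already produced version 1 fails with the exception instead of silently overwriting the table.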

The isolation level of a table defines the degree to which a transaction must be isolated from modifications made by concurrent operations. For information on the isolation levels supported by Delta Lake on Azure Databricks, see Isolation levels.

Write conflicts

The following table describes which pairs of write operations can conflict in each isolation level.

|                            | INSERT          | UPDATE, DELETE, MERGE INTO | OPTIMIZE |
|----------------------------|-----------------|----------------------------|----------|
| INSERT                     | Cannot conflict |                            |          |
| UPDATE, DELETE, MERGE INTO | Can conflict in Serializable, cannot conflict in WriteSerializable | Can conflict in Serializable and WriteSerializable | |
| OPTIMIZE                   | Cannot conflict | Can conflict in Serializable and WriteSerializable | Can conflict in Serializable and WriteSerializable |

Avoid conflicts using partitioning and disjoint command conditions

In all cases marked "can conflict", whether the two operations will conflict depends on whether they operate on the same set of files. You can make the two sets of files disjoint by partitioning the table by the same columns as those used in the conditions of the operations. For example, the two commands UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01' will conflict if the table is not partitioned by date, as both can attempt to modify the same set of files. Partitioning the table by date will avoid the conflict. Hence, partitioning a table according to the conditions commonly used in commands can reduce conflicts significantly. However, partitioning a table by a column that has high cardinality can lead to other performance issues due to the large number of subdirectories.
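
Why partitioning makes these file sets disjoint can be sketched as follows. This is a toy model, not Delta Lake's file-pruning logic; the directory and file names are illustrative.

```scala
// Toy model of a table partitioned by date: each partition directory
// owns its own set of data files.
val partitions: Map[String, Set[String]] = Map(
  "2009-12-31" -> Set("date=2009-12-31/part-0000.parquet"),
  "2010-01-02" -> Set("date=2010-01-02/part-0001.parquet"))

// Files a command must rewrite, given its predicate on the partition
// column. ISO dates compare correctly as strings.
def filesFor(pred: String => Boolean): Set[String] =
  partitions.collect { case (date, files) if pred(date) => files }.flatten.toSet

val updateFiles = filesFor(_ > "2010-01-01") // UPDATE ... WHERE date > '2010-01-01'
val deleteFiles = filesFor(_ < "2010-01-01") // DELETE ... WHERE date < '2010-01-01'

// Because the predicates are disjoint on the partition column, the two
// commands touch disjoint sets of files and cannot conflict.
assert((updateFiles intersect deleteFiles).isEmpty)
```

Without partitioning, both commands would have to scan and rewrite the same shared files, which is exactly the overlap that produces a conflict.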

Conflict exceptions

When a transaction conflict occurs, you will observe one of the following exceptions:

ConcurrentAppendException

This exception occurs when a concurrent operation adds files in the same partition (or anywhere in an unpartitioned table) that your operation reads. The file additions can be caused by INSERT, DELETE, UPDATE, or MERGE operations.

With the default isolation level of WriteSerializable, files added by blind INSERT operations (that is, operations that blindly append data without reading any data) do not conflict with any operation, even if they touch the same partition (or anywhere in an unpartitioned table). If the isolation level is set to Serializable, then blind appends may conflict.
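
If you want blind appends to be checked as well, the isolation level can be changed per table. The snippet below assumes a SparkSession named spark and the delta.isolationLevel table property documented for Delta Lake on Azure Databricks; the table name events is illustrative.

```scala
// Switch the table to the stricter Serializable level, under which
// blind INSERT appends can also conflict with concurrent operations.
spark.sql(
  "ALTER TABLE events SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')")
```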

This exception is often thrown during concurrent DELETE, UPDATE, or MERGE operations. While the concurrent operations may be physically updating different partition directories, one of them may read the same partition that the other one concurrently updates, thus causing a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following example.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

Suppose you run the above code concurrently for different dates or countries. Since each job is working on an independent partition of the target Delta table, you don't expect any conflicts. However, the condition is not explicit enough: it can scan the entire table and can conflict with concurrent operations updating any other partitions. Instead, you can rewrite the statement to add a specific date and country to the merge condition, as shown in the following example.

// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

This operation is now safe to run concurrently for different dates and countries.

ConcurrentDeleteReadException

This exception occurs when a concurrent operation deleted a file that your operation read. Common causes are DELETE, UPDATE, or MERGE operations that rewrite files.

ConcurrentDeleteDeleteException

This exception occurs when a concurrent operation deleted a file that your operation also deletes. This can be caused by two concurrent operations rewriting the same files.

MetadataChangedException

This exception occurs when a concurrent transaction updates the metadata of a Delta table. Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.
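
For example, a schema-evolving append updates the table metadata and can trigger this exception for concurrent transactions. This sketch assumes a DataFrame named newData whose schema adds a new column; the table path is illustrative.

```scala
// Appending with mergeSchema changes the table schema (metadata), which
// can conflict with concurrent transactions that read the old schema.
newData.write
  .format("delta")
  .option("mergeSchema", "true")
  .mode("append")
  .save("/delta/events") // illustrative path
```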

ConcurrentTransactionException

This exception occurs if a streaming query using the same checkpoint location is started multiple times concurrently and tries to write to the Delta table at the same time. You should never have two streaming queries using the same checkpoint location running at the same time.

ProtocolChangedException

This exception occurs when your Delta table is upgraded to a new protocol version. For future operations to succeed, you may need to upgrade your Delta Lake version.

See Table versioning for more details.