使用 Delta Lake 表历史记录

修改 Delta Lake 表的每个操作都会创建新的表版本。 可以使用历史记录信息来审核操作、回滚表或查询特定时间点的表(使用按时间顺序查看)。

注意

Databricks 不建议使用 Delta Lake 表历史记录作为数据存档的长期备份解决方案。 Databricks 建议仅使用过去 7 天进行“按时间顺序查看”操作,除非你已将数据和日志保留配置设置为更大的值。

检索 Delta 表历史记录

可以通过运行 history 命令来检索信息,包括每次将内容写入 Delta 表时对应的操作、用户和时间戳。 按时间倒序返回返回操作。

表历史记录保留期取决于表设置 delta.logRetentionDuration,后者默认为 30 天。

注意

按时间顺序查看和表历史记录由不同的保留期阈值控制。 请参阅什么是 Delta Lake 按时间顺序查看?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY

请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。

目录资源管理器提供此详细表信息的视觉对象视图和 Delta 表的历史记录。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 DESCRIBE HISTORY 一起显示的表历史记录。

历史记录架构

history 操作的输出包含以下列。

类型​​ 说明
版本 long 通过操作生成的表版本。
timestamp timestamp 提交此版本的时间。
userId 字符串 运行操作的用户的 ID。
userName 字符串 运行操作的用户的姓名。
operation 字符串 操作的名称。
operationParameters map 操作的参数(例如谓词。)
作业 (job) struct 运行操作的作业的详细信息。
笔记本 struct 运行操作的笔记本的详细信息。
clusterId 字符串 运行操作的群集的 ID。
readVersion long 读取以执行写入操作的表的版本。
isolationLevel 字符串 用于此操作的隔离级别。
isBlindAppend boolean 此操作是否追加数据。
operationMetrics map 操作的指标(例如已修改的行数和文件数。)
userMetadata 字符串 用户定义的提交元数据(如果已指定)
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

注意

操作指标说明

history 操作返回 operationMetrics 列映射中操作指标的集合。

下表按操作列出了映射键定义。

操作 指标名称 说明
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO
numFiles 写入的文件数。
numOutputBytes 已写入的内容的大小(以字节为单位)。
numOutputRows 写入的行数。
STREAMING UPDATE
numAddedFiles 添加的文件数。
numRemovedFiles 删除的文件数。
numOutputRows 写入的行数。
numOutputBytes 写入大小(以字节为单位)。
DELETE
numAddedFiles 添加的文件数。 删除表的分区时未提供。
numRemovedFiles 删除的文件数。
numDeletedRows 删除的行数。 删除表的分区时未提供。
numCopiedRows 在删除文件期间复制的行数。
executionTimeMs 执行整个操作所用的时间。
scanTimeMs 扫描文件来查找匹配项所用的时间。
rewriteTimeMs 重写匹配文件所用的时间。
TRUNCATE
numRemovedFiles 删除的文件数。
executionTimeMs 执行整个操作所用的时间。
MERGE
numSourceRows 源数据帧中的行数。
numTargetRowsInserted 插入到目标表的行数。
numTargetRowsUpdated 目标表中更新的行数。
numTargetRowsDeleted 目标表中删除的行数。
numTargetRowsCopied 复制的目标行数。
numOutputRows 写出的总行数。
numTargetFilesAdded 添加到接收器(目标)的文件数。
numTargetFilesRemoved 从接收器(目标)删除的文件数。
executionTimeMs 执行整个操作所用的时间。
scanTimeMs 扫描文件来查找匹配项所用的时间。
rewriteTimeMs 重写匹配文件所用的时间。
UPDATE
numAddedFiles 添加的文件数。
numRemovedFiles 删除的文件数。
numUpdatedRows 更新的行数。
numCopiedRows 刚才在更新文件期间复制的行数。
executionTimeMs 执行整个操作所用的时间。
scanTimeMs 扫描文件来查找匹配项所用的时间。
rewriteTimeMs 重写匹配文件所用的时间。
FSCK numRemovedFiles 删除的文件数。
CONVERT numConvertedFiles 已转换的 Parquet 文件数。
OPTIMIZE
numAddedFiles 添加的文件数。
numRemovedFiles 优化的文件数。
numAddedBytes 优化表后添加的字节数。
numRemovedBytes 删除的字节数。
minFileSize 优化表后最小文件的大小。
p25FileSize 优化表后第 25 个百分位文件的大小。
p50FileSize 优化表后的文件大小中值。
p75FileSize 优化表后第 75 个百分位文件的大小。
maxFileSize 优化表后最大文件的大小。
克隆
sourceTableSize 所克隆版本的源表的大小(以字节为单位)。
sourceNumOfFiles 源表中已克隆版本的文件数。
numRemovedFiles 目标表中删除的文件数(如果替换了先前的 Delta 表)。
removedFilesSize 如果替换了先前的 Delta 表,则为目标表中删除文件的总大小(以字节为单位)。
numCopiedFiles 复制到新位置的文件数。 如果是浅表克隆,则为 0。
copiedFilesSize 复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。
RESTORE
tableSizeAfterRestore 还原后的表大小(字节)。
numOfFilesAfterRestore 还原后表中的文件数。
numRemovedFiles 还原操作删除的文件数。
numRestoredFiles 由于还原而添加的文件数。
removedFilesSize 还原操作删除的文件的大小(字节)。
restoredFilesSize 还原操作添加的文件的大小(字节)。
VACUUM
numDeletedFiles 已删除的文件数。
numVacuumedDirectories 已清空的目录数。
numFilesToDelete 要删除的文件数。

什么是 Delta Lake 按时间顺序查看?

Delta Lake 按时间顺序查看支持根据时间戳或表版本(正如事务日志中记录的那样)查询以前的表版本。 可以对应用程序使用“按时间顺序查看”,如下所述:

  • 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
  • 编写复杂的时态查询。
  • 修复数据中的错误。
  • 为针对快速变化表的一组查询提供快照隔离。

重要

可通过“按时间顺序查看”访问的表版本取决于事务日志文件的保留期阈值以及 VACUUM 操作的频率和指定保留期。 如果使用默认值每天运行 VACUUM,则 7 天的数据可用于“按时间顺序查看”。

Delta 按时间顺序查看语法

可以通过在表名规范后添加子句,来使用“按时间顺序查看”查询 Delta 表。

  • timestamp_expression 可以是下列项中的任意一项:
    • '2018-10-18T22:15:12.013Z',即可以强制转换为时间戳的字符串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',即日期字符串
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 本身就是时间戳或可强制转换为时间戳的任何其他表达式
  • version 是可以从 DESCRIBE HISTORY table_spec 的输出中获取的 long 值。

timestamp_expressionversion 都不能是子查询。

只接受日期或时间戳字符串。 例如,"2019-01-01""2019-01-01T00:00:00.000Z"。 有关示例语法,请参阅以下代码:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

还可以使用 @ 语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS 格式。 你可以通过在版本前附加一个 v@ 后指定版本。 有关示例语法,请参阅以下代码:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

什么是事务日志检查点?

Delta Lake 将表版本记录为 _delta_log 目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,Delta Lake 将表版本聚合到 Parquet 检查点文件,因此无需读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。

为“按时间顺序查看”查询配置数据保留

若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。

针对表运行 VACUUM 时,会删除数据文件。 Delta Lake 会在对表版本执行检查点操作后自动管理日志文件删除操作。

由于会针对大多数 Delta 表定期运行 VACUUM,因此时间点查询应认可 VACUUM 的保留期阈值(默认为 7 天)。

若要提高 Delta 表的数据保留期阈值,必须配置以下表属性:

  • delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认为 interval 30 days
  • delta.deletedFileRetentionDuration = "interval <interval>":确定由 VACUUM 用来删除当前表版本中不再引用的数据文件的阈值。 默认为 interval 7 days

可以在创建表期间指定 Delta 属性,也可使用 ALTER TABLE 语句设置它们。 请参阅 Delta 表属性参考

注意

必须设置这两个属性,以确保对于频繁执行 VACUUM 操作的表,表历史记录可以保留较长时间。 例如,若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"(与 delta.logRetentionDuration 的默认设置匹配)。

随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。

将 Delta 表还原到早期状态

你可以使用 RESTORE 命令将 Delta 表还原到其以前的状态。 Delta 表在内部维护该表的历史版本,使其能够还原到以前的状态。 RESTORE 命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。

重要

  • 可以还原已经还原的表。
  • 可以还原克隆的表。
  • 对于要还原的表,你必须拥有 MODIFY 权限。
  • 不能将表还原为通过手动方式或 vacuum 方式删除数据文件的旧版本。 如果将 spark.sql.files.ignoreMissingFiles 设置为 true,则仍可部分还原为此版本。
  • 用于还原到早期状态的时间戳格式是 yyyy-MM-dd HH:mm:ss。 还支持仅提供 date(yyyy-MM-dd) 字符串。
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

有关语法详细信息,请参阅 RESTORE

重要

还原被视为一种数据更改操作。 RESTORE 命令添加的 Delta Lake 日志条目包含设置为 true 的 dataChange。 如果存在下游应用程序,例如用于处理对 Delta Lake 表的更新的结构化流式处理作业,则还原操作添加的数据更改日志条目将被视为新的数据更新,处理这些更新可能导致重复数据。

例如:

表版本 操作 增量日志更新 数据更改日志更新中的记录
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (不会生成记录,因为“优化压缩”不会更改表中的数据)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

在上面的示例中,RESTORE 命令导致在读取增量表版本 0 和 1 时已经可以看到更新。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。

还原指标

RESTORE 在操作完成后以单行数据帧的形式报告以下指标:

  • table_size_after_restore:还原后表的大小。

  • num_of_files_after_restore:还原后表中的文件数。

  • num_removed_files:从表中删除(逻辑删除)的文件数。

  • num_restored_files:由于回退而还原的文件数。

  • removed_files_size:从表中删除的文件的总大小(以字节为单位)。

  • restored_files_size:已还原的文件的总大小(以字节为单位)。

    还原指标示例

使用 Delta Lake 按时间顺序查看的示例

  • 为用户 111 修复对表的意外删除问题:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • 修复对表的意外错误更新:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 查询在过去一周内增加的新客户的数量。

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

如何在 Spark 会话中找到上次提交的版本?

若要获取当前 SparkSession 在所有线程和所有表中写入的最后一个提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

如果 SparkSession 未进行任何提交,则查询该键时将返回空值。

注意

如果在多个线程之间共享相同的 SparkSession,则类似于在多个线程之间共享变量;你可能会达到争用条件,因为配置值会同时更新。