修改 Delta Lake 表的每个操作都会创建新的表版本。 可以使用历史记录信息来审核操作、回滚表或查询特定时间点的表(使用按时间顺序查看)。
注意
Databricks 不建议使用 Delta Lake 表历史记录作为数据存档的长期备份解决方案。 Databricks 建议仅使用过去 7 天进行“按时间顺序查看”操作,除非你已将数据和日志保留配置设置为更大的值。
检索 Delta 表历史记录
可以通过运行 history 命令来检索信息,包括每次将内容写入 Delta 表时对应的操作、用户和时间戳。 按时间倒序返回返回操作。
表历史记录保留期取决于表设置 delta.logRetentionDuration,后者默认为 30 天。
注意
按时间顺序查看和表历史记录由不同的保留期阈值控制。 请参阅什么是 Delta Lake 按时间顺序查看?。
DESCRIBE HISTORY table_name       -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only
有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY。
请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。
目录资源管理器提供此详细表信息的视觉对象视图和 Delta 表的历史记录。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 DESCRIBE HISTORY 一起显示的表历史记录。
历史记录架构
history 操作的输出包含以下列。
| 列 | 类型 | 说明 | 
|---|---|---|
| 版本 | long | 通过操作生成的表版本。 | 
| timestamp | timestamp | 提交此版本的时间。 | 
| userId | 字符串 | 运行操作的用户的 ID。 | 
| userName | 字符串 | 运行操作的用户的姓名。 | 
| operation | 字符串 | 操作的名称。 | 
| operationParameters | map | 操作的参数(例如谓词。) | 
| 作业 (job) | struct | 运行操作的作业的详细信息。 | 
| 笔记本 | struct | 运行操作的笔记本的详细信息。 | 
| clusterId | 字符串 | 运行操作的群集的 ID。 | 
| readVersion | long | 读取以执行写入操作的表的版本。 | 
| isolationLevel | 字符串 | 用于此操作的隔离级别。 | 
| isBlindAppend | boolean | 此操作是否追加数据。 | 
| operationMetrics | map | 操作的指标(例如已修改的行数和文件数。) | 
| userMetadata | 字符串 | 用户定义的提交元数据(如果已指定) | 
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注意
- 如果使用以下方法写入 Delta 表,则其他一些列不可用:
- 将来添加的列将始终添加到最后一列的后面。
操作指标说明
history 操作返回 operationMetrics 列映射中操作指标的集合。
下表按操作列出了映射键定义。
| 操作 | 指标名称 | 说明 | 
|---|---|---|
| WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
| numFiles | 写入的文件数。 | |
| numOutputBytes | 已写入的内容的大小(以字节为单位)。 | |
| numOutputRows | 写入的行数。 | |
| STREAMING UPDATE | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 删除的文件数。 | |
| numOutputRows | 写入的行数。 | |
| numOutputBytes | 写入大小(以字节为单位)。 | |
| DELETE | ||
| numAddedFiles | 添加的文件数。 删除表的分区时未提供。 | |
| numRemovedFiles | 删除的文件数。 | |
| numDeletedRows | 删除的行数。 删除表的分区时未提供。 | |
| numCopiedRows | 在删除文件期间复制的行数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| TRUNCATE | ||
| numRemovedFiles | 删除的文件数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| MERGE | ||
| numSourceRows | 源数据帧中的行数。 | |
| numTargetRowsInserted | 插入到目标表的行数。 | |
| numTargetRowsUpdated | 目标表中更新的行数。 | |
| numTargetRowsDeleted | 目标表中删除的行数。 | |
| numTargetRowsCopied | 复制的目标行数。 | |
| numOutputRows | 写出的总行数。 | |
| numTargetFilesAdded | 添加到接收器(目标)的文件数。 | |
| numTargetFilesRemoved | 从接收器(目标)删除的文件数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| UPDATE | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 删除的文件数。 | |
| numUpdatedRows | 更新的行数。 | |
| numCopiedRows | 刚才在更新文件期间复制的行数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| FSCK | numRemovedFiles | 删除的文件数。 | 
| CONVERT | numConvertedFiles | 已转换的 Parquet 文件数。 | 
| OPTIMIZE | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 优化的文件数。 | |
| numAddedBytes | 优化表后添加的字节数。 | |
| numRemovedBytes | 删除的字节数。 | |
| minFileSize | 优化表后最小文件的大小。 | |
| p25FileSize | 优化表后第 25 个百分位文件的大小。 | |
| p50FileSize | 优化表后的文件大小中值。 | |
| p75FileSize | 优化表后第 75 个百分位文件的大小。 | |
| maxFileSize | 优化表后最大文件的大小。 | |
| 克隆 | ||
| sourceTableSize | 所克隆版本的源表的大小(以字节为单位)。 | |
| sourceNumOfFiles | 源表中已克隆版本的文件数。 | |
| numRemovedFiles | 目标表中删除的文件数(如果替换了先前的 Delta 表)。 | |
| removedFilesSize | 如果替换了先前的 Delta 表,则为目标表中删除文件的总大小(以字节为单位)。 | |
| numCopiedFiles | 复制到新位置的文件数。 如果是浅表克隆,则为 0。 | |
| copiedFilesSize | 复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。 | |
| RESTORE | ||
| tableSizeAfterRestore | 还原后的表大小(字节)。 | |
| numOfFilesAfterRestore | 还原后表中的文件数。 | |
| numRemovedFiles | 还原操作删除的文件数。 | |
| numRestoredFiles | 由于还原而添加的文件数。 | |
| removedFilesSize | 还原操作删除的文件的大小(字节)。 | |
| restoredFilesSize | 还原操作添加的文件的大小(字节)。 | |
| VACUUM | ||
| numDeletedFiles | 已删除的文件数。 | |
| numVacuumedDirectories | 已清空的目录数。 | |
| numFilesToDelete | 要删除的文件数。 | 
什么是 Delta Lake 按时间顺序查看?
Delta Lake 按时间顺序查看支持根据时间戳或表版本(正如事务日志中记录的那样)查询以前的表版本。 可以对应用程序使用“按时间顺序查看”,如下所述:
- 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为针对快速变化表的一组查询提供快照隔离。
重要
可通过“按时间顺序查看”访问的表版本取决于事务日志文件的保留期阈值以及 VACUUM 操作的频率和指定保留期。 如果使用默认值每天运行 VACUUM,则 7 天的数据可用于“按时间顺序查看”。
Delta 按时间顺序查看语法
可以通过在表名规范后添加子句,来使用“按时间顺序查看”查询 Delta 表。
- timestamp_expression可以是下列项中的任意一项:- '2018-10-18T22:15:12.013Z',即可以强制转换为时间戳的字符串
- cast('2018-10-18 13:36:32 CEST' as timestamp)
- '2018-10-18',即日期字符串
- current_timestamp() - interval 12 hours
- date_sub(current_date(), 1)
- 本身就是时间戳或可强制转换为时间戳的任何其他表达式
 
- version是可以从- DESCRIBE HISTORY table_spec的输出中获取的 long 值。
timestamp_expression 和 version 都不能是子查询。
只接受日期或时间戳字符串。 例如,"2019-01-01" 和 "2019-01-01T00:00:00.000Z"。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
还可以使用 @ 语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS 格式。 你可以通过在版本前附加一个 v 在 @ 后指定版本。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
什么是事务日志检查点?
Delta Lake 将表版本记录为 _delta_log 目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,Delta Lake 将表版本聚合到 Parquet 检查点文件,因此无需读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。
为“按时间顺序查看”查询配置数据保留
若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。
针对表运行 VACUUM 时,会删除数据文件。 Delta Lake 会在对表版本执行检查点操作后自动管理日志文件删除操作。
由于会针对大多数 Delta 表定期运行 VACUUM,因此时间点查询应认可 VACUUM 的保留期阈值(默认为 7 天)。
若要提高 Delta 表的数据保留期阈值,必须配置以下表属性:
- delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认为- interval 30 days。
- delta.deletedFileRetentionDuration = "interval <interval>":确定由- VACUUM用来删除当前表版本中不再引用的数据文件的阈值。 默认为- interval 7 days。
可以在创建表期间指定 Delta 属性,也可使用 ALTER TABLE 语句设置它们。 请参阅 Delta 表属性参考。
注意
必须设置这两个属性,以确保对于频繁执行 VACUUM 操作的表,表历史记录可以保留较长时间。 例如,若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"(与 delta.logRetentionDuration 的默认设置匹配)。
随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。
将 Delta 表还原到早期状态
你可以使用 RESTORE 命令将 Delta 表还原到其以前的状态。 Delta 表在内部维护该表的历史版本,使其能够还原到以前的状态。
RESTORE 命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。
重要
- 可以还原已经还原的表。
- 可以还原克隆的表。
- 对于要还原的表,你必须拥有 MODIFY权限。
- 不能将表还原为通过手动方式或 vacuum方式删除数据文件的旧版本。 如果将spark.sql.files.ignoreMissingFiles设置为true,则仍可部分还原为此版本。
- 用于还原到早期状态的时间戳格式是 yyyy-MM-dd HH:mm:ss。 还支持仅提供 date(yyyy-MM-dd) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
有关语法详细信息,请参阅 RESTORE。
重要
还原被视为一种数据更改操作。 RESTORE 命令添加的 Delta Lake 日志条目包含设置为 true 的 dataChange。 如果存在下游应用程序,例如用于处理对 Delta Lake 表的更新的结构化流式处理作业,则还原操作添加的数据更改日志条目将被视为新的数据更新,处理这些更新可能导致重复数据。
例如:
| 表版本 | 操作 | 增量日志更新 | 数据更改日志更新中的记录 | 
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) | 
| 1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) | 
| 2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (不会生成记录,因为“优化压缩”不会更改表中的数据) | 
| 3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) | 
在上面的示例中,RESTORE 命令导致在读取增量表版本 0 和 1 时已经可以看到更新。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。
还原指标
RESTORE 在操作完成后以单行数据帧的形式报告以下指标:
- table_size_after_restore:还原后表的大小。
- num_of_files_after_restore:还原后表中的文件数。
- num_removed_files:从表中删除(逻辑删除)的文件数。
- num_restored_files:由于回退而还原的文件数。
- removed_files_size:从表中删除的文件的总大小(以字节为单位)。
- restored_files_size:已还原的文件的总大小(以字节为单位)。 
使用 Delta Lake 按时间顺序查看的示例
- 为用户 - 111修复对表的意外删除问题:- INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
- 修复对表的意外错误更新: - MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
- 查询在过去一周内增加的新客户的数量。 - SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
如何在 Spark 会话中找到上次提交的版本?
若要获取当前 SparkSession 在所有线程和所有表中写入的最后一个提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 SparkSession 未进行任何提交,则查询该键时将返回空值。
注意
如果在多个线程之间共享相同的 SparkSession,则类似于在多个线程之间共享变量;你可能会达到争用条件,因为配置值会同时更新。