修改表的每个操作都会创建一个新的表版本。 使用历史记录信息审核操作、回滚表或使用时间旅行在特定时间点查询表。
注意
Databricks 不建议将表历史记录用作数据存档的长期备份解决方案。 除非您已将数据和日志保留配置设置为更大的值,否则仅使用过去 7 天用于时间旅行操作。
检索表历史记录
通过运行 history 命令,检索有关每次表写入的操作、用户和时间戳等信息。 按时间倒序返回返回操作。
表历史记录保留期取决于表设置 logRetentionDuration,后者默认为 30 天。
注意
按时间顺序查看和表历史记录由不同的保留期阈值控制。 请参阅什么是时间旅行?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY。
请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。
目录资源管理器 提供此详细表信息和历史记录的可视视图。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 一起显示的表历史记录。
历史记录架构
history 操作的输出包含以下列。
| 列 | 类型 | 说明 |
|---|---|---|
| 版本 | long | 通过操作生成的表版本。 |
| 时间戳 | 时间戳 | 提交此版本的时间。 |
| userId | 字符串 | 运行操作的用户的 ID。 |
| userName | 字符串 | 运行操作的用户的姓名。 |
| 操作 | 字符串 | 操作的名称。 |
| operationParameters | 映射 | 操作的参数(例如谓词。) |
| 作业 | 结构 | 运行操作的作业的详细信息。 |
| 笔记本 | 结构 | 运行操作的笔记本的详细信息。 |
| clusterId | 字符串 | 运行操作的群集的 ID。 |
| readVersion | long | 读取以执行写入操作的表的版本。 |
| isolationLevel | 字符串 | 用于此操作的隔离级别。 |
| isBlindAppend | boolean | 此操作是否追加数据。 |
| operationMetrics | 映射 | 操作的指标(例如已修改的行数和文件数。) |
| userMetadata | 字符串 | 用户定义的提交元数据(如果已指定) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注意
- 如果使用以下方法写入表中,其他一些列不可用:
- 将来添加的列将始终添加到最后一列的后面。
了解 partitionBy 在操作参数中的应用
该 partitionBy 字段仅在定义或更改表分区架构的 CREATE 和 OVERWRITE 操作中才有意义。
对于现有表(APPEND、INSERT、UPDATE、DELETE、MERGE)的追加作,此字段可能会显示空数组 [] 或分区列,具体取决于所使用的写入方法(.save() vs .saveAsTable())。 此不一致是预期行为,不应用于验证写入。
重要
不要依赖历史记录中的 partitionBy 来验证追加操作。 该值因实现详细信息而异,但不会影响将数据写入分区的方式。
Example
请考虑按 date 列分区的表:
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
历史记录中显示的 CREATE操作为:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
将数据追加到此表时:
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
APPEND操作显示:
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
partitionBy 的空值是预期的。 数据仍会根据表的现有分区架构写入正确的分区。 请注意,.save() 路径可能在此字段中显示分区列,但此差异属于实现细节,不会影响写入行为。
操作指标
history 操作返回 operationMetrics 列映射中操作指标的集合。
下表按操作列出了映射键定义。
| 操作 | 指标名称 | 说明 |
|---|---|---|
| WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
| numFiles | 写入的文件数。 | |
| numOutputBytes | 已写入的内容的大小(以字节为单位)。 | |
| numOutputRows | 写入的行数。 | |
| 流媒体更新 | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 删除的文件数。 | |
| numOutputRows | 写入的行数。 | |
| numOutputBytes | 写入大小(以字节为单位)。 | |
| DELETE | ||
| numAddedFiles | 添加的文件数。 删除表的分区时未提供。 | |
| numRemovedFiles | 删除的文件数。 | |
| numDeletedRows | 删除的行数。 删除表的分区时未提供。 | |
| numCopiedRows | 在删除文件期间复制的行数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| TRUNCATE | ||
| numRemovedFiles | 删除的文件数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| MERGE | ||
| numSourceRows | 源数据帧中的行数。 | |
| numTargetRowsInserted | 插入到目标表的行数。 | |
| numTargetRowsUpdated | 目标表中更新的行数。 | |
| numTargetRowsDeleted | 目标表中删除的行数。 | |
| numTargetRowsCopied | 复制的目标行数。 | |
| numOutputRows | 写出的总行数。 | |
| numTargetFilesAdded | 添加到接收器(目标)的文件数。 | |
| numTargetFilesRemoved | 从接收器(目标)删除的文件数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| UPDATE | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 删除的文件数。 | |
| numUpdatedRows | 更新的行数。 | |
| numCopiedRows | 刚才在更新文件期间复制的行数。 | |
| executionTimeMs | 执行整个操作所用的时间。 | |
| scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
| rewriteTimeMs | 重写匹配文件所用的时间。 | |
| FSCK | numRemovedFiles | 删除的文件数。 |
| CONVERT | numConvertedFiles | 已转换的 Parquet 文件数。 |
| OPTIMIZE | ||
| numAddedFiles | 添加的文件数。 | |
| numRemovedFiles | 优化的文件数。 | |
| numAddedBytes | 优化表后添加的字节数。 | |
| numRemovedBytes | 删除的字节数。 | |
| minFileSize | 优化表后最小文件的大小。 | |
| p25FileSize | 优化表后第 25 个百分位文件的大小。 | |
| p50FileSize | 优化表后的文件大小中值。 | |
| p75FileSize | 优化表后第 75 个百分位文件的大小。 | |
| maxFileSize | 优化表后最大文件的大小。 | |
| 克隆 | ||
| sourceTableSize | 所克隆版本的源表的大小(以字节为单位)。 | |
| sourceNumOfFiles | 源表中已克隆版本的文件数。 | |
| numRemovedFiles | 如果替换了上一个表,则从目标表中删除的文件数。 | |
| removedFilesSize | 如果替换了上一个表,则从目标表中删除的文件的总大小(以字节为单位)。 | |
| numCopiedFiles | 复制到新位置的文件数。 如果是浅表克隆,则为 0。 | |
| copiedFilesSize | 复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。 | |
| RESTORE | ||
| tableSizeAfterRestore | 还原后的表大小(字节)。 | |
| numOfFilesAfterRestore | 还原后表中的文件数。 | |
| numRemovedFiles | 还原操作删除的文件数。 | |
| numRestoredFiles | 由于还原而添加的文件数。 | |
| removedFilesSize | 还原操作删除的文件的大小(字节)。 | |
| restoredFilesSize | 还原操作添加的文件的大小(字节)。 | |
| VACUUM | ||
| numDeletedFiles | 已删除的文件数。 | |
| numVacuumedDirectories | 已清空的目录数。 | |
| numFilesToDelete | 要删除的文件数。 |
什么是时间旅行?
时间旅行支持根据时间戳或表版本(如事务日志中记录)查询以前的表版本。 可以对应用程序使用“按时间顺序查看”,如下所述:
- 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为针对快速变化表的一组查询提供快照隔离。
重要
在 Databricks Runtime 18.0 及更高版本中,如果请求的版本早于 deletedFileRetentionDuration 表属性(默认 7 天),将阻止时间旅行查询。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。
时间旅行语法
通过在表名定义之后添加子句来查询支持时间旅行功能的表。
-
timestamp_expression可以是下列项中的任意一项:-
'2018-10-18T22:15:12.013Z',即可以强制转换为时间戳的字符串 cast('2018-10-18 13:36:32 CEST' as timestamp)-
'2018-10-18',即日期字符串 current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- 本身就是时间戳或可强制转换为时间戳的任何其他表达式
-
-
version是可以从DESCRIBE HISTORY table_spec的输出中获取的 long 值。
timestamp_expression 和 version 都不能是子查询。
只接受日期或时间戳字符串。 例如,"2019-01-01" 和 "2019-01-01T00:00:00.000Z"。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
还可以使用 @ 语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS 格式。 你可以通过在版本前附加一个 @ 在 v 后指定版本。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
什么是事务日志检查点?
表版本记录为事务日志目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,表版本会聚合到 Parquet 检查点文件,从而防止需要读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。
为“按时间顺序查看”查询配置数据保留
若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。
针对表运行 VACUUM 时,会删除数据文件。 日志文件删除在检查点表版本后自动进行管理。
由于大多数表 VACUUM 都定期针对它们运行,因此时间点查询应遵循保留阈值 VACUUM,默认情况下为 7 天。
若要增加表的数据保留阈值,必须配置下表属性:
-
delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认为interval 30 days。 -
delta.deletedFileRetentionDuration = "interval <interval>":确定由VACUUM用来删除当前表版本中不再引用的数据文件的阈值。 默认为interval 7 days。
可以在创建表期间指定表属性,也可以使用语句设置它们 ALTER TABLE 。 请参阅 表属性参考。
注意
在 Databricks Runtime 18.0 及更高版本中, logRetentionDuration 必须大于或等于 deletedFileRetentionDuration。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。
若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days" (与默认设置 delta.logRetentionDuration匹配)。
随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。
将表还原到早期状态
可以使用命令将表还原到其早期状态 RESTORE 。 表内部维护历史版本,使它们能够还原到先前状态。
RESTORE 命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。
重要
- 可以还原已经还原的表。
- 可以还原克隆的表。
- 对于要还原的表,你必须拥有
MODIFY权限。 - 不能将表还原为通过手动方式或
vacuum方式删除数据文件的旧版本。 如果将spark.sql.files.ignoreMissingFiles设置为true,则仍可部分还原为此版本。 - 用于还原到早期状态的时间戳格式是
yyyy-MM-dd HH:mm:ss。 还支持仅提供 date(yyyy-MM-dd) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
有关语法详细信息,请参阅 RESTORE。
重要
还原被视为一种数据更改操作。 命令添加的 RESTORE 日志条目包含 dataChange 设置为 true。 如果存在下游应用程序(例如处理表更新的 结构化流式处理 作业),则还原作添加的数据更改日志条目被视为新数据更新,处理它们可能会导致重复数据。
例如:
| 表版本 | 操作 | 日志更新 | 数据更改日志更新中的记录 |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1,dataChange = true) | (姓名 = Viktor,年龄 = 29,(姓名 = George,年龄 = 55) |
| 1 | INSERT | AddFile(/path/to/file-2,dataChange = true) | (名称 = 乔治, 年龄 = 39) |
| 2 | OPTIMIZE | AddFile(/path/to/file-3,dataChange = false),RemoveFile(/path/to/file-1),RemoveFile(/path/to/file-2) | (不会生成记录,因为“优化压缩”不会更改表中的数据) |
| 3 | RESTORE(version=1) | RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) | (姓名 = Viktor, 年龄 = 29), (姓名 = George, 年龄 = 55), (姓名 = George, 年龄 = 39) |
在前面的示例中,该 RESTORE 命令会导致更新,这些更新已在读取表的版本 0 和 1 时被观察到。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。
还原指标
RESTORE 在操作完成后以单行数据帧的形式报告以下指标:
table_size_after_restore:还原后表的大小。num_of_files_after_restore:还原后表中的文件数。num_removed_files:从表中删除(逻辑删除)的文件数。num_restored_files:由于回退而还原的文件数。removed_files_size:从表中删除的文件的总大小(以字节为单位)。restored_files_size:已还原的文件的总大小(以字节为单位)。
使用时间旅行的示例
为用户
111修复对表的意外删除问题:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111修复对表的意外错误更新:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *查询在过去一周内增加的新客户的数量。
SELECT ( SELECT count(distinct userId) FROM my_table ) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7) ) AS new_customers
如何在 Spark 会话中找到上次提交的版本?
若要获取当前 SparkSession 在所有线程和所有表中写入的最后一个提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession。
注意
对于 Apache Iceberg 表,请使用 spark.databricks.iceberg.lastCommitVersionInSession 而不是 spark.databricks.delta.lastCommitVersionInSession。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 SparkSession 未进行任何提交,则查询该键时将返回空值。
注意
如果在多个线程之间共享相同的 SparkSession,则类似于在多个线程之间共享变量;你可能会达到争用条件,因为配置值会同时更新。