修改表的每个操作都会创建一个新的表版本。 使用历史记录信息审核操作、回滚表或使用时间旅行在特定时间点查询表。
注释
Databricks 不建议将表历史记录用作数据存档的长期备份解决方案。 除非您已将数据和日志保留配置设置为更大的值,否则仅使用过去 7 天用于时间旅行操作。
检索表历史记录
通过运行 history 命令,检索有关每次表写入的操作、用户和时间戳等信息。 操作按时间倒序返回。
表历史记录保留期取决于表设置 logRetentionDuration,后者默认为 30 天。
注释
时间旅行和表历史记录由不同的保留阈值控制。 请参阅什么是时间旅行?
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY。
请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。
目录资源管理器 提供此详细表信息和历史记录的可视视图。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 一起显示的表历史记录。
历史模式
history 操作的输出包含下列列。
| 列 | 类型 | Description |
|---|---|---|
| 版本 | long |
通过操作生成的表版本。 |
| 时间戳 | timestamp |
提交此版本的时间。 |
| userId | string |
运行操作的用户的 ID。 |
| userName | string |
运行操作的用户的姓名。 |
| 操作 | string |
操作的名称。 |
| 操作参数 | map |
操作的参数(例如谓词。) |
| 作业 | struct |
执行该操作的 Lakeflow 作业的详细信息。 仅对由 Lakeflow 作业写入的提交进行填充。 否则为 null。 |
| 笔记本 | struct |
执行该操作所用的 Databricks 笔记本的详细信息。 仅对从 Databricks 笔记本中创建的提交填充此字段。 否则为 null。 |
| clusterId | string |
运行操作的群集的 ID。 |
| 读取版本 | long |
读取以执行写入操作的表的版本。 |
| isolationLevel | string |
用于此操作的隔离级别。 |
| isBlindAppend | boolean |
此操作是否追加数据。 |
| operationMetrics | map |
操作的指标(例如已修改的行数和文件数。) |
| userMetadata | string |
用户定义的提交元数据(如果已指定)。 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注释
- 如果使用以下方法写入表中,其他一些列不可用:
- 将来添加的列将始终添加在最后一列之后。
了解 partitionBy 在操作参数中的应用
该 partitionBy 字段仅在定义或更改表分区架构的 CREATE 和 OVERWRITE 操作中才有意义。
对于现有表(APPEND、INSERT、UPDATE、DELETE、MERGE)的追加作,此字段可能会显示空数组 [] 或分区列,具体取决于所使用的写入方法(.save() vs .saveAsTable())。 此不一致是预期行为,不应用于验证写入。
Important
不要依赖历史记录中的 partitionBy 来验证追加操作。 该值因实现详细信息而异,但不会影响将数据写入分区的方式。
例子
请考虑按 date 列分区的表:
# Initial table creation - partitionBy is populated
df.write.format("delta") \
.partitionBy("date") \
.saveAsTable("sales_data")
历史记录中显示的 CREATE操作为:
operationParameters: {
"mode": "ErrorIfExists",
"partitionBy": "[\"date\"]"
}
将数据追加到此表时:
# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
.mode("append") \
.saveAsTable("sales_data")
APPEND操作显示:
operationParameters: {
"mode": "Append",
"partitionBy": "[]"
}
partitionBy 的空值是预期的。 数据仍会根据表的现有分区架构写入正确的分区。 请注意,.save() 路径可能在此字段中显示分区列,但此差异属于实现细节,不会影响写入行为。
操作指标
history 操作返回 operationMetrics 列映射中操作指标的集合。
下表按操作将映射键的定义列出。
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO
以下指标可用于这些操作:
| 指标名称 | Description |
|---|---|
numFiles |
写入的文件数。 |
numOutputBytes |
写入内容的大小(以字节为单位)。 |
numOutputRows |
写入的行数。 |
STREAMING UPDATE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numAddedFiles |
添加的文件数。 |
numRemovedFiles |
已删除的文件数。 |
numOutputRows |
写入的行数。 |
numOutputBytes |
写入大小(以字节为单位)。 |
DELETE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numAddedFiles |
添加的文件数。 在删除表的分区时未提供。 |
numRemovedFiles |
已删除的文件数。 |
numDeletedRows |
删除的行数。 在删除表的分区时未提供。 |
numCopiedRows |
在删除文件时复制的行数。 |
executionTimeMs |
执行整个操作所需的时间。 |
scanTimeMs |
扫描文件以查找匹配项所需的时间。 |
rewriteTimeMs |
重写匹配文件所需的时间。 |
TRUNCATE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numRemovedFiles |
已删除的文件数。 |
executionTimeMs |
执行整个操作所需的时间。 |
MERGE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numSourceRows |
源数据帧中的行数。 |
numTargetRowsInserted |
插入到目标表中的行数。 |
numTargetRowsUpdated |
目标表中更新的行数。 |
numTargetRowsDeleted |
目标表中删除的行数。 |
numTargetRowsCopied |
复制的目标行数。 |
numOutputRows |
已写入的总行数。 |
numTargetFilesAdded |
添加到接收器(目标)中的文件数量。 |
numTargetFilesRemoved |
从接收器中删除的文件数(目标)。 |
executionTimeMs |
执行整个操作所需的时间。 |
scanTimeMs |
扫描文件以查找匹配项所需的时间。 |
rewriteTimeMs |
重写匹配文件所需的时间。 |
UPDATE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numAddedFiles |
添加的文件数。 |
numRemovedFiles |
已删除的文件数。 |
numUpdatedRows |
更新的行数。 |
numCopiedRows |
在更新文件时刚刚复制的行数。 |
executionTimeMs |
执行整个操作所需的时间。 |
scanTimeMs |
扫描文件以查找匹配项所需的时间。 |
rewriteTimeMs |
重写匹配文件所需的时间。 |
FSCK
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numRemovedFiles |
已删除的文件数。 |
CONVERT
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numConvertedFiles |
已转换的 Parquet 文件数。 |
OPTIMIZE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numAddedFiles |
添加的文件数。 |
numRemovedFiles |
已优化的文件数。 |
numAddedBytes |
优化表后添加的字节数。 |
numRemovedBytes |
删除的字节数。 |
minFileSize |
优化表后最小文件的大小。 |
p25FileSize |
优化表后第 25 百分位文件的大小。 |
p50FileSize |
表优化后的文件大小中位数。 |
p75FileSize |
表优化后的第 75 百分位文件大小。 |
maxFileSize |
表优化后的最大文件大小。 |
CLONE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
sourceTableSize |
克隆版本的源表的大小(以字节为单位)。 |
sourceNumOfFiles |
克隆版本的源表中的文件数。 |
numRemovedFiles |
如果替换了先前的表,则从目标表中删除的文件数量。 |
removedFilesSize |
如果替换了上一个表,则从目标表中删除的文件的总大小(以字节为单位)。 |
numCopiedFiles |
复制到新位置的文件数。 如果是浅表克隆,则为 0。 |
copiedFilesSize |
复制到新位置的文件的总大小(以字节为单位)。 如果是浅表克隆,则为 0。 |
RESTORE
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
tableSizeAfterRestore |
还原后的表大小(以字节为单位)。 |
numOfFilesAfterRestore |
还原后表中的文件数。 |
numRemovedFiles |
还原操作删除的文件数。 |
numRestoredFiles |
由于还原而添加的文件数。 |
removedFilesSize |
还原删除的文件的大小(以字节为单位)。 |
restoredFilesSize |
还原添加的文件的大小(以字节为单位)。 |
VACUUM
以下指标可用于此操作:
| 指标名称 | Description |
|---|---|
numDeletedFiles |
已删除的文件数。 |
numVacuumedDirectories |
已清理的目录数量。 |
numFilesToDelete |
要删除的文件数。 |
什么是时间旅行?
时间旅行支持根据时间戳或表版本(如事务日志中记录)查询以前的表版本。 可以使用时间旅行应用于以下情况:
- 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为针对快速变化表的一组查询提供快照隔离。
Important
在 Databricks Runtime 18.0 及更高版本中,如果请求的版本早于 deletedFileRetentionDuration 表属性(默认 7 天),将阻止时间旅行查询。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。
时间旅行语法
通过在表名定义之后添加子句来查询支持时间旅行功能的表。
-
timestamp_expression可以是下列项中的任意一项:-
'2018-10-18T22:15:12.013Z',即可以转换为时间戳的字符串 cast('2018-10-18 13:36:32 CEST' as timestamp)-
'2018-10-18',即日期字符串 current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)- 能够转换为时间戳或本身就是时间戳的任何其他表达式
-
-
version是可以从DESCRIBE HISTORY table_spec的输出中获取的 long 值。
timestamp_expression 和 version 都不能是子查询。
只接受日期或时间戳字符串。 例如,"2019-01-01" 和 "2019-01-01T00:00:00.000Z"。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
还可以使用 @ 语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS 格式。 你可以通过在 @ 后附加一个 v 来指定版本。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
什么是事务日志检查点?
表版本记录为事务日志目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,表版本会聚合到 Parquet 检查点文件,从而防止需要读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。
为“时光旅行查询”配置数据保留
若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。
当 VACUUM 针对某个表运行时,会删除数据文件。 日志文件删除在检查点表版本后自动进行管理。
由于大多数表 VACUUM 都定期针对它们运行,因此时间点查询应遵循保留阈值 VACUUM,默认情况下为 7 天。
若要增加表的数据保留阈值,必须配置下表属性:
-
delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认值为interval 30 days。 对于 Apache Iceberg 表,请改用iceberg.logRetentionDuration。 -
delta.deletedFileRetentionDuration = "interval <interval>":确定由VACUUM用来删除当前表版本中不再引用的数据文件的阈值。 默认值为interval 7 days。 对于 Apache Iceberg 表,请改用iceberg.deletedFileRetentionDuration。
可以在创建表期间指定表属性,也可以使用语句设置它们 ALTER TABLE 。 请参阅 表属性参考。
注释
在 Databricks Runtime 18.0 及更高版本中, logRetentionDuration 必须大于或等于 deletedFileRetentionDuration。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。
若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days" (与默认设置 delta.logRetentionDuration匹配)。
随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。
将表还原到早期状态
可以使用命令将表还原到其早期状态 RESTORE 。 表内部维护历史版本,使它们能够还原到先前状态。
RESTORE 命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。
Important
- 可以再次还原已经还原的表。
- 可以还原克隆的表。
- 对于要还原的表,你必须拥有
MODIFY权限。 - 不能将表还原为通过手动方式或
vacuum方式删除数据文件的旧版本。 如果将spark.sql.files.ignoreMissingFiles设置为true,则仍可部分还原为此版本。 - 用于还原到早期状态的时间戳格式是
yyyy-MM-dd HH:mm:ss。 还支持仅提供 date(yyyy-MM-dd) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
有关语法详细信息,请参阅 RESTORE。
Important
还原被视为一种数据更改操作。 命令添加的 RESTORE 日志条目包含 dataChange 设置为 true。 如果存在下游应用程序(例如处理表更新的 结构化流式处理 作业),则还原作添加的数据更改日志条目被视为新数据更新,处理它们可能会导致重复数据。
例如:
| 表格版本 | Operation | 日志更新 | 数据变更日志更新中的记录 |
|---|---|---|---|
| 0 | INSERT | AddFile(/path/to/file-1,dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55) |
| 1 | INSERT | AddFile(/path/to/file-2,dataChange = true) | (名称 = 乔治, 年龄 = 39) |
| 2 | 优化 | AddFile(/path/to/file-3,dataChange = false),RemoveFile(/path/to/file-1),RemoveFile(/path/to/file-2) | 无记录。 OPTIMIZE 压缩不会更改表中的数据。 |
| 3 | RESTORE(version=1) | RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) | (姓名 = Viktor, 年龄 = 29), (姓名 = George, 年龄 = 55), (姓名 = George, 年龄 = 39) |
在前面的示例中,该 RESTORE 命令会导致更新,这些更新已在读取表的版本 0 和 1 时被观察到。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。
还原指标
RESTORE 在操作完成后以单行数据帧的形式报告以下指标:
table_size_after_restore:表还原后的大小。num_of_files_after_restore:还原后表中的文件数。num_removed_files:从表中删除(逻辑删除)的文件数。num_restored_files:由于回退而还原的文件数。removed_files_size:从表中删除的文件的总大小(以字节为单位)。restored_files_size:已还原的文件的总大小(以字节为单位)。
使用时间旅行的示例
为用户
111修复对表的意外删除问题:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111修复表中意外发生的错误更新:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *查询在过去一周内增加的新客户的数量。
SELECT ( SELECT count(distinct userId) FROM my_table ) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7) ) AS new_customers
如何在 Spark 会话中找到上次提交的版本?
在所有线程和所有表中,要获取当前 SparkSession 写入的最后一次提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession。
注释
对于 Apache Iceberg 表,请使用 spark.databricks.iceberg.lastCommitVersionInSession 而不是 spark.databricks.delta.lastCommitVersionInSession。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 SparkSession 未进行任何提交,则查询该键时将返回空值。
注释
如果在多个线程之间共享相同的 SparkSession,这类似于在多个线程之间共享变量;你可能会遇到竞争条件,因为配置值会被同时更新。