使用表历史记录

修改表的每个操作都会创建一个新的表版本。 使用历史记录信息审核操作、回滚表或使用时间旅行在特定时间点查询表。

注释

Databricks 不建议将表历史记录用作数据存档的长期备份解决方案。 除非您已将数据和日志保留配置设置为更大的值,否则仅使用过去 7 天用于时间旅行操作。

检索表历史记录

通过运行 history 命令,检索有关每次表写入的操作、用户和时间戳等信息。 操作按时间倒序返回。

表历史记录保留期取决于表设置 logRetentionDuration,后者默认为 30 天。

注释

时间旅行和表历史记录由不同的保留阈值控制。 请参阅什么是时间旅行?

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY

请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。

目录资源管理器 提供此详细表信息和历史记录的可视视图。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 一起显示的表历史记录。

历史模式

history 操作的输出包含下列列。

类型 Description
版本 long 通过操作生成的表版本。
时间戳 timestamp 提交此版本的时间。
userId string 运行操作的用户的 ID。
userName string 运行操作的用户的姓名。
操作 string 操作的名称。
操作参数 map 操作的参数(例如谓词。)
作业 struct 执行该操作的 Lakeflow 作业的详细信息。 仅对由 Lakeflow 作业写入的提交进行填充。 否则为 null
笔记本 struct 执行该操作所用的 Databricks 笔记本的详细信息。 仅对从 Databricks 笔记本中创建的提交填充此字段。 否则为 null
clusterId string 运行操作的群集的 ID。
读取版本 long 读取以执行写入操作的表的版本。
isolationLevel string 用于此操作的隔离级别。
isBlindAppend boolean 此操作是否追加数据。
operationMetrics map 操作的指标(例如已修改的行数和文件数。)
userMetadata string 用户定义的提交元数据(如果已指定)。
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

注释

了解 partitionBy 在操作参数中的应用

partitionBy 字段仅在定义或更改表分区架构的 CREATE 和 OVERWRITE 操作中才有意义。

对于现有表(APPEND、INSERT、UPDATE、DELETE、MERGE)的追加作,此字段可能会显示空数组 [] 或分区列,具体取决于所使用的写入方法(.save() vs .saveAsTable())。 此不一致是预期行为,不应用于验证写入。

Important

不要依赖历史记录中的 partitionBy 来验证追加操作。 该值因实现详细信息而异,但不会影响将数据写入分区的方式。

例子

请考虑按 date 列分区的表:

# Initial table creation - partitionBy is populated
df.write.format("delta") \
  .partitionBy("date") \
  .saveAsTable("sales_data")

历史记录中显示的 CREATE操作为:

operationParameters: {
  "mode": "ErrorIfExists",
  "partitionBy": "[\"date\"]"
}

将数据追加到此表时:

# Subsequent append - partitionBy shows empty
new_df.write.format("delta") \
  .mode("append") \
  .saveAsTable("sales_data")

APPEND操作显示:

operationParameters: {
  "mode": "Append",
  "partitionBy": "[]"
}

partitionBy 的空值是预期的。 数据仍会根据表的现有分区架构写入正确的分区。 请注意,.save() 路径可能在此字段中显示分区列,但此差异属于实现细节,不会影响写入行为。

操作指标

history 操作返回 operationMetrics 列映射中操作指标的集合。

下表按操作将映射键的定义列出。

WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO

以下指标可用于这些操作:

指标名称 Description
numFiles 写入的文件数。
numOutputBytes 写入内容的大小(以字节为单位)。
numOutputRows 写入的行数。

STREAMING UPDATE

以下指标可用于此操作:

指标名称 Description
numAddedFiles 添加的文件数。
numRemovedFiles 已删除的文件数。
numOutputRows 写入的行数。
numOutputBytes 写入大小(以字节为单位)。

DELETE

以下指标可用于此操作:

指标名称 Description
numAddedFiles 添加的文件数。 在删除表的分区时未提供。
numRemovedFiles 已删除的文件数。
numDeletedRows 删除的行数。 在删除表的分区时未提供。
numCopiedRows 在删除文件时复制的行数。
executionTimeMs 执行整个操作所需的时间。
scanTimeMs 扫描文件以查找匹配项所需的时间。
rewriteTimeMs 重写匹配文件所需的时间。

TRUNCATE

以下指标可用于此操作:

指标名称 Description
numRemovedFiles 已删除的文件数。
executionTimeMs 执行整个操作所需的时间。

MERGE

以下指标可用于此操作:

指标名称 Description
numSourceRows 源数据帧中的行数。
numTargetRowsInserted 插入到目标表中的行数。
numTargetRowsUpdated 目标表中更新的行数。
numTargetRowsDeleted 目标表中删除的行数。
numTargetRowsCopied 复制的目标行数。
numOutputRows 已写入的总行数。
numTargetFilesAdded 添加到接收器(目标)中的文件数量。
numTargetFilesRemoved 从接收器中删除的文件数(目标)。
executionTimeMs 执行整个操作所需的时间。
scanTimeMs 扫描文件以查找匹配项所需的时间。
rewriteTimeMs 重写匹配文件所需的时间。

UPDATE

以下指标可用于此操作:

指标名称 Description
numAddedFiles 添加的文件数。
numRemovedFiles 已删除的文件数。
numUpdatedRows 更新的行数。
numCopiedRows 在更新文件时刚刚复制的行数。
executionTimeMs 执行整个操作所需的时间。
scanTimeMs 扫描文件以查找匹配项所需的时间。
rewriteTimeMs 重写匹配文件所需的时间。

FSCK

以下指标可用于此操作:

指标名称 Description
numRemovedFiles 已删除的文件数。

CONVERT

以下指标可用于此操作:

指标名称 Description
numConvertedFiles 已转换的 Parquet 文件数。

OPTIMIZE

以下指标可用于此操作:

指标名称 Description
numAddedFiles 添加的文件数。
numRemovedFiles 已优化的文件数。
numAddedBytes 优化表后添加的字节数。
numRemovedBytes 删除的字节数。
minFileSize 优化表后最小文件的大小。
p25FileSize 优化表后第 25 百分位文件的大小。
p50FileSize 表优化后的文件大小中位数。
p75FileSize 表优化后的第 75 百分位文件大小。
maxFileSize 表优化后的最大文件大小。

CLONE

以下指标可用于此操作:

指标名称 Description
sourceTableSize 克隆版本的源表的大小(以字节为单位)。
sourceNumOfFiles 克隆版本的源表中的文件数。
numRemovedFiles 如果替换了先前的表,则从目标表中删除的文件数量。
removedFilesSize 如果替换了上一个表,则从目标表中删除的文件的总大小(以字节为单位)。
numCopiedFiles 复制到新位置的文件数。 如果是浅表克隆,则为 0。
copiedFilesSize 复制到新位置的文件的总大小(以字节为单位)。 如果是浅表克隆,则为 0。

RESTORE

以下指标可用于此操作:

指标名称 Description
tableSizeAfterRestore 还原后的表大小(以字节为单位)。
numOfFilesAfterRestore 还原后表中的文件数。
numRemovedFiles 还原操作删除的文件数。
numRestoredFiles 由于还原而添加的文件数。
removedFilesSize 还原删除的文件的大小(以字节为单位)。
restoredFilesSize 还原添加的文件的大小(以字节为单位)。

VACUUM

以下指标可用于此操作:

指标名称 Description
numDeletedFiles 已删除的文件数。
numVacuumedDirectories 已清理的目录数量。
numFilesToDelete 要删除的文件数。

什么是时间旅行?

时间旅行支持根据时间戳或表版本(如事务日志中记录)查询以前的表版本。 可以使用时间旅行应用于以下情况:

  • 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
  • 编写复杂的时态查询。
  • 修复数据中的错误。
  • 为针对快速变化表的一组查询提供快照隔离。

Important

在 Databricks Runtime 18.0 及更高版本中,如果请求的版本早于 deletedFileRetentionDuration 表属性(默认 7 天),将阻止时间旅行查询。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。

时间旅行语法

通过在表名定义之后添加子句来查询支持时间旅行功能的表。

  • timestamp_expression 可以是下列项中的任意一项:
    • '2018-10-18T22:15:12.013Z',即可以转换为时间戳的字符串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',即日期字符串
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 能够转换为时间戳或本身就是时间戳的任何其他表达式
  • version 是可以从 DESCRIBE HISTORY table_spec 的输出中获取的 long 值。

timestamp_expressionversion 都不能是子查询。

只接受日期或时间戳字符串。 例如,"2019-01-01""2019-01-01T00:00:00.000Z"。 有关示例语法,请参阅以下代码:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

还可以使用 @ 语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS 格式。 你可以通过在 @ 后附加一个 v 来指定版本。 有关示例语法,请参阅以下代码:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

什么是事务日志检查点?

表版本记录为事务日志目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,表版本会聚合到 Parquet 检查点文件,从而防止需要读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。

为“时光旅行查询”配置数据保留

若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。

VACUUM 针对某个表运行时,会删除数据文件。 日志文件删除在检查点表版本后自动进行管理。

由于大多数表 VACUUM 都定期针对它们运行,因此时间点查询应遵循保留阈值 VACUUM,默认情况下为 7 天。

若要增加表的数据保留阈值,必须配置下表属性:

  • delta.logRetentionDuration = "interval <interval>":控制表的历史记录的保留时间长度。 默认值为 interval 30 days。 对于 Apache Iceberg 表,请改用 iceberg.logRetentionDuration
  • delta.deletedFileRetentionDuration = "interval <interval>":确定由 VACUUM 用来删除当前表版本中不再引用的数据文件的阈值。 默认值为 interval 7 days。 对于 Apache Iceberg 表,请改用 iceberg.deletedFileRetentionDuration

可以在创建表期间指定表属性,也可以使用语句设置它们 ALTER TABLE 。 请参阅 表属性参考

注释

在 Databricks Runtime 18.0 及更高版本中, logRetentionDuration 必须大于或等于 deletedFileRetentionDuration。 对于 Unity 目录托管表,这适用于 Databricks Runtime 12.2 及更高版本。

若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days" (与默认设置 delta.logRetentionDuration匹配)。

随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。

将表还原到早期状态

可以使用命令将表还原到其早期状态 RESTORE 。 表内部维护历史版本,使它们能够还原到先前状态。 RESTORE 命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。

Important

  • 可以再次还原已经还原的表。
  • 可以还原克隆的表。
  • 对于要还原的表,你必须拥有 MODIFY 权限。
  • 不能将表还原为通过手动方式或 vacuum 方式删除数据文件的旧版本。 如果将 spark.sql.files.ignoreMissingFiles 设置为 true,则仍可部分还原为此版本。
  • 用于还原到早期状态的时间戳格式是 yyyy-MM-dd HH:mm:ss。 还支持仅提供 date(yyyy-MM-dd) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

有关语法详细信息,请参阅 RESTORE

Important

还原被视为一种数据更改操作。 命令添加的 RESTORE 日志条目包含 dataChange 设置为 true。 如果存在下游应用程序(例如处理表更新的 结构化流式处理 作业),则还原作添加的数据更改日志条目被视为新数据更新,处理它们可能会导致重复数据。

例如:

表格版本 Operation 日志更新 数据变更日志更新中的记录
0 INSERT AddFile(/path/to/file-1,dataChange = true) (name = Viktor, age = 29), (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2,dataChange = true) (名称 = 乔治, 年龄 = 39)
2 优化 AddFile(/path/to/file-3,dataChange = false),RemoveFile(/path/to/file-1),RemoveFile(/path/to/file-2) 无记录。 OPTIMIZE 压缩不会更改表中的数据。
3 RESTORE(version=1) RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) (姓名 = Viktor, 年龄 = 29), (姓名 = George, 年龄 = 55), (姓名 = George, 年龄 = 39)

在前面的示例中,该 RESTORE 命令会导致更新,这些更新已在读取表的版本 0 和 1 时被观察到。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。

还原指标

RESTORE 在操作完成后以单行数据帧的形式报告以下指标:

  • table_size_after_restore:表还原后的大小。

  • num_of_files_after_restore:还原后表中的文件数。

  • num_removed_files:从表中删除(逻辑删除)的文件数。

  • num_restored_files:由于回退而还原的文件数。

  • removed_files_size:从表中删除的文件的总大小(以字节为单位)。

  • restored_files_size:已还原的文件的总大小(以字节为单位)。

    还原指标示例

使用时间旅行的示例

  • 为用户 111 修复对表的意外删除问题:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • 修复表中意外发生的错误更新:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 查询在过去一周内增加的新客户的数量。

    SELECT
    (
      SELECT count(distinct userId)
      FROM my_table
    )
    -
    (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)
    ) AS new_customers
    

如何在 Spark 会话中找到上次提交的版本?

在所有线程和所有表中,要获取当前 SparkSession 写入的最后一次提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession

注释

对于 Apache Iceberg 表,请使用 spark.databricks.iceberg.lastCommitVersionInSession 而不是 spark.databricks.delta.lastCommitVersionInSession

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

如果 SparkSession 未进行任何提交,则查询该键时将返回空值。

注释

如果在多个线程之间共享相同的 SparkSession,这类似于在多个线程之间共享变量;你可能会遇到竞争条件,因为配置值会被同时更新。