在 Azure Databricks 上使用 Delta Lake 更改数据馈送

注意

更改数据馈送允许 Azure Databricks 跟踪 Delta 表版本之间的行级别更改。 对 Delta 表启用此功能后,运行时会记录写入该表的所有数据的“更改事件”。 这包括行数据以及指示已插入、已删除还是已更新指定行的元数据。

可以使用 Spark SQL、Apache Spark 数据帧和结构化流式处理在批处理查询中读取更改事件。

重要

更改数据馈送与表历史记录协同工作,以提供更改信息。 由于克隆 Delta 表会创建单独的历史记录,因此克隆表上的更改数据馈送与原始表的更改数据馈送不匹配。

用例

默认情况下,不启用更改数据馈送。 启用更改数据馈送有助于实现以下用例。

  • 银色和金色表:仅通过处理初始执行 MERGEUPDATEDELETE 操作后发生的行级别更改来加速和简化 ETL 和 ELT 操作,从而提高 Delta Lake 性能。
  • 具体化视图:创建最新的聚合信息视图,以在 BI 和分析中使用,无需重新处理整个基础表,而是仅在发生更改时更新。
  • 传输更改:将更改数据馈送发送至下游系统(如 Kafka 或 RDBMS),这些系统可使用它在数据管道后期阶段以增量方式进行处理。
  • 审核线索表:捕获更改数据馈送作为 Delta 表可提供永久存储和高效的查询功能,用于查看一段时间的所有更改,包括何时发生删除和进行了哪些更新。

启用更改数据馈送

必须使用以下方法之一显式启用更改数据馈送选项:

  • 新表:在 CREATE TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 现有表:在 ALTER TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 所有新表

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

仅记录启用更改数据馈送后所做的更改;不会捕获之前对表所做的更改。

更改数据存储

Azure Databricks 将 UPDATEDELETEMERGE 操作的更改数据记录在表目录下的 _change_data 文件夹中。 某些操作(如仅插入操作和完整分区删除)不会在 _change_data 目录中生成数据,因为 Azure Databricks 可以直接从事务日志高效计算更改数据馈送。

_change_data 文件夹中的文件遵循表的保留策略。 因此,如果运行 VACUUM 命令,也会删除更改数据馈送数据。

在批处理查询中读取更改

可在开始和结束时提供版本或时间戳。 开始和结束版本以及时间戳包含在查询中。 若要读取从表的特定开始版本到最新版本的更改,仅指定起始版本或时间戳。

将版本指定为整数,将时间戳指定为字符串,格式为 yyyy-MM-dd[ HH:mm:ss[.SSS]]

如果提供的版本较低或提供的时间戳早于已记录更改事件的时间戳,那么启用更改数据馈送时,会引发错误,指示未启用更改数据馈送。

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# path based tables
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .load("pathToMyDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// path based tables
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .load("pathToMyDeltaTable")

在流式处理查询中读取更改

Python

# providing a starting version
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

# providing a starting timestamp
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2021-04-21 05:35:43") \
  .load("/pathToMyDeltaTable")

# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
  .option("readChangeFeed", "true") \
  .table("myDeltaTable")

Scala

// providing a starting version
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

// providing a starting timestamp
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "2021-04-21 05:35:43")
  .load("/pathToMyDeltaTable")

// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

若要在读取表的同时获取更改数据,请将选项 readChangeFeed 设置为 truestartingVersionstartingTimestamp 是可选项,如果未提供,则流会在流式传输时将表的最新快照返回为 INSERT,并将未来的更改返回为更改数据。 读取更改数据时还支持速率限制(maxFilesPerTriggermaxBytesPerTrigger)和 excludeRegex 等选项。

注意

对于除起始快照版本之外的版本,速率限制可以是原子性的。 也就是说,整个提交版本将受到速率限制,或者将返回整个提交。

默认情况下,如果用户传入的版本或时间戳超过了表的最后一次提交,则会引发错误 timestampGreaterThanLatestCommit。 在 Databricks Runtime 11.3 LTS 及更高版本中,如果用户将以下配置设置为 true,则更改数据馈送可以处理范围外版本的情况:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

如果提供的起始版本大于表上最后提交版本,或提供的起始时间戳比表上最后提交时间戳还要新,则启用上述配置后,将返回空的读取结果。

如果提供的最终版本大于表上最后提交版本,或提供的最终时间戳比表上最后提交时间戳还要新,则启用上述配置后,将返回空的读取结果。

什么是更改数据馈送的架构?

当你读取表的更改数据馈送时,将使用最新表版本的架构。

注意

完全支持大多数架构更改和演变操作。 启用了列映射的表不支持所有用例,它们会表现出不同的行为。 请参阅启用了列映射的表的更改数据馈送限制

除了 Delta 表架构中的数据列之外,更改数据馈送还包含用于标识更改事件类型的元数据列:

列名称 类型
_change_type 字符串 insert, update_preimage , update_postimage, deleteinsert
_commit_version Long 包含更改的 Delta 日志或表版本。
_commit_timestamp 时间戳 与提交创建关联的时间戳。

(1) 是更新前的值,postimage 是更新后的值。

注意

如果架构包含与这些添加的列同名的列,则你无法在表上启用更改数据馈送。 在尝试启用更改数据馈送之前重命名表中的列即可解决此冲突。

启用了列映射的表的更改数据馈送限制

在 Delta 表上启用列映射后,可以删除或重命名表中的列,而无需重写现有数据的数据文件。 如果启用了列映射,在执行非添加性架构更改(例如重命名或删除列、更改数据类型或更改可为 null 性)后,更改数据馈送存在限制。

重要

  • 无法使用批处理语义读取发生非添加性架构更改的事务或范围的更改数据馈送。
  • 在 Databricks Runtime 13.0 及更高版本中,启用了列映射且发生非添加性架构更改的表不支持对更改数据馈送进行流式读取。 请参阅使用列映射和架构更改进行流式处理
  • 在 Databricks Runtime 12.0 及更低版本中,无法读取启用了列映射且经历过列重命名或删除操作的表的更改数据馈送。

在 Databricks Runtime 12.1 及更高版本中,可以对启用了列映射且经历过非添加性架构更改的表的更改数据馈送执行批量读取。 读取操作不使用表的最新版本的架构,而是使用查询中指定的表的最终版本的架构。 如果指定的版本范围涵盖非添加性架构更改,查询仍会失败。

常见问题解答 (FAQ)

启用更改数据馈送会产生多大的开销?

没有明显的影响。 更改数据记录是在查询执行过程中以内联方式生成的,通常比重写文件的总大小要小得多。

更改记录的保留策略是什么?

更改记录遵循的保留策略与过时的表版本相同,如果更改记录超过了指定的保留期,会通过 VACUUM 将其清理。

新记录何时在更改数据馈送中提供?

更改数据是连同 Delta Lake 事务一起提交的,当新数据出现在表中时,更改数据也会出现在其中。

笔记本示例:使用增量更改数据馈送传播更改

此笔记本展示如何将对有关疫苗接种绝对数量的银色表所做的更改传播到有关疫苗接种率的金色表。

更改数据馈送笔记本

获取笔记本