最佳做法:Delta Lake

本文介绍了使用 Delta Lake 时的最佳做法。

删除并在同一位置重新创建表时,应始终使用 CREATE OR REPLACE TABLE 语句。 请参阅删除或替换 Delta 表

使用 liquid 聚类分析优化数据跳过

Databricks 建议使用 liquid 聚类分析,而不是分区、Z 顺序或其他数据组织策略来优化数据跳过的数据布局。 请参阅对 Delta 表使用 liquid 聚类分析

压缩文件

Databricks 建议经常运行 OPTIMIZE 命令来压缩小文件。

注意

此操作不会移除旧文件。 若要移除它们,请运行 VACUUM 命令。

替换表的内容或架构

有时候,你可能希望替换 Delta 表。 例如: 。

  • 你发现表中的数据不正确,需要对内容进行替换。
  • 你希望重写整个表,以执行不兼容架构更改(例如更改列类型)。

尽管可以删除 Delta 表的整个目录并在同一路径上创建新表,但不建议这样做,因为:

  • 删除目录效率不高。 删除某个包含极大文件的目录可能需要数小时甚至数天的时间。
  • 删除的文件中的所有内容都会丢失;如果删除了错误的表,就很难恢复。
  • 目录删除不是原子操作。 删除表时,某个读取表的并发查询可能会失败或看到的是部分表。

如果无需更改表架构,可以从 Delta 表中删除数据并插入新数据,或通过更新表来纠正不正确的值。

如果要更改表架构,则能够以原子方式替换整个表。 例如: 。

Python

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("<your-table>") # Managed table

dataframe.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .option("path", "<your-table-path>") \
  .saveAsTable("<your-table>") # External table

SQL

REPLACE TABLE <your-table> USING DELTA AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA LOCATION "<your-table-path>" AS SELECT ... -- External table

Scala

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable("<your-table>") // Managed table

dataframe.write
  .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("path", "<your-table-path>")
  .saveAsTable("<your-table>") // External table

此方法有多个优点:

  • 覆盖表的速度要快得多,因为它不需要以递归方式列出目录或删除任何文件。
  • 表的旧版本仍然存在。 如果删除了错误的表,则可以使用“按时间顺序查看”轻松检索旧数据。 请参阅使用 Delta Lake 表历史记录
  • 这是一个原子操作。 在删除表时,并发查询仍然可以读取表。
  • 由于 Delta Lake ACID 事务保证,如果覆盖表失败,则该表将处于其以前的状态。

此外,如果想要在覆盖表后删除旧文件来节省存储成本,可使用 VACUUM 来删除它们。 它针对文件删除进行了优化,通常比删除整个目录要快。

Spark 缓存

Databricks 不建议出于以下原因使用 Spark 缓存

  • 丢失的任何数据跳过都可能归因于在缓存的 DataFrame 顶部添加的其他筛选器。
  • 如果使用其他标识符访问表,则可能不会更新缓存的数据。

Apache Spark 上的 Delta Lake 与 Parquet 之间的区别

Delta Lake 会自动处理以下操作。 不应手动执行以下操作:

  • REFRESH TABLE:Delta 表始终返回最新信息,因此无需在更改后手动调用 REFRESH TABLE
  • 添加和删除分区:Delta Lake 会自动跟踪表中存在的分区集,并在添加或删除数据时更新列表。 因此,无需运行 ALTER TABLE [ADD|DROP] PARTITIONMSCK
  • 加载单个分区:不需要直接读取分区。 例如,无需运行 spark.read.format("parquet").load("/data/date=2017-01-01")。 请改用 WHERE 子句来跳过数据,例如 spark.read.table("<table-name>").where("date = '2017-01-01'")
  • 不要手动修改数据文件:Delta Lake 使用事务日志自动提交对表的更改。 不要直接修改、添加或删除 Delta 表中的 Parquet 数据文件,因为这可能会导致数据丢失或表损坏。

提高 Delta Lake 合并的性能

可使用以下方法缩短合并所用的时间:

  • 减少匹配项的搜索空间:默认情况下,merge 操作会搜索整个 Delta 表以在源表中查找匹配项。 加速 merge 的一种方法是通过在匹配条件中添加已知约束来缩小搜索范围。 例如,假设你有一个由 countrydate 分区的表,并且你希望使用 merge 更新最后一天和特定国家/地区的信息。 添加以下条件可加快查询速度,因为它仅在相关分区中查找匹配项:

    events.date = current_date() AND events.country = 'USA'
    

    此外,该查询还会减少与其他并发操作发生冲突的机会。 有关更多详细信息,请参阅 Azure Databricks 上的隔离级别和写入冲突

  • 压缩文件:如果数据存储在许多小文件中,则读取数据以搜索匹配项可能会变慢。 可以将小文件压缩为更大的文件,以提高读取吞吐量。 有关详细信息,请参阅在 Delta Lake 上使用 optimize 来压缩数据文件

  • 控制写入的随机分区merge 操作多次随机播放数据以计算和写入更新的数据。 用于随机排列的任务的数量由 Spark 会话配置 spark.sql.shuffle.partitions 控制。 设置此参数不仅可以控制并行度,还可以确定输出文件的数量。 增大该值可提高并行度,但也会生成大量较小的数据文件。

  • 启用优化写入:对于已分区表,merge 生成的小文件数量远大于随机分区的数量。 这是因为每个随机任务都可以在多个分区中写入多个文件,并可能成为性能瓶颈。 可以通过启用优化写入来减少文件数量。 请参阅 Azure Databricks 上的 Delta Lake 的优化写入

  • 调整表中的文件大小:Azure Databricks 可以自动检测 Delta 表是否在频繁执行重写文件的 merge 操作,并可能会减小重写文件的大小,以备将来执行更多文件重写操作。 有关详细信息,请参阅有关调整文件大小的部分。

  • 低随机排列合并低随机排列合并提供了 MERGE 的优化实现,可为大多数常见工作负载提供更好的性能。 此外,它还保留了现有的数据布局优化,例如对未修改数据的进行 Z 排序

管理数据时效性

在每个查询开头,Delta 表自动更新到最新版本的表。 当命令状态报告 Updating the Delta table's state 时,可以在笔记本中观察到此过程。 但是,当对表运行历史分析时,你可能不需要最新的数据,尤其是在频繁引入流式处理数据的表中。 在这些情况下,可以在 Delta 表的过时快照上运行查询。 此方法可降低从查询获取结果的延迟时间。

可通过将 Spark 会话配置 spark.databricks.delta.stalenessLimit 设置为时间字符串值(例如 1h15m,分别为 1 小时或 15 分钟),来配置对过期数据的容忍度。 此配置特定于会话,不会影响访问此表的其他客户端。 如果表状态已在过期限制内更新,则针对表的查询将返回结果,而无需等待最新的表更新。 此设置永远不会阻止表更新,并且在返回过期数据时,会在后台进行更新。 如果最后一次表更新在过期期限之前,则查询不会返回结果,直到表状态更新完成为止。

用于低延迟查询的增强检查点

Delta Lake 以优化的频率将检查点写入增量表的聚合状态。 这些检查点作为计算表的最新状态的起点。 如果没有检查点,Delta Lake 就必须读取一个大型 JSON 文件(“delta”文件)的集合,表示提交到事务日志以计算表的状态。 此外,列级统计信息 Delta Lake 用于执行存储在检查点中的数据跳过操作。

重要

Delta Lake 检查点与结构化流检查点不同。

列级统计信息存储为结构和 JSON(以实现后向兼容性)。 结构格式使得 Delta Lake 读取速度快得多,因为:

  • Delta Lake 不会执行昂贵的 JSON 分析来获取列级统计信息。
  • Parquet 列修剪功能可以显著减少读取列的统计信息所需的 I/O。

结构格式启用一系列优化,这些优化可以将增量 Delta Lake 读取操作的开销从数秒降低到数十毫秒,大大降低短查询的延迟。

在检查点中管理列级统计信息

使用表属性 delta.checkpoint.writeStatsAsJsondelta.checkpoint.writeStatsAsStruct 来管理如何在检查点中写入统计信息。 如果两个表属性都为 false,则 Delta Lake 无法执行跳过数据。

  • 批处理以 JSON 格式和结构格式编写写入统计信息。 delta.checkpoint.writeStatsAsJson 上声明的默认值为 true
  • delta.checkpoint.writeStatsAsStruct 在默认情况下是未定义的。
  • 读取器在可用时使用结构列,否则回退到使用 JSON 列。

重要

增强的检查点不会破坏与开源 Delta Lake 读取器的兼容性。 但是,将 delta.checkpoint.writeStatsAsJson 设置为 false 可能会影响专有的 Delta Lake 读取器。 请与供应商联系,以了解有关性能影响的详细信息。

为结构化流式处理查询启用增强的检查点

如果结构化流式处理工作负载没有低延迟要求(即要求延迟在一分钟以内),你可以运行以下 SQL 命令来启用增强的检查点:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

可以通过设置以下表属性来降低检查点写入延迟:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

如果跳过数据不适用于你的应用程序,可以将这两个属性都设置为 false, 这样就不会收集或写入任何统计信息。 Databricks 不建议使用此配置。