在 Delta Lake 上使用 optimize 来压缩数据文件

请参阅 OPTIMIZE

Azure Databricks 上的 Delta Lake 可提高表中读取查询的速度。 要提高此速度,一种方法是将小文件合并成较大的文件。

注意

在 Databricks Runtime 13.3 及更高版本中,Databricks 建议对 Delta 表布局使用聚类分析。 请参阅对 Delta 表使用 liquid 聚类分析

语法示例

通过运行 OPTIMIZE 命令触发压缩:

SQL

OPTIMIZE delta.`/data/events`

Python

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/events")
deltaTable.optimize().executeCompaction()

或者:

SQL

OPTIMIZE events

Python

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().executeCompaction()

如果拥有大量数据,并且只想要优化其中的一个子集,则可以使用 WHERE 指定一个可选的分区谓词:

SQL

OPTIMIZE events WHERE date >= '2022-11-18'

Python

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

Scala

import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "events")
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

注意

  • 二进制打包优化幂等,这意味着如果在同一数据集上运行两次,则第二次运行不起作用。
  • 二进制打包旨在根据其在磁盘上的大小生成均匀平衡的数据文件,但不一定是每个文件的元组数。 但是,这两个度量值通常是相关的。
  • Databricks Runtime 11.0 及更高版本提供用于执行 OPTIMIZE 操作的 Python 和 Scala API。

Delta 表的读取器使用快照隔离,这意味着,当 OPTIMIZE 从事务日志中删除不必要的文件时,它们不会中断。 OPTIMIZE 不会对表进行任何数据相关更改,因此,在 OPTIMIZE 之前和之后读取都具有相同的结果。 对作为流式处理源的表执行 OPTIMIZE 不会影响将此表视为源的任何当前或未来的流。 OPTIMIZE 返回所删除文件的文件统计信息(最小值、最大值、总计等)和操作添加的文件。 优化统计信息还包含 Z 排序统计信息、批处理数和已优化分区数。

你还可以使用自动压缩来自动压缩小文件。 请参阅 Azure Databricks 上的 Delta Lake 的自动压缩

我应该多久运行一次 OPTIMIZE

如果选择运行 OPTIMIZE 的频率,则会在性能和成本之间进行权衡。 为了提高最终用户查询性能,请更频繁地运行 OPTIMIZE。 由于资源使用量增加,这将产生更高的成本。 若要优化成本,请减少运行频率。

Databricks 建议从每天运行一次 OPTIMIZE 开始,然后调整频率以平衡成本和性能权衡。

运行 OPTIMIZE(二进制打包和 Z 排序)的最佳实例类型是什么?

这两个操作都是执行大量 Parquet 解码和编码的 CPU 密集型操作。

Databricks 建议使用计算优化实例类型。 此外,OPTIMIZE 也受益于附加的 SSD。