Optimize performance with file management

To improve query speed, Delta Lake on Azure Databricks can optimize the layout of data stored in cloud storage. It supports two layout algorithms: bin-packing and Z-Ordering.

This article describes how to run the optimization commands, how the two layout algorithms work, and how to clean up stale table snapshots.

Compaction (bin-packing)

Delta Lake on Azure Databricks can improve the speed of read queries from a table by coalescing small files into larger ones. You trigger compaction by running the OPTIMIZE command:

OPTIMIZE delta.`/data/events`

or

OPTIMIZE events

If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using WHERE:

OPTIMIZE events WHERE date >= '2017-01-01'

Note

  • Bin-packing optimization is idempotent, meaning that if it is run twice on the same dataset, the second run has no effect.
  • Bin-packing aims to produce evenly balanced data files with respect to their size on disk, but not necessarily the number of tuples per file. However, the two measures are most often correlated.

Readers of Delta tables use snapshot isolation, which means that they are not interrupted when OPTIMIZE removes unnecessary files from the transaction log. OPTIMIZE makes no data-related changes to the table, so a read before and after an OPTIMIZE returns the same results. Performing OPTIMIZE on a table that is a streaming source does not affect any current or future streams that treat this table as a source. OPTIMIZE returns the file statistics (min, max, total, and so on) for the files removed and the files added by the operation. The OPTIMIZE output also includes Z-Ordering statistics, the number of batches, and the number of partitions optimized.

Note

Available in Databricks Runtime 6.0 and above.

You can also compact small files automatically using Auto Optimize.
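Auto Optimize is typically enabled per table through table properties. The following is a minimal sketch, assuming the events table used elsewhere in this article; the property names reflect the Databricks documentation:

ALTER TABLE events SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',  -- coalesce writes into larger files as they happen
  'delta.autoOptimize.autoCompact' = 'true'     -- compact small files after each write
)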

Data skipping

Data skipping information is collected automatically when you write data into a Delta table. Delta Lake on Azure Databricks takes advantage of this information (minimum and maximum values) at query time to provide faster queries. You do not need to configure data skipping; the feature is activated whenever applicable. However, its effectiveness depends on the layout of your data. For best results, apply Z-Ordering.
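To illustrate, consider a hypothetical selective query against the events table used above. If each data file covers only a narrow range of date values, the per-file minimum and maximum statistics let Delta Lake skip files whose ranges cannot contain the predicate value (the eventType value is invented for illustration):

SELECT count(*)
FROM events
WHERE date = '2017-01-01'   -- files whose date range excludes this value are skipped
  AND eventType = 'login'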

For an example of the benefits of data skipping and Z-Ordering in Delta Lake on Azure Databricks, see the notebooks in Optimization examples. By default, Delta Lake on Azure Databricks collects statistics on the first 32 columns defined in your table schema. You can change this value using the table property dataSkippingNumIndexedCols. Collecting statistics on more columns adds overhead when writing files.

Collecting statistics on long strings is expensive. To avoid collecting statistics on long strings, you can either configure the table property dataSkippingNumIndexedCols so that such columns fall outside the indexed range, or move columns containing long strings to a position beyond dataSkippingNumIndexedCols using ALTER TABLE CHANGE COLUMN, as sketched below.
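The following is a minimal sketch under assumptions: the events table has a long-string column named description, sessionId is the last column you want statistics on, and the fully qualified property name is delta.dataSkippingNumIndexedCols:

-- collect statistics only on the first 4 columns of the schema
ALTER TABLE events SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '4')

-- move the long-string column after sessionId so it falls outside the indexed range
ALTER TABLE events CHANGE COLUMN description description STRING AFTER sessionId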

For the purposes of collecting statistics, each field within a nested column is considered an individual column.

You can read more in the blog post Processing Petabytes of Data in Seconds with Databricks Delta.

Z-Ordering (multi-dimensional clustering)

Z-Ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by the Delta Lake on Azure Databricks data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause:

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

If you expect a column to be commonly used in query predicates and that column has high cardinality (that is, a large number of distinct values), then use ZORDER BY.

You can specify multiple columns for ZORDER BY as a comma-separated list; see the sketch below. However, the effectiveness of the locality drops with each additional column. Z-Ordering on columns that do not have statistics collected on them is ineffective and a waste of resources, because data skipping requires column-local statistics such as min, max, and count. You can configure statistics collection on certain columns by reordering the columns in the schema or by increasing the number of columns on which statistics are collected. See the Data skipping section for more details.
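As a sketch, the following extends the earlier OPTIMIZE command to two columns; deviceId stands in for a hypothetical second high-cardinality column on the events table:

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType, deviceId)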

Note

  • Z-Ordering is not idempotent but aims to be an incremental operation. The time it takes to Z-Order is not guaranteed to decrease over multiple runs. However, if no new data was added to a partition that was just Z-Ordered, another Z-Ordering of that partition will have no effect.

  • Z-Ordering aims to produce evenly balanced data files with respect to the number of tuples, but not necessarily data size on disk. The two measures are most often correlated, but there can be situations when that is not the case, leading to skew in optimize task times.

    For example, if you ZORDER BY date and your most recent records are all much wider (for example, longer arrays or string values) than those in the past, it is expected that the OPTIMIZE job's task durations will be skewed, as will the resulting file sizes. This is, however, only a problem for the OPTIMIZE command itself; it should not have any negative impact on subsequent queries.

Notebooks

For an example of the benefits of optimization, see the notebooks in Optimization examples.

Improve interactive query performance

Delta Engine offers a few additional mechanisms to improve query performance.

Manage data recency

At the beginning of each query, Delta tables automatically update to the latest version of the table. You can observe this process in notebooks when the command status reports: Updating the Delta table's state. However, when running historical analysis on a table, you may not need up-to-the-last-minute data, especially for tables where streaming data is being ingested frequently. In these cases, queries can be run on stale snapshots of your Delta table, which can lower the latency of getting results from queries.

You can configure how stale your table data is allowed to be by setting the Spark session configuration spark.databricks.delta.stalenessLimit to a time string value, for example 1h, 15m, or 1d for 1 hour, 15 minutes, and 1 day respectively. This configuration is session specific, so it won't affect other users accessing this table from other notebooks, jobs, or BI tools. In addition, this setting doesn't prevent your table from updating; it only prevents a query from having to wait for the table to update. The update still occurs in the background and shares resources fairly across the cluster. If the staleness limit is exceeded, the query blocks on the table state update.
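For example, a minimal sketch in SQL, assuming a one-hour-old snapshot is acceptable for the analysis (the query itself is illustrative):

SET spark.databricks.delta.stalenessLimit=1h;

-- returns from a snapshot at most one hour old; the table still updates in the background
SELECT count(*) FROM events WHERE date >= '2017-01-01';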

Enhanced checkpoints for low-latency queries

Delta Lake writes checkpoints as an aggregate state of a Delta table every 10 commits. These checkpoints serve as the starting point to compute the latest state of the table. Without checkpoints, Delta Lake would have to read a large collection of JSON files ("delta" files) representing commits to the transaction log to compute the state of a table. In addition, the column-level statistics Delta Lake uses to perform data skipping are stored in the checkpoint.

Important

Delta Lake checkpoints are different from Structured Streaming checkpoints.

In Databricks Runtime 7.2 and below, column-level statistics are stored in Delta Lake checkpoints as a JSON column. In Databricks Runtime 7.3 and above, column-level statistics are stored as a struct. The struct format makes Delta Lake reads much faster, because:

  • Delta Lake doesn't perform expensive JSON parsing to obtain column-level statistics.
  • Parquet column-pruning capabilities significantly reduce the I/O required to read the statistics for a column.

The struct format enables a collection of optimizations that reduce the overhead of Delta Lake read operations from seconds to tens of milliseconds, which significantly reduces the latency for short queries.

Manage column-level statistics in checkpoints

You manage how statistics are written in checkpoints using the table properties delta.checkpoint.writeStatsAsJson and delta.checkpoint.writeStatsAsStruct. If both table properties are false, Delta Lake cannot perform data skipping.

In Databricks Runtime 7.3 LTS and above:

  • Batch writes write statistics in both JSON and struct format. delta.checkpoint.writeStatsAsJson defaults to true.
  • Streaming writes write statistics in JSON format only (to minimize the impact of checkpoints on write latency). To also write the struct format, see Enable enhanced checkpoints for Structured Streaming queries.
  • In both cases, delta.checkpoint.writeStatsAsStruct is undefined by default.
  • Readers use the struct column when available, and otherwise fall back to using the JSON column.

In Databricks Runtime 7.2 and below, readers only use the JSON column. Therefore, if delta.checkpoint.writeStatsAsJson is false, such readers cannot perform data skipping.

Important

Enhanced checkpoints do not break compatibility with open-source Delta Lake readers. However, setting delta.checkpoint.writeStatsAsJson to false may have implications for proprietary Delta Lake readers. Contact your vendors to learn more about the performance implications.

Trade-offs with statistics in checkpoints

Because writing statistics in a checkpoint has a cost (usually less than a minute, even for large tables), there is a trade-off between the time taken to write a checkpoint and compatibility with Databricks Runtime 7.2 and below. If you are able to upgrade all of your workloads to Databricks Runtime 7.3 LTS or above, you can reduce the cost of writing a checkpoint by disabling the legacy JSON statistics. This trade-off is summarized in the following table.

If data skipping is not useful for your application, you can set both properties to false, and no statistics are collected or written. We do not recommend this configuration.

Table: stats trade-offs

Enable enhanced checkpoints for Structured Streaming queries

If your Structured Streaming workloads don't have low-latency requirements (sub-minute latencies), you can enable enhanced checkpoints by running the following SQL command:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
('delta.checkpoint.writeStatsAsStruct' = 'true')

If you do not use Databricks Runtime 7.2 or below to query your data, you can also reduce the checkpoint write latency by setting the following table properties:

ALTER TABLE [<table-name>|delta.`<path-to-table>`] SET TBLPROPERTIES
(
 'delta.checkpoint.writeStatsAsStruct' = 'true',
 'delta.checkpoint.writeStatsAsJson' = 'false'
)

Disable writes from clusters that write checkpoints without the stats struct

Writers in Databricks Runtime 7.2 and below write checkpoints without the stats struct, which prevents optimizations for Databricks Runtime 7.3 LTS readers. To block clusters running Databricks Runtime 7.2 and below from writing to a Delta table, you can upgrade the Delta table using the upgradeTableProtocol method:

Python
from delta.tables import DeltaTable
delta = DeltaTable.forPath(spark, "path_to_table") # or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)
Scala
import io.delta.tables.DeltaTable
val delta = DeltaTable.forPath(spark, "path_to_table") // or DeltaTable.forName
delta.upgradeTableProtocol(1, 3)

Warning

Applying the upgradeTableProtocol method prevents clusters running Databricks Runtime 7.2 and below from writing to your table, and this change is irreversible. We recommend upgrading your tables only after you have committed to the new format. You can try out these optimizations first by creating a shallow CLONE of your tables with Databricks Runtime 7.3 LTS, as sketched below.
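A minimal sketch of that approach, assuming an existing Delta table named events; the clone name is hypothetical:

-- create a zero-copy shallow clone to experiment with the upgraded protocol
CREATE TABLE events_clone SHALLOW CLONE events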

Once you upgrade the table writer version, writers must obey your settings for 'delta.checkpoint.writeStatsAsStruct' and 'delta.checkpoint.writeStatsAsJson'.

The following table summarizes how to take advantage of enhanced checkpoints across Databricks Runtime versions, table protocol versions, and writer types.

Table: enhanced checkpoints


Frequently asked questions (FAQ)

Why isn't OPTIMIZE automatic?

The OPTIMIZE operation starts many Spark jobs in order to optimize file sizes via compaction (and optionally perform Z-Ordering). Since much of what OPTIMIZE does is compact small files, you must first accumulate many small files before the operation has an effect. Therefore, the OPTIMIZE operation is not run automatically.

Moreover, running OPTIMIZE, especially with ZORDER, is an expensive operation in time and resources. If Databricks ran OPTIMIZE automatically or waited to write out data in batches, it would remove the ability to run low-latency Delta Lake streams (where a Delta table is the source). Many customers have Delta tables that are never optimized because they only stream data from those tables, which obviates the query benefits that OPTIMIZE would provide.

Lastly, Delta Lake automatically collects statistics about the files written to the table (whether or not they are written by an OPTIMIZE operation). This means that reads from Delta tables leverage this information whether or not the table or a partition has had the OPTIMIZE operation run on it.

How often should I run OPTIMIZE?

When you choose how often to run OPTIMIZE, there is a trade-off between performance and cost. Run OPTIMIZE more often if you want better end-user query performance (necessarily at a higher cost because of resource usage). Run it less often if you want to optimize cost.

We recommend starting by running OPTIMIZE on a daily basis, and then adjusting the schedule from there.

What's the best instance type to run OPTIMIZE (bin-packing and Z-Ordering) on?

Both operations are CPU-intensive, performing large amounts of Parquet decoding and encoding.

For these workloads, we recommend the F or Fsv2 series.