Apache Spark 上对优化写入的需求

使用标准化的较大文件大小时,Apache Spark 等大数据处理引擎上的分析工作负载执行效率最高。 文件大小、文件数量、Spark 工作器数量及其配置之间的关系对性能起着至关重要的作用。 将工作负载引入到数据湖表中可能会继承持续写入大量小文件的特征;这种情况通常被称为“小文件问题”。

优化写入是 Synapse 上的 Delta Lake 的一项功能,可减少写入的文件数,旨在增加写入数据的单个文件大小。 它会动态优化分区,同时生成默认大小为 128 MB 的文件。 可以使用配置来根据工作负载要求更改目标文件大小。

此功能通过对分区使用额外的数据随机排布阶段来实现文件大小,因此在写入数据时会导致额外的处理成本。 表的读取效率应该可以弥补轻微的写入损失。

注意

  • 它在适用于 3.1 以上版本的 Apache Spark 的 Synapse 池上可用。

优化写入的优势

  • 它适用于批处理和流式处理写入模式的 Delta Lake 表。
  • 无需更改 spark.write 命令模式。 该功能通过配置设置或表属性启用。
  • 与 OPTIMIZE 命令相比,它可以减少写入事务的数量。
  • OPTIMIZE 操作速度更快,因为它处理更少的文件。
  • 用于删除旧的未引用文件的 VACUUM 命令也会运行得更快。
  • 查询将以更优化的文件大小扫描更少的文件,从而提高读取性能或资源使用率。

优化写入的使用方案

何时使用

  • Delta Lake 已分区表局限于写入模式,这种模式生成欠佳(小于 128 MB)或非标准化的文件大小(文件的大小不同)。
  • 以欠佳文件大小将重新分区的数据帧写入磁盘。
  • 小批 SQL 命令(例如 UPDATE、DELETE、MERGE、CREATE TABLE AS SELECT、INSERT INTO 等)针对的 Delta Lake 已分区表。
  • 将具有追加数据模式的引入方案流式传输到容许更高写入延迟的 Delta Lake 已分区表。

何时避免使用

  • 未分区表。
  • 不能接受更高写入延迟的用例。
  • 明确定义了优化计划和读取模式的大型表。

如何启用和禁用优化写入功能

默认已禁用优化写入功能。 在 Spark 3.3 池中,默认情况下已为分区表启用它。

为池或会话设置配置后,所有 Spark 写入模式都将使用该功能。

若要使用优化写入功能,请使用以下配置启用它:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true

若要检查当前配置值,请使用如下所示的命令:

  1. Scala 和 PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`

若要禁用优化写入功能,请如下所示更改以下配置:

  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
  1. Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false

使用表属性控制优化写入

在新表上

  1. SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

使用 DeltaTableBuilder API

val table = DeltaTable.create()
  .tableName("<table_name>")
  .addColumnn("<colName>", <dataType>)
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

在现有表上

  1. SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
  1. Scala

使用 DeltaTableBuilder API

val table = DeltaTable.replace()
  .tableName("<table_name>")
  .location("<table_location>")
  .property("delta.autoOptimize.optimizeWrite", "true") 
  .execute()

如何获取和更改优化写入的当前最大文件大小配置

若要获取当前配置值,请使用以下命令。 默认值为 128 MB。

  1. Scala 和 PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
  • 更改配置值
  1. Scala 和 PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
  1. SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728

后续步骤