Spark 2.0.0 群集花费很长时间来追加数据Spark 2.0.0 cluster takes a long time to append data

如果你发现使用 Spark 2.0.0 版本的群集需要更长的时间才能将数据追加到现有数据集,尤其是所有 Spark 作业都已完成而命令尚未完成,这是因为驱动程序节点正在将任务的输出文件从作业临时目录逐个移动到最终的目标(使用云存储时,此过程较慢)。If you find that a cluster using Spark 2.0.0 version takes a longer time to append data to an existing dataset and in particular, all of Spark jobs have finished, but your command has not finished, it is because driver node is moving the output files of tasks from the job temporary directory to the final destination one-by-one, which is slow with cloud storage. 要解决此问题,请将 mapreduce.fileoutputcommitter.algorithm.version 设置为 2.To resolve this issue, set mapreduce.fileoutputcommitter.algorithm.version to 2. 此问题不会影响覆盖数据集或将数据写入到新位置。This issue does not affect overwriting a dataset or writing data to a new location.

备注

Spark 2.0.1-db1 开始,mapreduce.fileoutputcommitter.algorithm.version 的默认值为 2。Starting with Spark 2.0.1-db1, the default value of mapreduce.fileoutputcommitter.algorithm.version is 2. 如果使用的是 Spark 2.0.0,请在遇到此缓慢问题时手动设置此配置。If you are using Spark 2.0.0, manually set this config if you experience this slowness issue.

如何确认是否遇到此问题?How to confirm if I am experiencing this issue?

可以通过检查以下内容来确认是否遇到了这个问题:You can confirm if you are experiencing this issue by checking the following things:

  1. 所有 Spark 作业都已完成,而单元格尚未完成。All of your Spark jobs have finished and your cell has not finished. 进度栏应如下所示The progress bar should look like

    no-alternative-textno-alternative-text

  2. 驱动程序的线程转储(可以在 Spark UI 的执行程序页上找到)显示 FileOutputCommitter 类的 commitJob 方法中有一个线程花费了很长时间。The thread dump of the driver (you can find it on the executor page of the Spark UI) shows that there is a thread spending a long time inside the commitJob method of FileOutputCommitter class.

如何将 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 设置为 2?How do I set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version to 2?

可以使用以下任一方法来设置此配置:You can set this config by using any of the following methods:

  • 启动群集时,可以将 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 放入 Spark 配置。When you launch your cluster, you can put spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2 in the Spark config.
  • 可以在笔记本中运行 %sql set mapreduce.fileoutputcommitter.algorithm.version=2spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")spark 是 Databricks 笔记本随附的 SparkSession 对象)。In your notebook, you can run %sql set mapreduce.fileoutputcommitter.algorithm.version=2 or spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2") (spark is a SparkSession object provided with Databricks notebooks).
  • 使用数据集 API 写入数据时,可以在选项中设置它,即 dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2")When you write data using Dataset API, you can set it in the option, i.e. dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2").

原因是什么?What is the cause?

当 Spark 将数据追加到现有数据集时,Spark 使用 FileOutputCommitter 来管理暂存输出文件和最终输出文件。When Spark appends data to an existing dataset, Spark uses FileOutputCommitter to manage staging output files and final output files. FileOutputCommitter 的行为直接影响写入数据的作业的性能。The behavior of FileOutputCommitter has direct impact on the performance of jobs that write data.

FileOutputCommitter 有两种方法,commitTaskcommitJobA FileOutputCommitter has two methods, commitTask and commitJob. Apache Spark 2.0 和更高版本使用 Apache Hadoop 2,而 Apache Hadoop 2 使用 mapreduce.fileoutputcommitter.algorithm.version 的值控制 commitTaskcommitJob 的工作方式。Apache Spark 2.0 and higher versions use Apache Hadoop 2, which uses the value of mapreduce.fileoutputcommitter.algorithm.version to control how commitTask and commitJob work. 在 Hadoop 2 中,mapreduce.fileoutputcommitter.algorithm.version 的默认值为 1。In Hadoop 2, the default value of mapreduce.fileoutputcommitter.algorithm.version is 1. 对于此版本,commitTask 将任务生成的数据从任务临时目录移动到作业临时目录,当所有任务完成时,commitJob 将数据从作业临时目录移动到最终目标 [1]For this version, commitTask moves data generated by a task from the task temporary directory to job temporary directory and when all tasks complete, commitJob moves data to from job temporary directory to the final destination [1]. 由于驱动程序正在执行 commitJob 的工作,因此对于云存储,此操作可能需要很长时间。Because the driver is doing the work of commitJob, for cloud storage, this operation can take a long time. 你可能常常认为你的单元格处于“挂起”状态。You may often think that your cell is “hanging”. 但是,当 mapreduce.fileoutputcommitter.algorithm.version 的值为 2 时,commitTask 将任务生成的数据直接移动到最终目标,而 commitJob 基本上不执行任何操作。However, when the value of mapreduce.fileoutputcommitter.algorithm.version is 2, commitTask moves data generated by a task directly to the final destination and commitJob is basically a no-op.