Data processing optimization for Apache Spark

This article discusses how to optimize the configuration of your Apache Spark cluster for best performance on Azure HDInsight.

Overview

If you have slow jobs on a Join or Shuffle, the cause is probably data skew, which is asymmetry in your job data. For example, a map job may take 20 seconds, but running a job where the same data is joined or shuffled takes hours. To fix data skew, you should salt the entire key, or use an isolated salt for only some subset of keys. If you're using an isolated salt, you should further filter to isolate your subset of salted keys in map joins. Another option is to introduce a bucket column and pre-aggregate in buckets first.
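As an illustration, the following sketch salts a join key on both sides. The table names (FactTableA, dimMP) and the join column (PK) are borrowed from the broadcast example later in this article, and the salt count is a tuning assumption, not a recommendation:

// A minimal salting sketch; table names, the PK column, and saltCount are assumptions.
import org.apache.spark.sql.functions._

val saltCount = 16  // number of salt values; tune to the severity of the skew

// Spread each hot key on the large side across saltCount values.
val saltedFacts = spark.table("FactTableA")
  .withColumn("salt", (rand() * saltCount).cast("int"))
  .withColumn("saltedPK", concat_ws("_", col("PK"), col("salt").cast("string")))

// Replicate the small side once per salt value so every salted key still matches.
val saltedDim = spark.table("dimMP")
  .withColumn("salt", explode(array((0 until saltCount).map(lit): _*)))
  .withColumn("saltedPK", concat_ws("_", col("PK"), col("salt").cast("string")))

// Join on the salted key; drop the small side's helper columns to avoid ambiguity.
saltedFacts.join(saltedDim.drop("PK", "salt"), Seq("saltedPK"))
  .createOrReplaceTempView("V_SALTED_JOIN")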

Another factor causing slow joins could be the join type. By default, Spark uses the SortMerge join type. This type of join is best suited for large data sets, but it is computationally expensive because it must first sort the left and right sides of the data before merging them.

A Broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general.

You can change the join type in your configuration by setting spark.sql.autoBroadcastJoinThreshold, or you can set a join hint using the DataFrame APIs (dataframe.join(broadcast(df2))).

// Option 1: raise the autobroadcast threshold to 1 GB, so that tables below
// that size are broadcast automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2: hint the broadcast explicitly through the DataFrame API.
import org.apache.spark.sql.functions.broadcast

val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK"))
    .createOrReplaceTempView("V_JOIN")

spark.sql("SELECT col1, col2 FROM V_JOIN")

If you're using bucketed tables, then you have a third join type, the Merge join. A correctly pre-partitioned and pre-sorted dataset skips the expensive sort phase of a SortMerge join.
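For example, assuming both sides are joined on PK (the table names below are illustrative), you can produce such pre-partitioned and pre-sorted tables with bucketBy and sortBy:

// Write both sides bucketed and sorted on the join key; bucket counts must match.
spark.table("FactTableA")
  .write
  .bucketBy(16, "PK")
  .sortBy("PK")
  .saveAsTable("FactTableA_bucketed")

spark.table("dimMP")
  .write
  .bucketBy(16, "PK")
  .sortBy("PK")
  .saveAsTable("dimMP_bucketed")

// A join on the bucket column can now skip the shuffle and sort phases.
spark.table("FactTableA_bucketed")
  .join(spark.table("dimMP_bucketed"), Seq("PK"))
  .createOrReplaceTempView("V_BUCKET_JOIN")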

The order of joins matters, particularly in more complex queries. Start with the most selective joins. Also, when possible, move joins that increase the number of rows to after aggregations.

To manage parallelism for Cartesian joins, you can add nested structures and windowing, and perhaps skip one or more steps in your Spark job.

Optimize job execution

  • Cache as necessary; for example, if you use the data twice, cache it.
  • Broadcast variables to all executors. The variables are only serialized once, resulting in faster lookups.
  • Use the thread pool on the driver, which results in faster operation for many tasks. (A sketch of these three techniques follows this list.)
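The following sketch illustrates all three techniques in one place; the table names, column types, and lookup map are assumptions for illustration only:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// 1. Cache a DataFrame that is used more than once.
val facts = spark.table("FactTableA").cache()
facts.count()                       // first action materializes the cache
facts.groupBy("PK").count().show()  // second use reads from the cache

// 2. Broadcast a small lookup table once; every task then reads the local copy.
val lookup = spark.sparkContext.broadcast(Map("A" -> 1, "B" -> 2))  // assumed lookup data
facts.rdd.map(row => lookup.value.getOrElse(row.getString(0), 0)).take(5)  // assumes a string first column

// 3. Run independent jobs concurrently from a driver-side thread pool.
val counts = Seq("FactTableA", "dimMP").map { name =>
  Future { spark.table(name).count() }
}
Await.result(Future.sequence(counts), 10.minutes)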

Monitor your running jobs regularly for performance issues. If you need more insight into certain issues, consider using a performance profiling tool.

Key to Spark 2.x query performance is the Tungsten engine, which depends on whole-stage code generation. In some cases, whole-stage code generation may be disabled. For example, if you use a non-mutable type (string) in the aggregation expression, SortAggregate appears instead of HashAggregate. For better performance, try the following and then re-enable code generation:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))
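To verify the effect, compare the physical plans before and after the cast. This sketch assumes a string-typed AMOUNT column in a table named FactTableA:

// With a string column, the plan shows SortAggregate (code generation disabled).
spark.sql("SELECT MAX(AMOUNT) FROM FactTableA").explain()

// After the cast, the plan shows HashAggregate (whole-stage code generation applies).
spark.sql("SELECT MAX(cast(AMOUNT as DOUBLE)) FROM FactTableA").explain()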

Next steps