Data storage optimization for Apache Spark

This article discusses strategies to optimize data storage for efficient Apache Spark job execution on Azure HDInsight.

Overview

Spark supports many formats, such as CSV, JSON, XML, Parquet, ORC, and Avro. Spark can be extended to support many more formats through external data sources; for more information, see Apache Spark packages.

The best-performing format is Parquet with Snappy compression, which is the default in Spark 2.x. Parquet stores data in columnar format and is highly optimized in Spark.
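
A minimal sketch of writing Snappy-compressed Parquet (the input and output paths are hypothetical, and `spark` is assumed to be an existing SparkSession):

```scala
// Hypothetical input/output paths; `spark` is an existing SparkSession.
val df = spark.read.json("/example/data/input.json")

// Snappy-compressed Parquet is the Spark 2.x default, shown here explicitly.
df.write
  .option("compression", "snappy")
  .parquet("/example/data/output.parquet")
```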

Choose data abstraction

Earlier Spark versions use RDDs to abstract data; Spark 1.3 and 1.6 introduced DataFrames and DataSets, respectively. Consider the following relative merits (a short DataFrame sketch follows the list):

  • DataFrames
    • Best choice in most situations.
    • Provides query optimization through Catalyst.
    • Whole-stage code generation.
    • Direct memory access.
    • Low garbage collection (GC) overhead.
    • Not as developer-friendly as DataSets, because there are no compile-time checks or domain object programming.
  • DataSets
    • Good in complex ETL pipelines where the performance impact is acceptable.
    • Not good in aggregations where the performance impact can be considerable.
    • Provides query optimization through Catalyst.
    • Developer-friendly by providing domain object programming and compile-time checks.
    • Adds serialization/deserialization overhead.
    • High GC overhead.
    • Breaks whole-stage code generation.
  • RDDs
    • You don't need to use RDDs unless you need to build a new custom RDD.
    • No query optimization through Catalyst.
    • No whole-stage code generation.
    • High GC overhead.
    • Must use Spark 1.x legacy APIs.
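
As a brief illustration of the DataFrame path (the path and column name are hypothetical), a DataFrame query is planned by Catalyst and benefits from whole-stage code generation, and `explain()` shows the resulting plan:

```scala
// Hypothetical path and column name; `spark` is an existing SparkSession.
val sales = spark.read.parquet("/example/data/sales.parquet")

// DataFrame operations are planned by the Catalyst optimizer.
val totals = sales.groupBy("productId").count()

// Print the physical plan that Catalyst produced.
totals.explain()
```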

Select default storage

When you create a new Spark cluster, you can select Azure Blob Storage or Azure Data Lake Storage as your cluster's default storage. Both options give you the benefit of long-term storage for transient clusters, so your data isn't automatically deleted when you delete the cluster. You can re-create a transient cluster and still access your data.

| Store Type | File System | Speed | Transient | Use Cases |
| --- | --- | --- | --- | --- |
| Azure Blob Storage | wasb://url/ | Standard | Yes | Transient cluster |
| Azure Blob Storage (secure) | wasbs://url/ | Standard | Yes | Transient cluster |
| Azure Data Lake Storage Gen 2 | abfs://url/ | Faster | Yes | Transient cluster |
| Local HDFS | hdfs://url/ | Fastest | No | Interactive 24/7 cluster |
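
The URI schemes in the table map directly to read and write paths. A sketch, assuming hypothetical storage account, container, and file system names:

```scala
// Hypothetical account, container, and path names; only the URI schemes are significant.
val fromBlob = spark.read.parquet("wasbs://mycontainer@myaccount.blob.core.windows.net/data/")
val fromAdls = spark.read.parquet("abfs://myfilesystem@myaccount.dfs.core.windows.net/data/")
val fromHdfs = spark.read.parquet("hdfs://mycluster/data/")
```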

Use the cache

Spark provides its own native caching mechanisms, which can be used through different methods such as .persist(), .cache(), and CACHE TABLE; a brief example follows the list below. This native caching is effective with small data sets and in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently doesn't work well with partitioning, because a cached table doesn't keep the partitioning data. A more generic and reliable caching technique is storage layer caching.

  • Native Spark caching (not recommended)

    • Good for small datasets.
    • Doesn't work with partitioning, which may change in future Spark releases.
  • Storage level caching (recommended)

    • Can be implemented on HDInsight using the IO Cache feature.
    • Uses in-memory and SSD caching.
  • Local HDFS (recommended)

    • hdfs://mycluster path.
    • Uses SSD caching.
    • Cached data is lost when you delete the cluster, requiring a cache rebuild.
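
A minimal sketch of native caching (the path, view, and DataFrame names are hypothetical):

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical path and names; `spark` is an existing SparkSession.
val orders = spark.read.parquet("/example/data/orders.parquet")

// Native caching through the DataFrame API (use one of the two):
orders.cache()                                      // default storage level
// orders.persist(StorageLevel.MEMORY_AND_DISK_SER) // or an explicit storage level

// ...or through SQL on a registered view.
orders.createOrReplaceTempView("orders")
spark.sql("CACHE TABLE orders")
```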

Optimize data serialization

Spark jobs are distributed, so appropriate data serialization is important for the best performance. There are two serialization options for Spark:

  • Java serialization is the default.
  • Kryo serialization is a newer format and can result in faster and more compact serialization than Java. Kryo requires that you register the classes in your program, and it doesn't yet support all Serializable types (see the configuration sketch after this list).
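
A configuration sketch for Kryo (the application name and registered class are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// MyCustomRecord is a placeholder for one of your own classes.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyCustomRecord]))

val spark = SparkSession.builder()
  .appName("KryoExample") // placeholder application name
  .config(conf)
  .getOrCreate()
```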

Use bucketing

Bucketing is similar to data partitioning, but each bucket can hold a set of column values rather than just one. This method works well for partitioning on large numbers (millions or more) of values, such as product identifiers. A bucket is determined by hashing the bucket key of the row. Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.

Some advanced bucketing features are:

  • Query optimization based on bucketing meta-information.
  • Optimized aggregations.
  • Optimized joins.

You can use partitioning and bucketing at the same time.
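
A sketch of writing a bucketed, sorted table (the path, column, bucket count, and table names are hypothetical):

```scala
// Hypothetical path, column, and table names; `spark` is an existing SparkSession.
val products = spark.read.parquet("/example/data/products.parquet")

// Hash productId into 200 buckets and persist the bucketing and sort
// metadata with the saved table (bucketBy requires saveAsTable).
products.write
  .bucketBy(200, "productId")
  .sortBy("productId")
  .saveAsTable("bucketed_products")
```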

Next steps