Introduction to Datasets

The Dataset API provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. You can define Dataset JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on), similar to an RDD. The benefit is that, unlike with RDDs, these transformations are now applied to a structured and strongly typed distributed collection, which allows Spark to leverage Spark SQL's execution engine for optimization.
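As a minimal sketch of what this looks like in practice (the Person case class and the age threshold below are illustrative assumptions, not part of any specific API), typed transformations keep the element type known at compile time while the work is still planned and optimized by Spark SQL's engine:

case class Person(name: String, age: Int)

// A strongly typed distributed collection: Dataset[Person], not untyped rows
val people = Seq(Person("Max", 33), Person("Adam", 32)).toDS()

// Typed transformations: the compiler checks field names and types,
// and the resulting plan is still optimized by the Spark SQL engine
val adultNames = people.filter(_.age >= 18).map(_.name)
adultNames.show()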

Create a Dataset

To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Create a Dataset from an RDD

To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Create a Dataset from a DataFrame

You can call df.as[SomeCaseClass] to convert a DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

You can also work with tuples when converting a DataFrame to a Dataset, without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Work with Datasets

Word Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Join Datasets

The following example demonstrates how to:

  • Union multiple Datasets
  • Do an inner join on a condition
  • Group by a specific column
  • Do a custom aggregation (average) on the grouped Dataset

These examples use only the Dataset API to demonstrate the available operations. In practice, using DataFrames for the aggregation would be simpler and faster than doing a custom aggregation with mapGroups (a typed sketch of that approach appears after the join example below). The next section covers the details of converting Datasets to DataFrames and using the DataFrame API for aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

// Custom aggregation: average the salaries of one group of Records.
// Its (key, iterator) signature matches what groupByKey(...).mapGroups expects.
def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft((0.0, 0.0)) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total / count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()
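For comparison, the custom aggregation mentioned above can be expressed with the typed API by grouping with groupByKey and aggregating with mapGroups, reusing the averageSalary helper defined earlier. This is only a sketch under the same assumptions as the example above (the same employeeDataset, departmentDataSet, and case classes); as noted, the DataFrame version is usually simpler and faster.

// Typed alternative: group with groupByKey and aggregate with mapGroups
val customAverageSalaryDataset = employeeDataset
  .joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
  .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
  .filter(record => record.age > 25)
  .groupByKey(record => (record.departmentId, record.departmentName))
  .mapGroups(averageSalary)

customAverageSalaryDataset.show()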

Convert a Dataset to a DataFrame

The two examples above used only the Dataset APIs. You can also easily move from Datasets to DataFrames and leverage the DataFrame APIs. The following example shows the word count example using both the Dataset and DataFrame APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()