Datasets tutorial

The Apache Spark Dataset API provides a type-safe, object-oriented programming interface. DataFrame is an alias for an untyped Dataset[Row]. Datasets provide compile-time type safety, which means that production applications can be checked for errors before they are run, and they allow direct operations over user-defined classes. The Dataset API also offers high-level domain-specific language operations such as sum(), avg(), join(), select(), and groupBy(), making the code much easier to express, read, and write.

In this tutorial module, you will learn how to create sample data, load sample data, view the Dataset, and process and visualize the Dataset.

We also provide a sample notebook that you can import to access and run all of the code examples included in the module.

Create sample data

There are two ways to create Datasets: dynamically and by reading from a JSON file using SparkSession. First, for primitive types in examples or demos, you can create Datasets within a Scala or Python notebook or in your sample Spark application. For example, here is a way to create a Dataset of 100 integers in a notebook. We use the spark variable to create 100 integers as Dataset[Long].

// Create a Dataset from a range of 100 numbers.
val range100 = spark.range(100)
range100.collect()

Collect dataset

Load sample data

The more common way is to read a data file from an external data source, such as HDFS, object storage, NoSQL, RDBMS, or a local filesystem. Spark supports multiple formats: JSON, CSV, Text, Parquet, ORC, and so on. To read a JSON file, you also use the SparkSession variable spark.

The easiest way to start working with Datasets is to use an example Azure Databricks dataset available in the /databricks-datasets folder accessible within the Azure Databricks workspace.

val df = spark.read.json("/databricks-datasets/samples/people/people.json")
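The read interface works the same way for the other formats listed above. The following is a small sketch, not part of the original module; the CSV and Parquet paths are hypothetical placeholders rather than files that ship in /databricks-datasets.

// Read a CSV file, treating the first line as column names and letting
// Spark infer the column types. The path is a hypothetical placeholder.
val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/path/to/people.csv")

// Parquet files carry their own schema, so no options are needed.
// The path is again a hypothetical placeholder.
val parquetDF = spark.read.parquet("/path/to/people.parquet")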

At the time of reading the JSON file, Spark does not know the structure of your data. That is, it does not know how you want to organize your data into a type-specific JVM object. It attempts to infer the schema from the JSON file and creates a DataFrame = Dataset[Row] of generic Row objects.

You can explicitly convert your DataFrame into a Dataset reflecting a Scala class object by defining a domain-specific Scala case class and converting the DataFrame into that type:

// First, define a case class that represents a type-specific Scala JVM Object
case class Person (name: String, age: Long)

// Read the JSON file and convert the DataFrame into a type-specific JVM Scala
// object, Person. At this stage, upon reading the JSON, Spark creates a generic
// DataFrame = Dataset[Row]. Explicitly converting the DataFrame into a Dataset
// yields a collection of typed rows, that is, objects of type Person.
val ds = spark.read.json("/databricks-datasets/samples/people/people.json").as[Person]

You can do something similar with IoT device state information captured in a JSON file: define a case class, read the JSON file, and convert the DataFrame = Dataset[DeviceIoTData].

DataFrame 转换为特定于类型的 JVM 对象有两个原因。There are two reasons to convert a DataFrame into a type-specific JVM object. 首先,在显式转换之后,对于使用数据集 API 的所有关系表达式和查询表达式,你会获得编译类型安全性。First, after an explicit conversion, for all relational and query expressions using Dataset API, you get compile-type safety. 例如,如果使用的筛选器操作所使用的数据类型是错误的,Spark 会检测到类型不匹配并发出编译错误信息而不是发出执行运行时错误信息,以便你更早获取错误信息。For example, if you use a filter operation using the wrong data type, Spark detects mismatch types and issues a compile error rather an execution runtime error, so that you catch errors earlier. 其次,数据集 API 提供了高阶方法,这使代码更易于阅读和开发。Second, the Dataset API provides high-order methods, which makes code much easier to read and develop. 处理并可视化数据集部分中,请注意如何使用 Dataset 类型化对象使代码更易于表达和读取。In the section Process and visualize the Dataset, notice how using Dataset typed objects makes the code easier to express and read.

As in the Person example, here you create a case class that encapsulates the Scala object. To access the file that contains the IoT data, load the file /databricks-datasets/iot/iot_devices.json.

// define a case class that represents the device data.
case class DeviceIoTData (
  battery_level: Long,
  c02_level: Long,
  cca2: String,
  cca3: String,
  cn: String,
  device_id: Long,
  device_name: String,
  humidity: Long,
  ip: String,
  latitude: Double,
  longitude: Double,
  scale: String,
  temp: Long,
  timestamp: Long
)

// read the JSON file and create the Dataset from the case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json("/databricks-datasets/iot/iot_devices.json").as[DeviceIoTData]
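As a quick sanity check (an aside, not part of the original notebook), you can confirm the schema Spark inferred and the number of typed records before moving on.

// Inspect the inferred schema and count the typed records.
ds.printSchema()
ds.count()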

View the Dataset

To view the data in a tabular format instead of exporting it to a third-party tool, you can use the Azure Databricks display() command. Once you have loaded the JSON data and converted it into a Dataset for your type-specific collection of JVM objects, you can view them as you would view a DataFrame, by using either display() or standard Spark commands, such as take(), foreach(), and println() API calls.

// display the dataset table just read in from the JSON file
display(ds)
// Using the standard Spark commands, take() and foreach(), print the first
// 10 rows of the Datasets.
ds.take(10).foreach(println(_))

Print 10 dataset rows

Process and visualize the Dataset

A Dataset has transformations and actions. Most important are the high-level domain-specific operations such as sum(), select(), avg(), join(), and union(). For more information, see the Scala Dataset API.
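The examples below use filter(), map(), groupBy(), and avg(); union() and join() follow the same pattern, as in this small sketch, which is not part of the original module. The second Dataset for union() is a hypothetical placeholder, and the join simply reuses ds for illustration.

// union() appends the rows of another Dataset with the same schema;
// otherDs is a hypothetical second Dataset[DeviceIoTData].
// val combined = ds.union(otherDs)

// join() matches rows on one or more columns. Here a per-country count,
// derived from ds itself purely for illustration, is joined back to each row.
val byCountry = ds.groupBy($"cca3").count()
val joined = ds.join(byCountry, Seq("cca3"))
display(joined)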

In this example, you can use filter(), map(), groupBy(), and avg(), all higher-level methods, to create new Datasets. What is noteworthy is that you can access the attributes by their names as defined in the case class; that is, you use dot notation to access individual fields. This makes the code easy to read and write.

// filter out all devices whose temperature exceeds 25 degrees, generate
// another Dataset with the three fields of interest, and then display
// the mapped Dataset
val dsTemp = ds.filter(d => d.temp > 25).map(d => (d.temp, d.device_name, d.cca3))
display(dsTemp)

Display filtered dataset

// Apply higher-level Dataset API methods such as groupBy() and avg().
// Filter temperatures > 25, along with their corresponding
// devices' humidity, compute averages, groupBy cca3 country codes,
// and display the results, using table and bar charts
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()

// display averages as a table, grouped by the country
display(dsAvgTmp)

Display averages

// Select individual fields using the Dataset method select()
// where battery_level is greater than 6. Note this high-level
// domain specific language API reads like a SQL query
display(ds.select($"battery_level", $"c02_level", $"device_name").where($"battery_level" > 6).sort($"c02_level"))

Select fields with value

Here is an animated GIF showing how quickly you can go from table to map to charts using Datasets and the Azure Databricks display() command.

Table to chart

An additional benefit of using the Databricks display() command is that you can quickly view this data with a number of embedded visualizations. For example, in a new cell, you can issue SQL queries and click the map to see the data. But first you must save your Dataset, ds, as a table or temporary view.

// registering your Dataset as a temporary view to which you can issue SQL queries
ds.createOrReplaceTempView("iot_device_data")
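The cell above registers a temporary view, which lasts only for the current session. The table alternative mentioned above could look like the following sketch; the table name is a hypothetical placeholder and the write mode is just one reasonable choice.

// Persist the Dataset as a managed table so it survives the session.
// The table name iot_device_data_table is a hypothetical placeholder.
ds.write.mode("overwrite").saveAsTable("iot_device_data_table")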

Having saved the Dataset of DeviceIoTData as a table or temporary view, you can issue SQL queries to it.

%sql select cca3, count (distinct device_id) as device_id from iot_device_data group by cca3 order by device_id desc limit 100

Query dataset as table
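If you prefer to stay in Scala rather than use a %sql cell, the same query can be issued through spark.sql(), which returns a DataFrame you can pass to display(). This equivalent is a sketch added for convenience, not part of the original notebook.

// Run the same aggregation from Scala against the temporary view.
val deviceCounts = spark.sql("""
  select cca3, count(distinct device_id) as device_id
  from iot_device_data
  group by cca3
  order by device_id desc
  limit 100
""")
display(deviceCounts)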

Notebook

To access these code examples, visualizations, and more, import the following notebook. For more Dataset examples, see DataFrames and Datasets.

Apache Spark Datasets notebook

Get notebook