教程:使用 Apache Spark Scala 数据帧加载和转换数据

本教程介绍如何在 Azure Databricks 中使用 Apache Spark Scala 数据帧 API 加载和转换美国城市数据。

本教程结束时,你将了解数据帧是什么并熟悉以下任务:

另请参阅 Apache Spark Scala API 参考

什么是数据帧?

数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

什么是 Spark 数据集?

Apache Spark 数据集 API 提供了一个类型安全、面向对象的编程接口。 DataFrame 是非类型化的 Dataset [Row] 的别名。 请参阅数据集 API

Azure Databricks 文档使用术语“数据帧”,因为本文档适用于 Python、Scala 和 R。请参阅“示例笔记本:Scala 数据集聚合器”

要求

若要完成以下教程,必须满足以下要求:

注意

如果没有群集创建特权,只要拥有现有群集访问权限,就仍然可以完成以下大部分步骤。

在主页上的侧栏中,访问 Azure Databricks 实体:工作区浏览器、目录、资源管理器、工作流和计算。 工作区是存储 Azure Databricks 资产(如笔记本和库)的根文件夹。

步骤 1:使用 Scala 创建数据帧

若要了解如何在 Azure Databricks 笔记本中导航,请参阅 Databricks 笔记本界面和控件

  1. 打开一个新笔记本,然后单击 New Icon 图标插入新单元格。

  2. 将以下代码复制并粘贴到空的笔记本单元格中,然后按 Shift+Enter 运行该单元格。 下面的代码示例将创建一个包含城市人口数据的名为 df1 的数据帧,并显示该数据帧的内容。

    case class City(rank: Long, city: String, state: String, code: String, population: Long, price: Double)
    
    val df1 = Seq(new City(295, "South Bend", "Indiana", "IN", 101190, 112.9)).toDF
    
    display(df1)
    

步骤 2:将数据从文件加载到数据帧中

/databricks-datasets 目录中更多的城市人口数据添加到 df2

若要将数据从 data_geo.csv 文件加载到 数据帧 df2,请执行以下操作:

  1. 在笔记本中创建新的单元格。
  2. 将以下代码复制并粘贴到空的笔记本单元格中,然后按 Shift+Enter 运行该单元格。

可以从许多受支持的文件格式加载数据。 以下示例使用 /databricks-datasets 目录(可从大多数工作区访问)中提供的数据集。 参阅示例数据集

val df2 = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")

步骤 3:查看数据帧并与之交互

使用以下方法查看城市人口数据帧并与之交互。

合并数据帧

将第一个数据帧的内容与包含 data_geo.csv 的内容的数据帧合并在一起。

在笔记本中使用以下示例代码创建一个新的数据帧,该数据帧会使用联合操作将一个数据帧的行添加到另一个数据帧:

// Returns a DataFrame that combines the rows of df1 and df2
val df = df1.union(df2)

查看数据帧

若要以表格格式查看美国城市数据,请在笔记本单元格中使用 Azure Databricks display() 命令。

display(df)

Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。

在笔记本中使用以下 .printSchema() 方法打印数据帧的架构。 使用生成的元数据与数据帧的内容进行交互。

df.printSchema()

筛选数据帧中的行

使用 .filter().where() 来筛选各行,发现数据集中人口最多的五个城市。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。

若要选择人口最多的城市,请继续向笔记本添加新单元格并添加以下代码示例:

// Filter rows using .filter()
val filtered_df = df.filter(df("rank") < 6)
display(filtered_df)

// Filter rows using .where()
val filtered_df = df.where(df("rank") < 6)
display(filtered_df)

从数据帧中选择列

使用 select() 方法了解城市位于哪个州。 通过将一个或多个列名传递给 .select() 来选择列,如以下示例中所示:

val select_df = df.select("City", "State")
display(select_df)

创建子集数据帧

创建包含人口最多的十个城市的子集数据帧并显示生成的数据。 在笔记本中使用以下代码,组合 select 和 filter 查询来限制返回的行和列:

val subset_df = df.filter(df("rank") < 11).select("City")
display(subset_df)

步骤 4:保存数据帧

可以将数据帧保存到表,或者将数据帧写入一个或多个文件。

将数据帧保存到表

默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE 表权限。 以下示例将数据帧的内容保存到名为 us_cities 的表中:

df.write.saveAsTable("us_cities")

大多数 Spark 应用程序都以分布式方式处理大型数据集。 Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。

将数据帧保存到 JSON 文件

以下示例保存 JSON 文件的目录:

# Write a DataFrame to a collection of files
df.write.format("json").save("/tmp/json_data")

从 JSON 文件读取数据帧

# Read a DataFrame from a JSON file
val df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

其他任务:在 Scala 中运行 SQL 查询

Spark 数据帧提供以下选项,用于将 SQL 与 Scala 合并在一起。 你可以在为本教程创建的同一笔记本中运行以下代码。

将列指定为 SQL 查询

使用 selectExpr() 方法可将每一列指定为 SQL 查询,如以下示例中所示:

display(df.selectExpr("`rank`", "upper(city) as big_name"))

导入 expr()

可以导入 org.apache.spark.sql.functions.expr,以便在要指定列的任何位置使用 SQL 语法,如以下示例中所示:

import org.apache.spark.sql.functions.expr

display(df.select($"rank", expr("lower(city) as little_name")))

运行任意 SQL 查询

你可以使用 spark.sql() 来运行任意 SQL 查询,如以下示例中所示:

val query_df = spark.sql("SELECT * FROM us_cities")

实现 SQL 查询参数化

你可以使用 Scala 格式设置来实现 SQL 查询参数化,如以下示例中所示:

val table_name = "us_cities"

val query_df = spark.sql(s"SELECT * FROM $table_name")

示例笔记本:Scala 数据集聚合器

以下笔记本演示了如何使用数据集聚合器。

数据集聚合器笔记本

获取笔记本