教程:在 Azure Databricks 上使用 SparkR SparkDataFrame

本教程介绍如何在 Azure Databricks 中使用 SparkR 的 SparkDataFrame API 加载和转换数据。

本教程结束时,你可了解数据帧是什么并熟悉以下任务:

另请参阅 Apache SparkR API 参考

什么是数据帧?

数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

要求

若要完成以下教程,必须满足以下要求:

注意

如果没有群集控制特权,只要拥有群集访问权限,就仍然可以完成以下大部分步骤。

在主页的侧边栏中,可以访问 Azure Databricks 实体:工作区浏览器、目录资源管理器、工作流和计算。 工作区是存储 Azure Databricks 资产(如笔记本和库)的根文件夹。

步骤 1:使用 SparkR 创建数据帧

若要了解如何在 Azure Databricks 笔记本中导航,请参阅 Databricks 笔记本界面和控件

  1. 单击 新建图标 图标打开新笔记本。

  2. 将以下代码复制并粘贴到空的笔记本单元格中,然后按 Shift+Enter 运行该单元格。 下面的代码示例将创建一个包含城市人口数据的名为 df1 的数据帧,并显示该数据帧的内容。

    # Load the SparkR package that is already preinstalled on the cluster.
    
    library(SparkR)
    
    # Define the data
    data <- data.frame(
      rank = c(295),
      city = c("South Bend"),
      state = c("Indiana"),
      code = c("IN"),
      population = c(101190),
      price = c(112.9)
    )
    
    # Define the schema
    schema <- structType(
      structField("rank", "double"),
      structField("city", "string"),
      structField("state", "string"),
      structField("code", "string"),
      structField("population", "double"),
      structField("price", "double")
    )
    
    # Create the dataframe
    df1 <- createDataFrame(
        data   = data,
        schema = schema
    )
    

步骤 2:将数据从文件加载到数据帧中

/databricks-datasets 目录中更多的城市人口数据添加到 df2

若要将 data_geo.csv 文件中的数据载入数据帧 df2,请将以下代码复制并粘贴到新的空笔记本单元格中。 按 Shift+Enter 运行该单元格。

可以从许多受支持的文件格式加载数据。 以下示例使用 /databricks-datasets 目录(可从大多数工作区访问)中提供的数据集。 参阅示例数据集

# Create second dataframe from a file
df2 <- read.df("/databricks-datasets/samples/population-vs-price/data_geo.csv",
              source = "csv",
              header = "true",
              schema = schema
)

步骤 3:查看数据帧并与之交互

使用以下方法查看城市人口数据帧并与之交互。

合并数据帧

将第一个数据帧 df1 的内容与包含 data_geo.csv 内容的数据帧 df2 合并。

在笔记本中使用以下示例代码创建一个新的数据帧,该数据帧会使用联合操作将一个数据帧的行添加到另一个数据帧:

# Returns a DataFrame that combines the rows of df1 and df2
df <- union(df1, df2)

查看数据帧

若要以表格格式查看美国城市数据,请在笔记本单元格中使用 Azure Databricks display() 命令。

display(df)

Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。

在笔记本中使用以下 .printSchema() 方法打印数据帧的架构。 使用生成的元数据与数据帧的内容进行交互。

printSchema(df)

筛选数据帧中的行

使用 .filter().where() 来筛选各行,发现数据集中人口最多的五个城市。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。

若要选择人口最多的城市,请继续向笔记本添加新单元格并添加以下代码示例:

# Filter rows using filter()
filtered_df <- filter(df, df$rank < 6)
display(filtered_df)
# Filter rows using .where()
filtered_df <- where(df, df$rank < 6)
display(filtered_df)

从数据帧中选择列

使用 select() 方法了解城市位于哪个州。 通过将一个或多个列名传递给 .select() 来选择列,如以下示例中所示:

# Select columns from a DataFrame
select_df <- select(df,"City", "State")
display(select_df)

创建子集数据帧

创建包含人口最多的十个城市的子集数据帧并显示生成的数据。 在笔记本中使用以下代码,组合 select 和 filter 查询来限制返回的行和列:

# Create a subset Dataframe
subset_df <- filter(select(df, col = list("city", "rank")),condition = df$rank > 1)
display(subset_df)

步骤 4:保存数据帧

可以将数据帧保存到表,或者将数据帧写入一个或多个文件。

将数据帧保存到表

默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE 表权限。 以下示例将数据帧的内容保存到名为 us_cities 的表中:

# Save DataFrame to a table
saveAsTable(df, "us_cities", source = "parquet", mode = "overwrite")

大多数 Spark 应用程序都以分布式方式处理大型数据集。 Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。

将数据帧保存到 JSON 文件

以下示例保存 JSON 文件的目录:

# Write a DataFrame to a directory of files
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

从 JSON 文件读取数据帧

# Read a DataFrame from a JSON file
df3 <- read.json("/tmp/json_data")
display(df3)

其他任务:在 SparkR 中运行 SQL 查询

Spark 数据帧提供以下用于将 SQL 与 SparkR 合并的选项。 可以在为本教程创建的同一笔记本中运行以下代码。

将列指定为 SQL 查询

使用 selectExpr() 方法可将每一列指定为 SQL 查询,如以下示例中所示:

display(df_selected <- selectExpr(df, "`rank`", "upper(city) as big_name"))

使用 selectExpr()

selectExpr 允许在要指定列的任何位置使用 SQL 语法,如以下示例所示:

display(df_selected <- selectExpr(df, "`rank`", "lower(city) as little_name"))

运行任意 SQL 查询

你可以使用 spark.sql() 来运行任意 SQL 查询,如以下示例中所示:

display(query_df <- sql("SELECT * FROM us_cities"))

实现 SQL 查询参数化

可以使用 SparkR 格式来参数化 SQL 查询,如以下示例所示:

# Define the table name
table_name <- "us_cities"

# Construct the SQL query using paste()
query <- paste("SELECT * FROM", table_name)

# Query the DataFrame using the constructed SQL query
display(query_df <- sql(query))