SparkR overview

SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. SparkR also supports distributed machine learning using MLlib.

SparkR in notebooks

  • For Spark 2.0 and above, you do not need to explicitly pass a sqlContext object to every function call. This article uses the new syntax. For old syntax examples, see SparkR 1.6 overview.
  • For Spark 2.2 and above, notebooks no longer import SparkR by default because SparkR functions conflict with similarly named functions from other popular packages. To use SparkR, call library(SparkR) in your notebooks, as shown in the sketch after this list. The SparkR session is already configured, and all SparkR functions talk to your attached cluster using the existing session.
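
A minimal sketch of the notebook workflow described above, using the preconfigured session (sparkR.version is chosen here only as an illustrative call):

library(SparkR)

# All SparkR calls now run against the session attached to the cluster, for example:
sparkR.version()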

SparkR in spark-submit jobs

You can run scripts that use SparkR on Azure Databricks as spark-submit jobs, with minor code modifications. For an example, see Create and run a spark-submit job for R scripts.
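
A minimal sketch of such a script; the app name and the use of the faithful dataset are illustrative:

library(SparkR)

# Unlike notebooks, a spark-submit job has no preconfigured session, so start one explicitly
sparkR.session(appName = "sparkr-example")

df <- createDataFrame(faithful)
print(head(df))

# Stop the session when the script is done
sparkR.session.stop()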

Create SparkR DataFrames

You can create a DataFrame from a local R data.frame, from a data source, or using a Spark SQL query.

From a local R data.frame

The simplest way to create a DataFrame is to convert a local R data.frame into a SparkDataFrame. Specifically, you can use createDataFrame and pass in a local R data.frame to create a SparkDataFrame. Like most other SparkR functions, the createDataFrame syntax changed in Spark 2.0. You can see an example of the new syntax in the code snippet below. Refer to createDataFrame for more examples.

library(SparkR)
df <- createDataFrame(faithful)

# Displays the content of the DataFrame to stdout
head(df)

Using the data source API

The general method for creating a DataFrame from a data source is read.df. This method takes the path of the file to load and the type of data source. SparkR supports reading CSV, JSON, text, and Parquet files natively.

library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)

SparkR automatically infers the schema from the CSV file.
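
If you prefer not to rely on inference, read.df also accepts an explicit schema. A minimal sketch, assuming a hypothetical CSV file at /tmp/people.csv with name and age columns:

library(SparkR)

# Define the schema explicitly instead of inferring it
schema <- structType(structField("name", "string"), structField("age", "integer"))

# Hypothetical path; replace with a CSV file of your own
peopleDF <- read.df("/tmp/people.csv", source = "csv", header = "true", schema = schema)
head(peopleDF)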

Adding a data source connector with Spark Packages

Through Spark Packages you can find data source connectors for popular file formats such as Avro. As an example, use the spark-avro package to load an Avro file. The availability of the spark-avro package depends on your cluster’s image version. See Avro file.

First take an existing data.frame, convert it to a Spark DataFrame, and save it as an Avro file.

require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")

To verify that the Avro file was saved:

%fs ls /tmp/iris.avro

Now use the spark-avro package again to read back the data.

irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)

The data source API can also be used to save DataFrames in multiple file formats. For example, you can save the DataFrame from the previous example to a Parquet file using write.df.

write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet

From a Spark SQL query

You can also create SparkR DataFrames using Spark SQL queries.

# Create a DataFrame with an 'age' column (a small local data.frame is used here for illustration)
people <- createDataFrame(data.frame(name = c("Michael", "Andy", "Justin"), age = c(29, 30, 19)))
# Register the DataFrame as a temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql("SELECT age FROM peopleTemp")

age is a SparkDataFrame.
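
You can run further SQL against the same temp view in the same way; for example, a brief sketch that computes the average age:

# Aggregate over the registered temp view with a SQL query
avgAge <- sql("SELECT avg(age) AS avg_age FROM peopleTemp")
head(avgAge)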

DataFrame operations

Spark DataFrames support a number of functions for structured data processing. Here are some basic examples. A complete list can be found in the API docs.

Select rows and columns

# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column names as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Grouping and aggregation

SparkDataFrames support a number of commonly used functions to aggregate data after grouping. For example, you can count the number of times each waiting time appears in the faithful dataset.

head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
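
Beyond count, other aggregates can be computed with agg after grouping. A short sketch using avg (the avg_eruptions output column name is illustrative):

# Compute the average eruption duration for each waiting time
head(agg(groupBy(df, df$waiting), avg_eruptions = avg(df$eruptions)))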

Column operations

SparkR provides a number of functions that can be applied directly to columns for data processing and aggregation. The following example shows the use of basic arithmetic functions.

# Convert waiting time from minutes to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
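
Column functions can be combined with arithmetic in the same way. A small sketch that casts the derived column to an integer type:

# Cast the derived column from double to integer
df$waiting_secs <- cast(df$waiting_secs, "integer")
head(df)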

Machine learning

SparkR exposes most MLlib algorithms. Under the hood, SparkR uses MLlib to train the model.

The following example shows how to build a Gaussian GLM model using SparkR. To run linear regression, set family to "gaussian". To run logistic regression, set family to "binomial". When using SparkML GLM, SparkR automatically performs one-hot encoding of categorical features, so it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

# Create the DataFrame
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
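
Once fitted, the model can be applied to a SparkDataFrame with predict. A brief sketch scoring the training data (the column selection is illustrative):

# Score the training data; predict returns a SparkDataFrame with a "prediction" column
predictions <- predict(model, df)
head(select(predictions, "Sepal_Length", "prediction"))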

For tutorials, see SparkR ML tutorials.