SparkR 1.6 overview


For information about the latest SparkR library, see Azure Databricks for R developers.

SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. Starting with Spark 1.5.1, SparkR provides a distributed DataFrame implementation that supports operations such as selection, filtering, and aggregation (similar to R data frames and dplyr), but on large datasets. SparkR also supports distributed machine learning using MLlib.

Creating SparkR DataFrames

Applications can create DataFrames from a local R data frame, from data sources, or using Spark SQL queries.

The simplest way to create a DataFrame is to convert a local R data frame into a SparkR DataFrame. Specifically, we can use createDataFrame and pass in the local R data frame to create a SparkR DataFrame. As an example, the following cell creates a DataFrame using the faithful dataset from R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

From data sources using Spark SQL

The general method for creating DataFrames from data sources is read.df. This method takes the SQLContext, the path of the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats such as CSV and Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR automatically infers the schema from the JSON file.
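To inspect the inferred schema, you can call printSchema on the resulting DataFrame (a small sketch building on the people DataFrame created above):

```r
# Print the schema that SparkR inferred from the JSON file
printSchema(people)
```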


Using data source connectors with Spark Packages

As an example, we will use the Spark CSV package to load a CSV file. You can find a list of Spark Packages published by Databricks on the Spark Packages website.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")

The data sources API can also be used to save DataFrames to multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using write.df.

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet
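To verify the round trip, the saved Parquet file can be read back with read.df (an illustrative check; it reuses the same sqlContext and path as above):

```r
# Read the Parquet file written above back into a SparkR DataFrame
peopleParquet <- read.df(sqlContext, "dbfs:/tmp/people.parquet", source = "parquet")
head(peopleParquet)
```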

From Spark SQL queries

You can also create SparkR DataFrames using Spark SQL queries.

# Register earlier df as a temp table (the SparkR 1.6 API with sqlContext)
registerTempTable(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
# Resulting df is a SparkR df
head(age)

DataFrame operations

SparkR DataFrames support a number of functions for structured data processing. Here we include some basic examples; a complete list can be found in the API docs.

Selecting rows and columns

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column names as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Grouping and aggregation

SparkR DataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can count the number of times each waiting time appears in the faithful dataset.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Column operations

SparkR provides a number of functions that can be applied directly to columns for data processing and aggregation. The following example shows the use of basic arithmetic functions.

# Convert waiting time from minutes to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
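Because the assignment above adds waiting_secs as a lazily computed column, you can view it next to the source column (a small illustrative check):

```r
# View the derived column alongside the original
head(select(df, df$waiting, df$waiting_secs))
```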

Machine learning

As of Spark 1.5, SparkR allows the fitting of generalized linear models over SparkR DataFrames using the glm() function. Under the hood, SparkR uses MLlib to train a model of the specified family. We support a subset of the available R formula operators for model fitting, including '~', '.', '+', and '-'.

Under the hood, SparkR automatically performs one-hot encoding of categorical features, so it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

The following example shows how to build a Gaussian GLM model using SparkR. To run linear regression, set family to "gaussian". To run logistic regression, set family to "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
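Once fit, the model can also score a DataFrame with predict (a sketch; the column names assume the iris example above):

```r
# Score the training data; predictions appear in a "prediction" column
predictions <- predict(model, newData = df)
head(select(predictions, "Sepal_Length", "prediction"))
```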

Converting local R data frames to SparkR DataFrames

You can use createDataFrame to convert local R data frames to SparkR DataFrames.

# Create SparkR DataFrame using localDF, a small illustrative local R data frame
localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19, 23, 18))
convertedSparkDF <- createDataFrame(sqlContext, localDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))