How to parallelize R code with gapply

Parallelizing R code is difficult because R code runs on the driver and R data.frames are not distributed. Often, existing R code that runs locally has to be converted to run on Apache Spark. In other cases, some SparkR functions used for advanced statistical analysis and machine learning may not support distributed computing. In such cases, you can use the SparkR UDF API to distribute the workload across a cluster.

Example use case: You want to train a machine learning model on subsets of a data set, grouped by a key. If each subset fits on a worker, it may be more efficient to use the SparkR UDF API to train multiple models at once.

The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. For each group in a Spark DataFrame:

  1. Collect each group as an R data.frame.
  2. Send the function to the worker and execute it.
  3. Return the result to the driver in the form specified by the schema.
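
The per-group flow above can be mimicked locally in base R, which is a convenient way to debug a function before handing it to gapply. The sketch below is a local analogue only, not the SparkR implementation: it uses split and lapply on the built-in airquality data. The helper name fit_group, the use of lm as a stand-in model, and passing the key as a one-element list are assumptions made for illustration.

```r
# Local analogue of gapply's split-apply-combine flow (no Spark involved).
aq <- na.omit(airquality)

# Hypothetical per-group function with a (key, x) signature like the one gapply uses.
# lm is a stand-in model so that the sketch needs no extra packages.
fit_group <- function(key, x) {
  fit <- lm(Ozone ~ Solar.R + Wind + Temp, data = x)
  # key[[1]] works whether the key arrives as a list or as a vector.
  data.frame(Month = key[[1]], MSE = mean(residuals(fit)^2))
}

# 1. Split the data into one data.frame per key value.
groups <- split(aq, aq$Month)

# 2. Apply the function to each group (key passed as a list: an assumption).
pieces <- lapply(names(groups), function(k) {
  fit_group(list(as.integer(k)), groups[[k]])
})

# 3. Combine the per-group results into a single data.frame.
local_result <- do.call(rbind, pieces)
print(local_result)
```

Once the function behaves as expected on local groups, the same (key, x) body can be passed to gapply or gapplyCollect on a Spark DataFrame.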

Note

When you call gapply, you must specify the output schema. With gapplyCollect, no schema is needed: the result is collected to the driver as an R data.frame.

In the following example, a separate support vector machine model is fit on the airquality data for each month. The output is a data.frame with the resulting MSE for each month, shown both with and without specifying the schema.

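library(SparkR)

# Convert the local data.frame to a Spark DataFrame, dropping rows with missing values.
df <- createDataFrame(na.omit(airquality))

# With gapply: declare the output schema (column names and Spark types).
schema <- structType(
  structField("Month", "integer"),
  structField("MSE", "double"))

result <- gapply(df, c("Month"), function(key, x) {
  library(e1071)  # must be installed on every worker
  # Fit an SVM with 3-fold cross-validation and report the total MSE.
  data.frame(month = key[[1]], mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
head(result)

# With gapplyCollect: no schema; the result comes back as a local R data.frame.
gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key[[1]], mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
  names(y) <- c("Month", "MSE")
  y
})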

Note

Start with a Spark DataFrame, and make sure that the packages used inside the function (here, e1071) are installed on all workers.
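
One way to make a missing package easier to diagnose is to check for it at the top of the function that runs on the workers. The sketch below uses base R's requireNamespace and reports the host name in the error. The helper name require_on_worker is hypothetical, and how packages get installed on the cluster is outside its scope.

```r
# Hypothetical helper: fail early with a clear message if a package is missing
# on the machine where the function runs.
require_on_worker <- function(pkg) {
  if (!requireNamespace(pkg, quietly = TRUE)) {
    stop(sprintf("Package '%s' is not installed on host %s",
                 pkg, Sys.info()[["nodename"]]))
  }
  invisible(TRUE)
}

# Example: 'stats' ships with R, so this check passes silently.
require_on_worker("stats")
```

Calling require_on_worker("e1071") as the first line of the function passed to gapply would then name the host on which the package is missing.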