Simplify chained transformations

Sometimes you may need to perform multiple transformations on your DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import spark.implicits._  // provides .toDF on local collections and the 'col symbol syntax

val testDf = (1 to 10).toDF("col")

// Keep only the rows where col is greater than x(y)
def func0(x: Int => Int, y: Int)(in: DataFrame): DataFrame = {
  in.filter('col > x(y))
}
// Add a col1 column equal to col + x (keeping col)
def func1(x: Int)(in: DataFrame): DataFrame = {
  in.selectExpr("col", s"col + $x as col1")
}
// Add a col2 column equal to col1 + add
def func2(add: Int)(in: DataFrame): DataFrame = {
  in.withColumn("col2", expr(s"col1 + $add"))
}
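Because each helper is curried, partially applying the configuration arguments leaves a plain DataFrame => DataFrame function; this is what the chaining techniques below rely on. A minimal sketch to illustrate the types (the step0 and step1 names are only illustrative):

// Sketch: partial application leaves a DataFrame => DataFrame function.
// step0 and step1 are illustrative names, not part of the original example.
val step0: DataFrame => DataFrame = func0(i => i + 1, 3)
val step1: DataFrame => DataFrame = func1(1)

val out = step1(step0(testDf))  // equivalent to func1(1)(func0(i => i + 1, 3)(testDf))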

When you apply these transformations, you may end up with spaghetti code like this:

def inc(i: Int) = i + 1

val tmp0 = func0(inc, 3)(testDf)
val tmp1 = func1(1)(tmp0)
val tmp2 = func2(2)(tmp1)
val res = tmp2.withColumn("col3", expr("col2 + 3"))

This article describes several methods to simplify chained transformations.

DataFrame transform API

To benefit from Spark's functional programming style, you can leverage the DataFrame transform API, for example:

val res = testDf.transform(func0(inc, 4))
                .transform(func1(1))
                .transform(func2(2))
                .withColumn("col3", expr("col2 + 3"))

Function.chain API

To go even further, you can leverage the Scala Function library to chain the transformations, for example:

val chained = Function.chain(List(func0(inc, 4)(_), func1(1)(_), func2(2)(_)))
val res = testDf.transform(chained)
                .withColumn("col3", expr("col2 + 3"))

Implicit class

Another alternative is to define a Scala implicit class, which allows you to eliminate the DataFrame transform API calls:

implicit class MyTransforms(df: DataFrame) {
    def func0(x: Int => Int, y: Int): DataFrame = {
        df.filter('col > x(y))
    }
    def func1(x: Int): DataFrame = {
        df.selectExpr("col", s"col + $x as col1")
    }
    def func2(add: Int): DataFrame = {
        df.withColumn("col2", expr(s"col1 + $add"))
    }
}

Then you can call the functions directly:

val res = testDf.func0(inc, 1)
            .func1(2)
            .func2(3)
            .withColumn("col3", expr("col2 + 3"))