适用于 Scala 的 Databricks Connect 中的用户定义的函数

注意

本文介绍适用于 Databricks Runtime 14.1 及更高版本的 Databricks Connect。

本文介绍如何使用适用于 Scala 的 Databricks Connect 执行用户定义的函数。 Databricks Connect 使你能够将常用 IDE、笔记本服务器和自定义应用程序连接到 Azure Databricks 群集。 有关本文的 Python 版本,请参阅适用于 Python 的 Databricks Connect 中的用户定义的函数

注意

在开始使用 Databricks Connect 之前,必须先设置 Databricks Connect 客户端

对于 Databricks Runtime 14.1 及更高版本,适用于 Scala 的 Databricks Connect 支持运行用户定义的函数 (UDF)。

若要运行 UDF,必须将 UDF 所需的已编译类和 JAR 上传到群集。 addCompiledArtifacts() API 可用于指定必须上传的已编译类和 JAR 文件。

注意

客户端使用的 Scala 必须与 Azure Databricks 群集上的 Scala 版本匹配。 若要查看群集的 Scala 版本,请参阅 Databricks Runtime 发行说明版本和兼容性中的“系统环境”部分。

以下 Scala 程序会设置一个简单的 UDF,用于在列中计算值的平方。

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

在前面的示例中,由于 UDF 完全包含在 Main 中,因此只会添加 Main 的已编译项目。 如果 UDF 分布在其他类上或使用外部库(即 JAR),则还应包括所有这些库。

初始化 Spark 会话后,可以使用 spark.addArtifact() API 上传进一步编译的类和 JAR。

注意

上传 JAR 时,必须包括所有可传递依赖项 JAR 才能上传。 API 不对可传递依赖项执行任何自动检测。

类型化数据集 API

上一节所述的 UDF 机制同样适用于类型化数据集 API。

类型化数据集 API 支持对生成的数据集运行转换,例如映射、筛选和聚合。 这些操作也与 Databricks 群集上的 UDF 类似。

以下 Scala 应用程序使用 map() API 将结果列中的数字修改为前缀字符串。

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

虽然此示例使用 map() API,但这也适用于其他类型化数据集 API,例如 filter()mapPartitions() 等。