什么是用户定义函数 (UDF)?

用户定义函数 (UDF) 是由用户定义的函数,用于在用户环境中重复使用自定义逻辑。 Azure Databricks 支持使用许多不同类型的 UDF 来分发可扩展逻辑。 本文介绍 UDF 的一些常规优势和限制。

注意

并非所有形式的 UDF 在 Azure Databricks 上的所有执行环境中都可用。 如果你使用的是 Unity Catalog,请参阅 Unity Catalog 中的用户定义函数 (UDF)

定义不影响序列化的自定义逻辑

Azure Databricks 从 Apache Spark 继承了其大部分 UDF 行为,包括许多 UDF 类型的效率限制。 请参阅哪些 UDF 最高效?

你可以安全地模块化代码,而无需担心与 UDF 相关的潜在效率弊端。 为此,必须使用 SQL 或 Spark 数据帧将逻辑定义为一系列 Spark 内置方法。 例如,以下 SQL 和 Python 函数结合 Spark 内置方法,将单位转换定义为可重用函数:

SQL

CREATE FUNCTION convert_f_to_c(unit STRING, temp DOUBLE)
RETURNS DOUBLE
RETURN CASE
  WHEN unit = "F" THEN (temp - 32) * (5/9)
  ELSE temp
END;

SELECT convert_f_to_c(unit, temp) AS c_temp
FROM tv_temp;

Python

def convertFtoC(unitCol, tempCol):
  from pyspark.sql.functions import when
  return when(unitCol == "F", (tempCol - 32) * (5/9)).otherwise(tempCol)

from pyspark.sql.functions import col

df_query = df.select(convertFtoC(col("unit"), col("temp"))).toDF("c_temp")
display(df_query)

若要运行上述 UDF,可以创建示例数据

哪些 UDF 最高效?

UDF 可能会在代码执行中带来严重的处理瓶颈。 Azure Databricks 自动对通过包含的 Apache Spark、SQL 和 Delta Lake 语法编写的代码使用许多不同的优化器。 当 UDF 引入自定义逻辑时,这些优化器无法围绕此自定义逻辑有效地计划任务。 此外,在 JVM 外部执行的逻辑会在数据序列化方面产生额外的成本。

注意

如果你使用支持 Photon 的计算,则 Azure Databricks 会使用 Photon 优化许多函数。 Photon 只能优化将 Spark 或 SQL 数据帧命令链接在一起的函数。

某些 UDF 比另外一些 UDF 更高效。 在性能方面:

  • 由于存在 Azure Databricks 优化器,内置函数的速度最快。
  • 在 JVM 中执行的代码(Scala、Java、Hive UDF)会比 Python UDF 快。
  • Pandas UDF 使用 Arrow 来降低与 Python UDF 关联的序列化成本。
  • Python UDF 适用于过程逻辑,但应避免将其用于大型数据集上的生产 ETL 工作负载。

注意

在 Databricks Runtime 12.2 LTS 及更低版本中,使用共享访问模式的群集上的 Unity Catalog 中不支持 Python 标量 UDF 和 Pandas UDF。 Databricks Runtime 13.3 LTS 及更高版本在所有访问模式下都支持这些 UDF。

在 Databricks Runtime 14.1 及以下版本中,在使用共享访问模式的群集上,Unity Catalog 不支持 Scala 标量 UDF。 Databricks Runtime 14.2 及更高版本中的所有访问模式都支持这些 UDF。

在 Databricks Runtime 13.3 LTS 及更高版本中,可以使用 SQL 语法将标量 Python UDF 注册到 Unity Catalog。 请参阅 Unity Catalog 中的用户定义函数 (UDF)

类型 已优化 执行环境
Hive UDF JVM
Python UDF Python
Pandas UDF Python (Arrow)
Scala UDF JVM
Spark SQL JVM (Photon)
Spark 数据帧 JVM (Photon)

在什么情况下使用 UDF?

UDF 的一个主要好处是允许用户使用熟悉的语言来表达逻辑,从而降低与重构代码相关的人工成本。 对于即席查询、手动数据清理、探索性数据分析以及针对小型或中型数据集的大多数操作,与 UDF 关联的延迟开销成本不太可能超过与重构代码相关的成本。

对于 ETL 作业、流式处理操作、在超大型数据集上执行的操作或其他定期执行或持续执行的工作负荷,通过重构逻辑来使用原生 Apache Spark 方法可快速获得收益。

示例 UDF 的示例数据

本文中的代码示例使用 UDF 在摄氏和华氏之间转换温度。 如果要执行这些函数,可以使用以下 Python 代码创建示例数据集:

import numpy as np
import pandas as pd

Fdf = pd.DataFrame(np.random.normal(55, 25, 10000000), columns=["temp"])
Fdf["unit"] = "F"

Cdf = pd.DataFrame(np.random.normal(10, 10, 10000000), columns=["temp"])
Cdf["unit"] = "C"

df = spark.createDataFrame(pd.concat([Fdf, Cdf]).sample(frac=1))

df.cache().count()
df.createOrReplaceTempView("tv_temp")