什么是用户定义函数 (UDF)?

用户定义函数 (UDF) 允许重复使用和共享在 Azure Databricks 上扩展内置功能的代码。 使用用户定义函数 (UDF) 执行特定任务,例如复杂的计算、转换或自定义数据操作。

何时使用 UDF 与 Apache Spark 函数?

将 UDF 用于难以通过内置 Apache Spark 函数表达的逻辑。 内置的 Apache Spark 函数已针对分布式处理进行了优化,可大规模提供更好的性能。 有关详细信息,请参阅 Functions

Databricks 建议使用 UDF 进行临时查询、手动数据清理、探索性数据分析,以及对中小型数据集执行操作。 UDF 的常见用例包括数据加密、解密、哈希、JSON 分析和验证。

使用 Apache Spark 方法对非常大的数据集进行操作,并且任何工作负荷都定期或持续执行,包括 ETL 作业和流式处理操作。

了解 UDF 类型

从以下选项卡中选择 UDF 类型,以查看说明、示例和链接以了解详细信息。

标量 UDF

标量 UDF 对单个行进行作,并为每个行返回单个结果值。 它们可以由 Unity Catalog 治理或将范围限于会话。

以下示例使用标量 UDF 计算列中每个名称 name 的长度,并在新列中 name_length添加值。

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

若要使用 PySpark 在 Databricks 笔记本中实现此目的:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

请参阅 Unity 目录中的用户定义函数(UDF)用户定义的标量函数 - Python

批量标量 UDF

在保持输入输出行1:1对应关系的同时批量处理数据。 这减少了在进行大规模数据处理时的逐行操作负担。 批处理 UDF 还维护批处理之间的状态,以更高效地运行、重复使用资源并处理需要跨数据区块上下文的复杂计算。

它们可以由 Unity Catalog 治理或将范围限于会话。

以下 Batch Unity Catalog Python UDF 在批量处理行时计算 BMI:

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

请参阅 Unity 目录中的用户定义函数(UDF)Unity 目录中的 Batch Python 用户定义的函数(UDF)。

非标量 UDF

非标量 UDF 对整个数据集/列进行操作,具有灵活的输入/输出比率(1:N 或多:多)。

会话范围的批量 Pandas UDF 可以是以下类型:

  • 系列到系列
  • 系列迭代器到系列迭代器
  • 多系列迭代器到系列迭代器
  • 系列到标量

以下是一个系列到系列 Pandas UDF 示例。

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

请参阅 pandas 用户定义的函数

UDAF

UDAF函数对多行进行操作,并返回一个聚合的结果。 UDAF 仅限会话范围。

以下 UDAF 示例通过姓名长度来聚合分数。

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

请参阅 pandas 用户定义函数 ,了解 Python 和 用户定义的聚合函数 - Scala

UDTF

UDTF 采用一个或多个输入参数,并为每个输入行返回多行(可能有多个列)。 UDTF 仅限会话范围。

在以下示例中,分数列中的每一个值都对应于类别列表。 UDTF 将逗号分隔的列表拆分为多个行。

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
  def eval(self, name: str, score: int, categories: str):
    category_list = categories.split(',')
    for category in category_list:
      yield (name, score, category)

ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

请参阅 Python 用户定义表函数 (UDTF)

Unity Catalog 治理与会话范围的 UDF

Unity 目录 Python UDF 和 Batch Unity 目录 Python UDF 保留在 Unity 目录中,以提高治理、重用性和可发现性。 所有其他 UDF 都是基于会话的,这意味着它们是在笔记本或作业中定义的,并且范围限定为当前的 SparkSession。 可以使用 Scala 或 Python 定义和访问会话范围的 UDF。

Unity Catalog 治理的 UDF 速查表

Unity 目录治理的 UDF 允许跨计算环境定义、使用、安全共享和管理自定义函数。 请参阅 Unity Catalog 中的用户定义函数 (UDF)

UDF 类型 支持的计算 DESCRIPTION
Unity Catalog Python UDF
  • 无服务器笔记本和作业
  • 具有标准访问模式的经典计算(Databricks Runtime 13.3 LTS 及更高版本)
  • SQL 仓库(无服务器、专业和经典)
  • DLT (经典和无服务器)
在 Python 中定义 UDF 并将其注册到 Unity 目录中进行治理。
标量 UDF 对单个行进行作,并为每个行返回单个结果值。
批处理 Unity Catalog Python UDF
  • 无服务器笔记本和作业
  • 具有标准访问模式的经典计算(Databricks Runtime 16.3 及更高版本)
  • SQL 仓库(无服务器、专业和经典)
在 Python 中定义 UDF 并将其注册到 Unity 目录中进行治理。
对多个值进行批处理,并返回多个值。 减少大规模数据处理中逐行操作的开销。

用于用户隔离计算的会话范围 UDF 速查表

会话范围的 UDF 在笔记本或作业中定义,范围限定为当前的 SparkSession。 可以使用 Scala 或 Python 定义和访问会话范围的 UDF。

UDF 类型 支持的计算 DESCRIPTION
Python 标量
  • 无服务器笔记本和作业
  • 具有标准访问模式的经典计算(Databricks Runtime 13.3 LTS 及更高版本)
  • DLT (经典和无服务器)
标量 UDF 对单个行进行作,并为每个行返回单个结果值。
Python 非标量
  • 无服务器笔记本和作业
  • 具有标准访问模式的经典计算(Databricks Runtime 14.3 LTS 及更高版本)
  • DLT (经典和无服务器)
非标量 UDF 包括 pandas_udfmapInPandasmapInArrowapplyInPandas。 Pandas UDF 使用 Apache Arrow 传输数据,并使用 pandas 来处理数据。 Pandas UDF 支持矢量化操作,这些操作可以大幅提高逐行处理的标量 UDF 的性能。
Python UDDF
  • 无服务器笔记本和作业
  • 具有标准访问模式的经典计算(Databricks Runtime 14.3 LTS 及更高版本)
  • DLT (经典和无服务器)
UDTF 采用一个或多个输入参数,并为每个输入行返回多行(可能有多个列)。
Scala 标量 UDF
  • 具有标准访问模式的经典计算(Databricks Runtime 13.3 LTS 及更高版本)
标量 UDF 对单个行进行作,并为每个行返回单个结果值。
Scala UDAF
  • 具有标准访问模式的经典计算(Databricks Runtime 14.2 LTS 及更高版本)
UDAF函数对多行进行操作,并返回一个聚合的结果。

性能注意事项

  • 内置函数 SQL UDF 是最有效的选项。

  • Scala UDF 通常比 Python UDF 快。

    • 未隔离的 Scala UDF 在 Java 虚拟机(JVM)中运行,因此能够避免数据在 JVM 内外移动的开销。
    • 隔离的 Scala UDF 必须将数据移入和移出 JVM,但它们仍比 Python UDF 更快,因为它们更有效地处理内存。
  • Python UDFpandas UDF 往往比 Scala UDF 慢,因为它们需要序列化数据并将其移出 JVM 到 Python 解释器。

    • Pandas UDF 的速度比 Python UDF 快 100 倍,因为它们使用 Apache Arrow 来降低序列化成本。