共用方式為

用于 Python 的 Databricks Connect 中的用户定义的函数

注释

本文介绍 Databricks Connect for Databricks Runtime 13.1 及更高版本。

用于 Python 的 Databricks Connect 支持用户定义的函数 (UDF)。 执行包含 UDF 的数据帧作时,UDF 由 Databricks Connect 序列化,并作为请求的一部分发送到服务器。

有关用于 Scala 的 Databricks Connect 的 UDF 的信息,请参阅 Databricks Connect for Scala 中的用户定义的函数

注释

由于用户定义的函数已序列化和反序列化,因此客户端的 Python 版本必须与 Azure Databricks 计算中的 Python 版本匹配。 有关支持的版本,请参阅 版本支持矩阵

定义 UDF

若要在用于 Python 的 Databricks Connect 中创建 UDF,请使用以下受支持的函数之一:

例如,以下 Python 代码设置了一个简单的用户定义函数(UDF),用于对列中的值进行平方。

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

具有依赖项的 UDF

重要

此功能为公共预览版,需要 Databricks Connect for Python 16.4 或更高版本,以及运行 Databricks Runtime 16.4 或更高版本的群集。 若要使用此功能,请在工作区 中的 Unity 目录中启用预览增强型 Python UDF

Databricks Connect 支持指定 UDF 所需的 Python 依赖项。 这些依赖项作为 UDF Python 环境的一部分安装在 Databricks 计算上。

此功能允许用户指定除基础环境中提供的包外,UDF 所需的依赖项。 它还可用于安装与 基础环境中提供的不同版本的包。

可以从以下源安装依赖项:

  • PyPI 包
    • 可以根据 PEP 508(例如,dicepyjokes<1simplejson==3.19.*)指定 PyPI 包。
  • 存储在 Unity 目录卷中的文件
    • 支持 Wheel 软件包(.whl)和 gzipped tar 文件(.tar.gz)。 用户必须在 re:[UC] 卷中获得对文件的 READ_FILE 权限。
    • 从 Unity 目录卷安装包时,若要调用 UDF,用户需要 READ VOLUME 对源卷具有权限。 向所有帐户用户授予此权限会自动为新用户启用此权限。
    • 应将 Unity Catalog 卷文件指定为 dbfs:<path>,例如,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whldbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz

若要在 UDF 中包含自定义依赖项,请在 withDependencies 中指定它们,然后使用该环境创建 Spark 会话。 依赖项安装在 Databricks 计算中,并且将在使用此 Spark 会话的所有 UDF 中可用。

以下代码将 PyPI 包 dice 声明为依赖项:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

或者,若要在卷中指定滚轮的依赖项::

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Databricks 笔记本和作业中的行为

在笔记本和作业中,需要直接在 REPL 中安装 UDF 依赖项。 Databricks Connect 通过验证是否已安装所有指定的依赖项来验证 REPL Python 环境,如果未安装任何依赖项,则会引发异常。

对 PyPI 和 Unity 目录卷依赖项执行笔记本环境验证。 卷依赖项需要按照PEP-427或更高版本的标准 Python 打包规范进行打包,用于轮子文件,而源分发文件则需遵循PEP-241或更高版本的规范。 有关 Python 打包标准的详细信息,请参阅 PyPA 文档

局限性

  • 本地开发计算机上的 Python 滚轮或源分发等文件不能直接指定为依赖项。 必须先将它们上传到 Unity 目录卷。
  • 不支持在窗口函数中使用pyspark.sql.streaming.DataStreamWriter.foreachpyspark.sql.streaming.DataStreamWriter.foreachBatch以及pandas聚合UDF的依赖项。

例子

以下示例定义了环境中的 PyPI 依赖项和卷依赖项,创建与该环境的会话,然后定义使用这些依赖项的用户自定义函数(UDF)并调用它们:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Python 基础环境

UDF 在 Databricks 计算中执行,而不是在客户端上执行。 执行 UDF 的基本 Python 环境取决于所选的 Databricks 计算。

基本 Python 环境是在群集上运行的 Databricks Runtime 版本的 Python 环境。 此基本环境中的 Python 版本和包列表位于 Databricks Runtime 发行说明的系统环境和已安装的 Python 库部分下。