注释
本文介绍 Databricks Connect for Databricks Runtime 13.1 及更高版本。
用于 Python 的 Databricks Connect 支持用户定义的函数 (UDF)。 执行包含 UDF 的数据帧作时,UDF 由 Databricks Connect 序列化,并作为请求的一部分发送到服务器。
有关用于 Scala 的 Databricks Connect 的 UDF 的信息,请参阅 Databricks Connect for Scala 中的用户定义的函数。
注释
由于用户定义的函数已序列化和反序列化,因此客户端的 Python 版本必须与 Azure Databricks 计算中的 Python 版本匹配。 有关支持的版本,请参阅 版本支持矩阵。
定义 UDF
若要在用于 Python 的 Databricks Connect 中创建 UDF,请使用以下受支持的函数之一:
- PySpark 用户定义的函数
- PySpark 流式处理函数
例如,以下 Python 代码设置了一个简单的用户定义函数(UDF),用于对列中的值进行平方。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
具有依赖项的 UDF
重要
此功能为公共预览版,需要 Databricks Connect for Python 16.4 或更高版本,以及运行 Databricks Runtime 16.4 或更高版本的群集。 若要使用此功能,请在工作区 中的 Unity 目录中启用预览增强型 Python UDF 。
Databricks Connect 支持指定 UDF 所需的 Python 依赖项。 这些依赖项作为 UDF Python 环境的一部分安装在 Databricks 计算上。
此功能允许用户指定除基础环境中提供的包外,UDF 所需的依赖项。 它还可用于安装与 基础环境中提供的不同版本的包。
可以从以下源安装依赖项:
- PyPI 包
- 可以根据 PEP 508(例如,
dice
pyjokes<1
或simplejson==3.19.*
)指定 PyPI 包。
- 可以根据 PEP 508(例如,
- 存储在 Unity 目录卷中的文件
- 支持 Wheel 软件包(
.whl
)和 gzipped tar 文件(.tar.gz
)。 用户必须在 re:[UC] 卷中获得对文件的READ_FILE
权限。 - 从 Unity 目录卷安装包时,若要调用 UDF,用户需要
READ VOLUME
对源卷具有权限。 向所有帐户用户授予此权限会自动为新用户启用此权限。 - 应将 Unity Catalog 卷文件指定为
dbfs:<path>
,例如,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl
或dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz
。
- 支持 Wheel 软件包(
若要在 UDF 中包含自定义依赖项,请在 withDependencies
中指定它们,然后使用该环境创建 Spark 会话。 依赖项安装在 Databricks 计算中,并且将在使用此 Spark 会话的所有 UDF 中可用。
以下代码将 PyPI 包 dice
声明为依赖项:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
或者,若要在卷中指定滚轮的依赖项::
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks 笔记本和作业中的行为
在笔记本和作业中,需要直接在 REPL 中安装 UDF 依赖项。 Databricks Connect 通过验证是否已安装所有指定的依赖项来验证 REPL Python 环境,如果未安装任何依赖项,则会引发异常。
对 PyPI 和 Unity 目录卷依赖项执行笔记本环境验证。 卷依赖项需要按照PEP-427或更高版本的标准 Python 打包规范进行打包,用于轮子文件,而源分发文件则需遵循PEP-241或更高版本的规范。 有关 Python 打包标准的详细信息,请参阅 PyPA 文档。
局限性
- 本地开发计算机上的 Python 滚轮或源分发等文件不能直接指定为依赖项。 必须先将它们上传到 Unity 目录卷。
- 不支持在窗口函数中使用
pyspark.sql.streaming.DataStreamWriter.foreach
、pyspark.sql.streaming.DataStreamWriter.foreachBatch
以及pandas聚合UDF的依赖项。
例子
以下示例定义了环境中的 PyPI 依赖项和卷依赖项,创建与该环境的会话,然后定义使用这些依赖项的用户自定义函数(UDF)并调用它们:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Python 基础环境
UDF 在 Databricks 计算中执行,而不是在客户端上执行。 执行 UDF 的基本 Python 环境取决于所选的 Databricks 计算。
基本 Python 环境是在群集上运行的 Databricks Runtime 版本的 Python 环境。 此基本环境中的 Python 版本和包列表位于 Databricks Runtime 发行说明的系统环境和已安装的 Python 库部分下。