注释
本文介绍 Databricks Connect for Databricks Runtime 13.3 及更高版本。
用于 Python 的 Databricks Connect 支持用户定义的函数 (UDF)。 执行包含 UDF 的数据帧作时,UDF 由 Databricks Connect 序列化,并作为请求的一部分发送到服务器。
有关用于 Scala 的 Databricks Connect 的 UDF 的信息,请参阅 Databricks Connect for Scala 中的用户定义的函数。
注释
由于用户定义的函数已序列化和反序列化,因此客户端的 Python 版本必须与 Azure Databricks 计算中的 Python 版本匹配。 有关支持的版本,请参阅 版本支持矩阵。
定义 UDF
若要在用于 Python 的 Databricks Connect 中创建 UDF,请使用以下受支持的函数之一:
- PySpark 用户定义的函数
- PySpark 流式处理函数
例如,以下 Python 代码设置了一个简单的用户定义函数(UDF),用于对列中的值进行平方。
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
管理 UDF 依赖项
重要
此功能为公共预览版,需要 Databricks Connect for Python 16.4 或更高版本,以及运行 Databricks Runtime 16.4 或更高版本的群集。 若要使用此功能,请在工作区 中的 Unity 目录中启用预览增强型 Python UDF 。
Databricks Connect 支持指定 UDF 所需的 Python 依赖项。 这些依赖项作为 UDF Python 环境的一部分安装在 Databricks 计算上。
此功能允许用户指定除基础环境中提供的包外,UDF 所需的依赖项。 它还可用于安装与 基础环境中提供的不同版本的包。
可以从以下源安装依赖项:
- PyPI 包
- 可以根据 PEP 508(例如,
dicepyjokes<1或simplejson==3.19.*)指定 PyPI 包。
- 可以根据 PEP 508(例如,
- 存储在 Unity 目录卷中的包
- 支持生成分发(
.whl)和源分发(.tar.gz)。 - 指定 Unity Catalog 卷包可以采用
dbfs:<path>,例如dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl或dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz。 - 用户必须在 re:[UC] 卷中获得对文件的
READ_FILE权限。 向所有帐户用户授予此权限会自动为新用户启用此权限。
- 支持生成分发(
- 本地包、文件夹和 Python 文件
- 本地生成的分发()、源分发(
.whl.tar.gz)、文件夹和 Python 文件可以指定为local:<path>,例如:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl、、。local:/path/to/my_private_dep-4.0.0.tar.gzlocal:/path/to/my_folderlocal:/path/to/my_file.py - 支持绝对路径和相对路径,例如:
local:/path/to/my_file.py或local:./path/to/my_file.py。
- 本地生成的分发()、源分发(
若要在 UDF 中包含自定义依赖项,请在 withDependencies 中指定它们,然后使用该环境创建 Spark 会话。 依赖项安装在 Databricks 计算中,并且将在使用此 Spark 会话的所有 UDF 中可用。
以下代码将 PyPI 包 dice 声明为依赖项:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
或者,若要在卷中指定滚轮的依赖项::
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Databricks 笔记本和作业中的行为
在笔记本和作业中,需要直接在 REPL 中安装 UDF 依赖项。 Databricks Connect 通过验证是否已安装所有指定的依赖项来验证 REPL Python 环境,如果未安装任何依赖项,则会引发异常。 笔记本环境验证针对 PyPI 和 Unity 目录卷依赖项运行,但不适用于本地依赖项。
局限性
- UDF 依赖支持
pyspark.sql.streaming.DataStreamWriter.foreach需要 Databricks Connect,适用于 Python 18.0 或更高版本,以及运行 Databricks Runtime 18.0 或更高版本的群集。 - UDF 依赖支持
pyspark.sql.streaming.DataStreamWriter.foreachBatch需要 Databricks Connect,适用于 Python 18.0 或更高版本,以及运行 Databricks Runtime 18.0 或更高版本的群集。 此功能不支持在无服务器架构中运行。 - 对本地包、文件夹和 Python 文件的 UDF 依赖项支持需要用于 Python 18.1 或更高版本的 Databricks Connect,以及运行 Databricks Runtime 18.1 或更高版本的群集。
- 通过窗口函数进行的 Pandas 聚合 UDF 不支持 UDF 依赖项。
- Unity 目录卷包和本地包必须遵循 PEP-427 或更高版本的标准 Python 打包规范进行打包,用于轮子格式构建发布,以及 PEP-241 或更高版本的 tar 源发布。 有关 Python 打包标准的详细信息,请参阅 PyPA 文档。
例子
以下示例定义了环境中的 PyPI 依赖项和卷依赖项,创建与该环境的会话,然后定义使用这些依赖项的用户自定义函数(UDF)并调用它们:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
自动管理 UDF 依赖项
重要
此功能为 公共预览版 ,需要 Databricks Connect for Python 18.1 或更高版本、本地计算机上的 Python 3.12 以及运行 Databricks Runtime 18.1 或更高版本的群集。 若要使用此功能,请在工作区 中的 Unity 目录中启用预览增强型 Python UDF 。
Databricks Connect withAutoDependencies() API 支持自动发现和上传 UDF 中导入语句中使用的本地模块和公共 PyPI 依赖项。 它无需手动指定依赖项。
以下代码启用自动依赖项管理:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
该方法 withAutoDependencies() 接受以下参数:
-
upload_local:设置为True时,导入到您的 UDFs 的本地模块会被自动发现、打包并上传到 UDF 沙盒中。 :设置为 时,会在 Azure Databricks 计算环境中自动发现并自动安装 UDF 中使用的公共 PyPI 依赖项。 发现过程使用本地计算机上的已安装包来确定版本,确保本地环境和远程执行环境之间的一致性。
局限性
- 不支持动态导入(例如
importlib.import_module("foo"))。 - 命名空间包(例如,
azure.eventhub和google.cloud.aiplatform)不受支持。 - 不支持使用直接 URL 引用安装的依赖项。 这包括从本地 Wheel 文件中安装的那些组件。
- 不支持从专用包索引安装的依赖项。 以这种方式安装的包无法与从公共 PyPI 安装的包区分开来。
- 依赖项发现在 Python shell 中不起作用。 仅支持 Python 脚本、IPython shell 和 Jupyter Notebook。
例子
以下示例演示了使用本地模块和 PyPI 包的自动依赖项管理。 此示例要求你已安装 simplejson 并 dice (使用 pip install simplejson dice)。
首先,创建本地帮助程序模块:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
然后,在主脚本中导入这些模块,并在 UDF 中使用它们:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
伐木业
若要输出发现的依赖项,请将 SPARK_CONNECT_LOG_LEVEL 环境变量 info 设置为或 debug。 或者,配置 Python 日志记录框架:
import logging
logging.basicConfig(level=logging.INFO)
相关日志由 databricks.connect.auto_dependencies 模块发出,例如:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Python 基础环境
UDF 在 Databricks 计算中执行,而不是在客户端上执行。 执行 UDF 的基本 Python 环境取决于 Databricks 计算。
对于群集,基本 Python 环境是群集上运行的 Databricks Runtime 版本的 Python 环境。 此基本环境中的 Python 版本和包列表位于 Databricks Runtime 发行说明的系统环境和已安装的 Python 库部分下。
对于无服务器计算,基本 Python 环境对应于下表的 无服务器环境版本 。 此表中未列出的 Databricks Connect 版本要么尚未支持无服务器架构,要么已达到支持结束。 请参阅 版本支持矩阵 和 停止支持的 Databricks Connect 版本。
| Databricks Connect 版本 | UDF 无服务器环境 |
|---|---|
| 18.0、Python 3.12 | 版本 5 |
| 17.2 到 17.3、Python 3.12 | 版本 4 |
| 16.4.1 至小于17,Python 3.12 | 版本 3 |
| 15.4.10 至 16 以下,Python 3.12 | 版本 3 |
| 15.4.10 至低于 16,Python 3.11 | 第 2 版 |