注意
Databricks 建议使用声明性自动化捆绑包来创建、开发、部署和测试作业和其他 Databricks 资源作为源代码。 请参阅什么是声明性自动化捆绑包?
本文中,您将学习如何使用 Databricks SDK for Python 自动化 Azure Databricks 操作并加速开发。 本文是对 Read The Docs 上 Databricks SDK for Python 文档以及 GitHub 上 Databricks SDK for Python 仓库中的代码示例的补充。
注意
用于Python的 Databricks SDK 位于 Beta 中,可用于生产环境。
在 Beta 期间,Databricks 建议锁定你的代码所依赖的 Python 的 Databricks SDK 的特定小版本。 例如,可以在文件中固定依赖项,例如 requirements.txt 的 venv,或 Poetry 的 pyproject.toml 和 poetry.lock。 有关固定依赖项的详细信息,请参阅关于虚拟环境和程序包的venv,或 Poetry 的安装依赖项。
开始之前
可以在 Azure Databricks 笔记本中或从本地开发计算机上使用适用于 Python 的 Databricks SDK。
- 若要在 Azure Databricks 笔记本中使用 Databricks SDK for Python,请跳到 从 Azure Databricks 笔记本使用 Databricks SDK for Python。
- 若要在本地开发计算机上使用 Databricks SDK for Python,请完成本部分中的步骤。
在开始使用 Databricks SDK for Python 之前,开发计算机必须具备:
- 已配置 Azure Databricks authentication。
- 已安装 Python 3.8 或更高版本。 为了自动化Azure Databricks计算资源,Databricks建议安装与目标Azure Databricks计算资源上已安装的Python主版本和次版本相匹配的Python版本。 本文的示例依赖于使用已安装 Python 3.10 的 Databricks Runtime 13.3 LTS 自动执行群集。 有关正确的版本,请参阅 Databricks Runtime 发行说明版本和兼容性以了解群集的 Databricks Runtime 版本。
- Databricks 建议为使用 Databricks Python SDK 的每个 Python 项目创建和激活 Python 虚拟环境。 Python虚拟环境有助于确保代码项目使用兼容版本的Python和Python包(在本例中为用于 Python 包的 Databricks SDK)。 有关Python虚拟环境的详细信息,请参阅 venv 或 Poetry。
用于 Python 的 Databricks SDK 入门
本部分介绍如何从本地开发计算机开始使用用于Python的 Databricks SDK。 若要在 Azure Databricks 笔记本中使用 Databricks SDK for Python,请跳到 从 Azure Databricks 笔记本使用 Databricks SDK for Python。
在配置了Azure Databricks身份验证的开发计算机上,Python已安装,Python虚拟环境已激活,请从Python包索引(PyPI)安装 databricks-sdk 包(及其依赖项),如下所示:
Venv
使用
pip安装databricks-sdk包。 (在一些系统上,可能需要在此处和全文中将pip3替换为pip。)pip3 install databricks-sdk诗歌
poetry add databricks-sdk若要在 Databricks SDK for Python 处于 Beta 版时安装特定版本的
databricks-sdk包,请参阅包的 Release 历史记录。 例如,若要安装版本0.1.6,请运行:Venv
pip3 install databricks-sdk==0.1.6诗歌
poetry add databricks-sdk==0.1.6若要将 Databricks SDK for Python 包的现有安装升级到最新版本,请运行以下命令:
Venv
pip3 install --upgrade databricks-sdk诗歌
poetry add databricks-sdk@latest若要显示 Databricks SDK for Python 软件包的当前
Version和其他详细信息,请运行以下命令:Venv
pip3 show databricks-sdk诗歌
poetry show databricks-sdk在Python虚拟环境中,创建Python代码文件,用于导入用于Python的 Databricks SDK。 以下示例在包含以下内容的名为
main.py的文件中,只需列出Azure Databricks工作区中的所有群集:from databricks.sdk import WorkspaceClient w = WorkspaceClient() for c in w.clusters.list(): print(c.cluster_name)通过运行
main.py命令运行Python代码文件(假设名为python的文件):Venv
python3.10 main.py诗歌
如果位于虚拟环境的 shell 中:
python3.10 main.py如果不在虚拟环境的 shell 中:
poetry run python3.10 main.py注意
不在上述调用中为
w = WorkspaceClient()设置任何参数时,Databricks SDK for Python 会使用其默认的方法尝试执行 Azure Databricks 身份验证。 若要替代此默认行为,请参阅下面的身份验证部分。
使用 Azure Databricks 帐户或工作区对用于Python的 Databricks SDK 进行身份验证
本节介绍如何通过本地开发计算机认证 Python 的 Databricks SDK,以访问你的 Azure Databricks 帐户或工作区。 若要从 Azure Databricks 笔记本中对用于 Python 的 Databricks SDK 进行身份验证,请直接跳到 从 Azure Databricks 笔记本中使用 Databricks SDK for Python。
用于Python的 Databricks SDK 实现了 Databricks 统一身份验证标准,这是一种统一且一致的体系结构和编程身份验证方法。 此方法有助于通过Azure Databricks更集中、更可预测的方式设置和自动化身份验证。 借助此方法,你只需配置 Databricks 身份验证一次,然后即可在多个 Databricks 工具和 SDK 中使用该配置,而无需进一步更改身份验证配置。 有关详细信息,包括Python中更完整的代码示例,请参阅 Databricks 统一身份验证。
注意
用于Python的 Databricks SDK 尚未实现使用Azure托管身份进行认证。
使用用于Python的 Databricks SDK 初始化 Databricks 身份验证的一些可用编码模式包括:
通过以下操作之一使用 Databricks 默认身份验证:
- 创建或识别一个包含必填字段的自定义 Databricks 配置文件,以用于目标 Databricks 身份验证类型。 然后将
DATABRICKS_CONFIG_PROFILE环境变量设置为该自定义配置文件的名称。 - 为目标 Databricks 身份验证类型设置所需的环境变量。
然后实例化一个具有 Databricks 默认身份验证的
WorkspaceClient对象,如下所示:from databricks.sdk import WorkspaceClient w = WorkspaceClient() # ...- 创建或识别一个包含必填字段的自定义 Databricks 配置文件,以用于目标 Databricks 身份验证类型。 然后将
支持硬编码必填字段,但不建议这样做,因为它可能会公开代码中的敏感信息,例如Azure Databricks个人访问令牌。 以下示例对 Azure Databricks 主机和用于 Databricks 令牌身份验证的访问令牌值进行硬编码:
from databricks.sdk import WorkspaceClient w = WorkspaceClient( host = 'https://...', token = '...' ) # ...
另请参阅 Databricks SDK for Python 文档中的 Authentication。
使用 Databricks SDK for Python 从 Azure Databricks 笔记本中使用 Python
可以在安装了 Databricks SDK for Python 的 Azure Databricks 群集上附加的 Azure Databricks 笔记本中调用 Databricks SDK for Python 的功能。 默认情况下,它安装在使用 Databricks Runtime 13.3 LTS 或更高版本的所有Azure Databricks群集上。 对于使用 Databricks Runtime 12.2 LTS 及更低版本Azure Databricks群集,必须先安装 Databricks SDK for Python。 请参阅 Step 1:安装或升级 Databricks SDK for Python。
若要查看特定 Databricks Runtime 版本中安装的 Databricks SDK for Python 的版本,请参阅该版本的 Databricks Runtime 发行说明中的“已安装的 Python 库”部分。
Databricks 建议从 PiPy 安装最新版本的 SDK,但至少安装或升级到 databricks SDK for Python 0.6.0 或更高版本,因为默认情况下,Azure Databricks笔记本身份验证由版本 0.6.0 及更高版本用于所有 Databricks Runtime 版本。
注意
Databricks Runtime 15.1 是第一个安装 Databricks SDK for Python (0.20.0) 版本的 Databricks Runtime,它支持默认笔记本身份验证,无需升级。
下表概述了对用于 Python 和 Databricks Runtime 版本的 Databricks SDK 的笔记本身份验证支持:
| SDK/Databricks运行时环境 | 10.4 LTS | 11.3 LTS | 12.3 LTS | 13.3 LTS | 14.3 LTS | 15.1 及更高版本 |
|---|---|---|---|---|---|---|
| 0.1.7 及更低版本 | ||||||
| 0.1.10 | ✓ | ✓ | ✓ | ✓ | ✓ | |
| 0.6.0 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| 0.20.0 及更高版本 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
默认Azure Databricks笔记本身份验证依赖于Azure Databricks在后台自动生成的临时Azure Databricks个人访问令牌供其自己使用。 Azure Databricks笔记本停止运行后删除此临时令牌。
重要
- 默认Azure Databricks笔记本身份验证仅适用于群集的驱动程序节点,而不适用于群集的任何辅助角色或执行程序节点。
- Azure Databricks笔记本身份验证不适用于Azure Databricks配置文件。
- Azure Databricks笔记本身份验证不适用于 Databricks 容器服务。
如果要调用 Azure Databricks account 级别 API,或者想要使用默认 Databricks 笔记本身份验证以外的 Databricks 身份验证类型,也支持以下身份验证类型:
| 身份验证类型 | 用于Python版本的 Databricks SDK |
|---|---|
| OAuth 机器对机器 (M2M) 身份验证 | 0.18.0 及更高版本 |
| OAuth 用户对机器 (U2M) 身份验证 | 0.19.0 及更高版本 |
| Microsoft Entra ID服务主体身份验证 | 所有版本 |
| Azure CLI身份验证 | 所有版本 |
| Databricks 个人访问令牌身份验证 | 所有版本 |
尚不支持Azure托管标识身份验证。
步骤 1:安装或升级用于 Python 的 Databricks SDK
注意
默认情况下,databricks SDK for Python 安装在使用 Databricks Runtime 13.3 LTS 或更高版本的所有Azure Databricks群集上。
Azure Databricks Python笔记本可以像使用其他任何Python库一样使用 Databricks 的 Python SDK。 若要在附加Azure Databricks群集上安装或升级用于 Python 库的 Databricks SDK,请从笔记本单元运行
%pipmagic 命令,如下所示:%pip install databricks-sdk --upgrade运行
%pipmagic 命令后,必须重启Python以使已安装或升级的库可供笔记本使用。 为此,请在包含%pip魔法命令的单元格之后的笔记本单元格中立即运行以下命令:dbutils.library.restartPython()若要显示用于Python的 Databricks SDK 的已安装版本,请从笔记本单元运行以下命令:
%pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
步骤 2:运行代码
在笔记本单元中创建 Python 代码,导入并调用适用于 Python 的 Databricks SDK。 以下示例使用默认Azure Databricks笔记本身份验证列出Azure Databricks工作区中的所有群集:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for c in w.clusters.list():
print(c.cluster_name)
运行此单元格时,将显示Azure Databricks工作区中所有可用群集的名称列表。
若要使用不同的Azure Databricks身份验证类型,请参阅 Authorization 方法并单击相应的链接以获取其他技术详细信息。
使用 Databricks 实用工具
可以使用 Databricks SDK 中的 Databricks Utilities 在本地开发计算机上或Azure Databricks笔记本中运行的Python代码。
- 在本地开发计算机上,Databricks 实用工具只能访问
dbutils.fs、dbutils.secrets、dbutils.widgets和dbutils.jobs命令组。 - 从附加到Azure Databricks群集的Azure Databricks笔记本中,Databricks Utilities 可以访问所有可用的 Databricks Utilities 命令组,但
dbutils.notebook命令组仅限于两个级别的命令,例如,dbutils.notebook.run或dbutils.notebook.exit。
若要从本地开发计算机或 Azure Databricks 笔记本调用 Databricks Utilities,请在 dbutils 中使用 WorkspaceClient。 此代码示例使用默认Azure Databricks笔记本身份验证在 dbutils 内调用 WorkspaceClient,以列出工作区的 DBFS 根目录中所有对象的路径。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
d = w.dbutils.fs.ls('/')
for f in d:
print(f.path)
另外,也可以直接调用 dbutils。 但是,只能使用默认Azure Databricks笔记本身份验证。 此代码示例直接调用 dbutils 以列出工作区的 DBFS 根目录中所有对象的路径。
from databricks.sdk.runtime import *
d = dbutils.fs.ls('/')
for f in d:
print(f.path)
若要访问 Unity Catalog 卷,请在 files 中使用 WorkspaceClient。 请参阅管理 Unity Catalog 卷中的文件。 不能单独使用(或在 dbutils 内使用)WorkspaceClient 来访问卷。
另请参阅与 dbutils 交互。
代码示例
以下代码示例演示如何使用用于Python的 Databricks SDK 来创建和删除群集、运行作业和列出帐户级组。 这些代码示例使用默认Azure Databricks笔记本身份验证。 有关默认 Azure Databricks 笔记本的身份验证的详细信息,请参阅 使用 Azure Databricks 笔记本中的 Databricks SDK for Python。 有关笔记本外部的默认身份验证的详细信息,请参阅 使用 Azure Databricks 帐户或工作区对用于Python的 Databricks SDK 进行身份验证。
有关其他代码示例,请参阅 GitHub 中 Databricks SDK for Python 存储库中的 examples。 另请参阅:
- Databricks 工作区 API 参考
- Databricks 帐户 API 文档
- 创建群集
- 永久删除群集
- 创建作业
- 创建使用无服务器计算的作业
- 管理 Unity Catalog 卷中的文件
- 列出帐户级组
创建群集
此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,在空闲 15 分钟后自动终止。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
print("Attempting to create cluster. Please wait...")
c = w.clusters.create_and_wait(
cluster_name = 'my-cluster',
spark_version = '12.2.x-scala2.12',
node_type_id = 'Standard_DS3_v2',
autotermination_minutes = 15,
num_workers = 1
)
print(f"The cluster is now ready at " \
f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")
永久删除群集
此代码示例从工作区中永久删除具有指定群集 ID 的群集。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')
w.clusters.permanent_delete(cluster_id = c_id)
创建作业
此代码示例创建一个Azure Databricks作业,该作业在指定的群集上运行指定的笔记本。 当代码运行时,它会从终端上的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source
w = WorkspaceClient()
job_name = input("Some short name for the job (for example, my-job): ")
description = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key = input("Some key to apply to the job's tasks (for example, my-key): ")
print("Attempting to create the job. Please wait...\n")
j = w.jobs.create(
name = job_name,
tasks = [
Task(
description = description,
existing_cluster_id = existing_cluster_id,
notebook_task = NotebookTask(
base_parameters = dict(""),
notebook_path = notebook_path,
source = Source("WORKSPACE")
),
task_key = task_key
)
]
)
print(f"View the job at {w.config.host}/#job/{j.job_id}\n")
创建使用无服务器计算的作业
以下示例创建一个作业,该作业使用适用于作业的无服务器计算:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import NotebookTask, Source, Task
w = WorkspaceClient()
j = w.jobs.create(
name = "My Serverless Job",
tasks = [
Task(
notebook_task = NotebookTask(
notebook_path = "/Users/someone@example.com/MyNotebook",
source = Source("WORKSPACE")
),
task_key = "MyTask",
)
]
)
管理 Unity Catalog 卷中的文件
此代码示例演示了在files中对WorkspaceClient功能的各种调用,以访问 Unity Catalog 卷。 有关详细信息,请参阅 完整的 SDK 文档。
用于Python的 Databricks SDK 提供了两种上传和下载文件的方法。 推荐使用 upload_from 和 download_to 作为本地文件路径。
upload 和 download 推荐用于内存和流接口中的数据。 所有四种方法都支持使用 SDK v0.72.0 或更高版本时基础云存储支持的最大大小的文件。
如果Azure存储帐户启用了防火墙,则文件大小限制为 5 GB。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Define volume, folder, and file details.
catalog = 'main'
schema = 'default'
volume = 'my-volume'
volume_path = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file = 'data.csv'
volume_file_path = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path = './data.csv'
# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)
# Upload a file to a volume (method 1: recommended when your data is in a local file path)
w.files.upload_from(volume_file_path, upload_file_path, overwrite=True)
# Upload a file to a volume (method 2: recommended when your data is in-memory, or not a local file)
with open(upload_file_path, "rb") as f:
w.files.upload(volume_file_path, io.BytesIO(f.read()), overwrite=True)
# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
print(item.path)
# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
print(item.path)
# Download a file from a volume (method 1: recommended when data needs to be stored in a local file; fastest)
w.files.download_to(volume_file_path, local_download_path)
# Download a file from a volume (method 2: recommended when you need the data in-memory)
resp = w.files.download(volume_file_path)
chunk_size = 8192
with resp.contents as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
print(f"Read {len(chunk)} characters")
# Delete a file from a volume.
w.files.delete(volume_file_path)
# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)
列出账户级别用户组
此代码示例列出Azure Databricks帐户中所有可用组的显示名称。
注意
不支持AccountClient本地的笔记本身份验证,因此必须在构建函数中设置凭据,才能在笔记本中运行此示例。
from databricks.sdk import AccountClient
a = AccountClient()
for g in a.groups.list():
print(g.display_name)
测试
若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在不调用Azure Databricks REST API 终结点或更改Azure Databricks帐户或工作区的状态的情况下测试代码,请使用Python模拟库,例如 unittest.mock。
提示
Databricks Labs 提供 pytest 插件,可简化与 Databricks 的集成测试,也提供 pylint 插件,可确保代码质量。
以下名为 helpers.py 的示例文件包含返回有关新群集的信息的 create_cluster 函数:
# helpers.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails
def create_cluster(
w: WorkspaceClient,
cluster_name: str,
spark_version: str,
node_type_id: str,
autotermination_minutes: int,
num_workers: int
) -> ClusterDetails:
response = w.clusters.create(
cluster_name = cluster_name,
spark_version = spark_version,
node_type_id = node_type_id,
autotermination_minutes = autotermination_minutes,
num_workers = num_workers
)
return response
给定以下名为 main.py 的文件,用于调用 create_cluster 函数:
# main.py
from databricks.sdk import WorkspaceClient
from helpers import *
w = WorkspaceClient()
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = w,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
print(response.cluster_id)
以下名为 test_helpers.py 的文件测试 create_cluster 函数是否返回预期的响应。 此测试不会在目标工作区中创建群集,而是模拟 WorkspaceClient 对象,定义模拟对象的设置,然后将模拟对象传递给 create_cluster 函数。 然后,测试将检查函数是否返回新的模拟群集的预期 ID。
# test_helpers.py
from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.
def test_create_cluster():
# Create a mock WorkspaceClient.
mock_workspace_client = create_autospec(WorkspaceClient)
# Set the mock WorkspaceClient's clusters.create().cluster_id value.
mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'
# Call the actual function but with the mock WorkspaceClient.
# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
w = mock_workspace_client,
cluster_name = 'Test Cluster',
spark_version = '<spark-version>',
node_type_id = '<node-type-id>',
autotermination_minutes = 15,
num_workers = 1
)
# Assert that the function returned the mocked cluster ID.
assert response.cluster_id == '123abc'
若要运行此测试,请从代码项目的根目录运行 pytest 命令,该命令应生成如下所示的测试结果:
$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item
test_helpers.py . [100%]
======================== 1 passed ==========================
Troubleshooting
本部分介绍用于 Python 的 Databricks SDK 常见问题的解决方案。
若要报告问题或其他反馈,请在 GitHub 中创建一个与 Databricks SDK for Python 相关的问题。
错误:无法分析响应
如果在尝试使用 Databricks SDK for Python 时收到以下错误,则它几乎总是指示身份验证配置出现问题。
Error: unable to parse response. This is likely a bug in the Databricks SDK for Python or the underlying REST API.
如果遇到此错误,请验证以下内容:
- 确保 Databricks 主机已正确设置。
- 确认身份验证方法具有尝试执行的 API作所需的权限。
- 如果位于企业防火墙后面,请确保它不会阻止或重定向 API 流量。
导致此错误的一个常见原因是专用链接将 SDK 重定向到登录页,而 SDK 无法处理这种重定向。 尝试访问启用了专用链接的工作区时,通常会出现这种情况,该工作区配置了不允许公共 Internet 访问,而访问来自不同于VPC终端所在的网络。
有关更多详细信息,请参阅:
其他资源
有关详细信息,请参阅:
- 用于 Python 的 Databricks SDK 文档
- 其他代码示例
- Databricks 工作区 API 参考
- Databricks 帐户 API 文档
- 日志记录
- 长期运行的操作
- 分页响应
- 使用 OAuth 进行单一登录