Databricks SDK for Python

本文介绍如何使用 Databricks SDK for Python 在 Azure Databricks 帐户、工作区和相关资源中自动执行操作。 本文补充了“阅读文档”中适用于 Python 的 Databricks SDK 文档,并补充了 GitHub 中适用于 Python 的 Databricks SDK 存储库中的代码示例

注意

此功能现提供 Beta 版本,可用于生产环境。

在 Beta 版本期间,Databricks 建议将依赖项固定到代码所依赖的 Databricks SDK for Python 的特定次要版本。 例如,可以在文件中固定依赖项,例如 venvrequirements.txt,或 Poetry 的 pyproject.tomlpoetry.lock。 有关固定依赖项的详细信息,请参阅 venv虚拟环境和包,或 Poetry 的安装依赖项

开始之前

可以从 Azure Databricks 笔记本内或从本地开发计算机使用 Databricks SDK for Python。

在开始使用 Databricks SDK for Python 之前,开发计算机必须满足以下要求:

  • 已配置 Azure Databricks 身份验证
  • 已安装 Python 3.8 或更高版本。 要自动处理 Azure Databricks 计算资源,Databricks 建议安装与目标 Azure Databricks 计算资源上安装的版本匹配的 Python 主要和次要版本。 本文的示例依赖于使用安装了 Python 3.10 的 Databricks Runtime 13.3 LTS 来自动化群集。 有关正确的版本,请参阅 Databricks Runtime 发行说明版本和兼容性以了解群集的 Databricks Runtime 版本。
  • Databricks 建议为与 Databricks SDK for Python 配合使用的每个 Python 代码项目创建并激活 Python 虚拟环境。 Python 虚拟环境有助于确保代码项目使用兼容版本的 Python 和 Python 包(在本例中为 Databricks SDK for Python 包)。 本文介绍如何在 Python 虚拟环境中使用 venvPotetry

使用 venv 创建 Python 虚拟环境

  1. 在终端中设置为 Python 代码项目的根目录,运行以下命令。 此命令指示 venv 对虚拟环境使用 Python 3.10,然后在 Python 代码项目根目录中的隐藏目录 .venv 中创建虚拟环境的支持文件。

    # Linux and macOS
    python3.10 -m venv ./.venv
    
    # Windows
    python3.10 -m venv .\.venv
    
  2. 使用 venv 激活虚拟环境。 请根据你的操作系统和终端类型,参阅 venv 文档了解要使用的正确命令。 例如,在运行 zsh 的 macOS 上:

    source ./.venv/bin/activate
    

    如果虚拟环境的名称(例如 .venv)显示于紧靠在终端提示符前面的括号中,则表示该虚拟环境已激活。

    若要随时停用虚拟环境,请运行命令 deactivate

    如果虚拟环境的名称不再显示于紧靠在终端提示符前面的括号中,则表示该虚拟环境已停用。

跳过开始使用 Databricks SDK for Python

使用 Poetry 创建虚拟环境

  1. 如果尚未安装 Poetry,请先安装。

  2. 在终端中设置为 Python 代码项目的根目录,运行以下命令,指示 poetry 初始化 Poetry 的 Python 代码项目。

    poetry init
    
  3. Poetry 显示多个需要完成的提示。 这些提示都不特定于 Databricks SDK for Python。 有关这些提示的信息,请参阅 init

  4. 完成提示后,Poetry 会将 pyproject.toml 文件添加到 Python 项目。 有关 pyproject.toml 文件的信息,请参阅 pyproject.toml 文件

  5. 在终端中仍设置为 Python 代码项目的根目录,运行以下命令。 此命令指示 poetry 读取 pyproject.toml 文件、安装和解决依赖项问题、创建 poetry.lock 文件来锁定依赖项,最后创建虚拟环境。

    poetry install
    
  6. 在终端中设置为 Python 代码项目的根目录,运行以下命令,指示 poetry 激活虚拟环境并进入 shell。

    poetry shell
    

    当虚拟环境的名称显示在终端提示符之前的括号中时,即表示你的虚拟环境已激活并且已进入 shell。

    若要随时停用虚拟环境并退出 shell,请运行命令 exit

    当虚拟环境的名称不再显示在紧接终端提示符前面的括号中时,就表示你已退出 shell。

    若要详细了解如何创建和管理 Poetry 环境,请参阅管理环境

开始使用 Databricks SDK for Python

本部分介绍如何从本地开发计算机开始使用 Databricks SDK for Python。 若要从 Azure Databricks 笔记本内使用 Databricks SDK for Python,请跳到从 Azure Databricks 笔记本内使用 Databricks SDK for Python

  1. 在已激活 Python 虚拟环境的情况下,在配置了 Azure Databricks 身份验证并安装了 Python 的开发计算机上,从 Python 包索引 (PyPI) 安装 databricks-sdk 包(及其依赖项),如下所示:

    Venv

    使用 pip 安装 databricks-sdk 包。 (在一些系统上,可能需要在此处和全文中将 pip3 替换为 pip。)

    pip3 install databricks-sdk
    

    诗歌

    poetry add databricks-sdk
    

    要在 Databricks SDK for Python 处于 Beta 阶段时安装 databricks-sdk 包的特定版本,请参阅该包的发布历史记录。 例如,若要安装版本 0.1.6,请运行:

    Venv

    pip3 install databricks-sdk==0.1.6
    

    诗歌

    poetry add databricks-sdk==0.1.6
    

    提示

    若要将现有的 Databricks SDK for Python 包安装升级到最新版本,请运行以下命令:

    Venv

    pip3 install --upgrade databricks-sdk
    

    诗歌

    poetry add databricks-sdk@latest
    

    要显示 Databricks SDK for Python 包的当前 Version 和其他详细信息,请运行以下命令:

    Venv

    pip3 show databricks-sdk
    

    诗歌

    poetry show databricks-sdk
    
  2. 在 Python 虚拟环境中,创建一个用于导入 Databricks SDK for Python 的 Python 代码文件。 以下示例位于包含以下内容的名为 main.py 的文件中,它仅列出 Azure Databricks 工作区中的所有群集:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    
    for c in w.clusters.list():
      print(c.cluster_name)
    
  3. 通过运行 python 命令来运行 Python 代码文件(假设文件名为 main.py):

    Venv

    python3.10 main.py
    

    诗歌

    如果位于虚拟环境的 shell 中:

    python3.10 main.py
    

    如果不在虚拟环境的 shell 中:

    poetry run python3.10 main.py
    

    注意

    如果不在前面的 w = WorkspaceClient() 调用中设置任何参数,Databricks SDK for Python 将使用其默认进程来尝试执行 Azure Databricks 身份验证。 若要替代此默认行为,请参阅下面的身份验证部分。

使用 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证

本部分介绍如何从本地开发计算机到 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证。 若要从 Azure Databricks 笔记本内对 Databricks SDK for Python 进行身份验证,请跳到从 Azure Databricks 笔记本内使用 Databricks SDK for Python

Databricks SDK for Python 实施 Databricks 客户端统一身份验证标准,这是一种整合且一致的体系结构和编程身份验证方法。 此方法有助于使 Azure Databricks 的身份验证设置和自动化更加集中和可预测。 借助此方法,你只需配置 Databricks 身份验证一次,然后即可在多个 Databricks 工具和 SDK 中使用该配置,而无需进一步更改身份验证配置。 有关详细信息,包括更完整的 Python 代码示例,请参阅 Databricks 客户端统一身份验证

注意

Databricks SDK for Python 尚未实现 Azure 托管标识身份验证

使用 Databricks SDK for Python 初始化 Databricks 身份验证的一些可用编码模式包括:

  • 通过以下操作之一使用 Databricks 默认身份验证:

    • 使用必填字段为目标 Databricks 身份验证类型创建或标识自定义 Databricks 配置文件。 然后将 DATABRICKS_CONFIG_PROFILE 环境变量设置为该自定义配置文件的名称。
    • 为目标 Databricks 身份验证类型设置所需的环境变量。

    然后实例化一个具有 Databricks 默认身份验证的 WorkspaceClient 对象,如下所示:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient()
    # ...
    
  • 支持但不建议对必填字段进行硬编码,因为这可能会透露代码中的敏感信息,例如 Azure Databricks 个人访问令牌。 以下示例对用于 Databricks 令牌身份验证的 Azure Databricks 主机和访问令牌值进行硬编码:

    from databricks.sdk import WorkspaceClient
    
    w = WorkspaceClient(
      host  = 'https://...',
      token = '...'
    )
    # ...
    

另请参阅适用于 Python 的 Databricks SDK 文档中的身份验证

从 Azure Databricks 笔记本内使用 Databricks SDK for Python

可以从附加了 Azure Databricks 群集并且安装了 Databricks SDK for Python 的 Azure Databricks 笔记本调用 Databricks SDK for Python 功能。 所有使用 Databricks Runtime 13.3 LTS 或更高版本的 Azure Databricks 群集上均已安装了 Databricks SDK for Python。 对于使用 Databricks Runtime 12.2 LTS 及更低版本的 Azure Databricks 群集,必须先安装 Databricks SDK for Python。 请参阅步骤 1:安装或升级 Databricks SDK for Python

若要查看为特定的 Databricks Runtime 版本默认安装的 Databricks SDK for Python 的版本号,请参阅此 Databricks Runtime 版本的 Databricks Runtime 发行说明中的“已安装的 Python 库”部分。

Databricks SDK for Python 0.6.0 及更高版本使用默认的 Azure Databricks 笔记本身份验证。 默认 Azure Databricks 笔记本身份验证依赖于临时 Azure Databricks 个人访问令牌,Azure Databricks 在后台自动生成该令牌供自己使用。 Azure Databricks 会在笔记本停止运行后删除此临时令牌。

重要

默认 Azure Databricks 笔记本身份验证仅适用于群集的驱动程序节点,不适用于群集的任何辅助角色或执行程序节点。

Databricks Runtime 13.3 LTS 及更高版本支持向已安装的 Databricks SDK for Python 0.1.7 或更高版本进行默认 Azure Databricks 笔记本身份验证。 Databricks Runtime 10.4 LTS 及更高版本支持向已安装的 Databricks SDK for Python 0.1.10 或更高版本进行默认 Azure Databricks 笔记本身份验证。 但是,Databricks 建议安装或升级到 Databricks SDK for Python 0.6.0 或更高版本,以获得与默认 Azure Databricks 笔记本身份验证的最大兼容性,而不考虑 Databricks Runtime 版本。

如果要调用 Azure Databricks 帐户级 API,或者想要使用默认 Azure Databricks 笔记本身份验证以外的其他 Azure Databricks 身份验证类型,则必须在 Azure Databricks 群集上安装或升级 Databricks SDK for Python,如下所示:

身份验证类型 Databricks SDK for Python 版本
Microsoft Entra ID 服务主体身份验证 所有版本
Azure CLI 身份验证 所有版本
Databricks 个人访问令牌身份验证 所有版本

尚不支持 Azure 托管标识身份验证

Azure Databricks 笔记本身份验证不适用于 Azure Databricks 配置文件。

步骤 1:安装或升级 Databricks SDK for Python

  1. Azure Databricks Python 笔记本可以像使用任何其他 Python 库一样使用 Databricks SDK for Python。 若要在附加的 Azure Databricks 群集上安装或升级 Databricks SDK for Python 库,请从笔记本单元格运行 %pip magic 命令,如下所示:

    %pip install databricks-sdk --upgrade
    
  2. 运行 %pip magic 命令后必须重启 Python,使已安装或升级的库可供笔记本使用。 为此,请在使用 %pip magic 命令运行单元格后立即从笔记本单元格运行以下命令:

    dbutils.library.restartPython()
    
  3. 若要显示已安装的 Databricks SDK for Python 版本,请从笔记本单元格运行以下命令:

    %pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
    

步骤 2:运行代码

在笔记本单元格中,创建可导入然后调用 Databricks SDK for Python 的 Python 代码。 以下示例使用默认的 Azure Databricks 笔记本身份验证列出 Azure Databricks 工作区中的所有群集:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for c in w.clusters.list():
  print(c.cluster_name)

运行此单元格时,会显示 Azure Databricks 工作区中所有可用群集的名称列表。

若要使用其他 Azure Databricks 身份验证类型,请参阅支持的 Azure Databricks 身份验证类型,然后单击相应的链接以获取其他技术详细信息。

使用 Databricks 实用工具

可以从本地开发计算机上运行的 Databricks SDK for Python 代码,也可以从 Azure Databricks 笔记本内调用 Databricks 实用工具 (dbutils) 参考

  • 在本地开发计算机上,Databricks 实用工具只能访问 dbutils.fsdbutils.secretsdbutils.widgetsdbutils.jobs 命令组。
  • 从附加到 Azure Databricks 群集的 Azure Databricks 笔记本中,Databricks 实用工具可以访问所有可用的 Databricks 实用工具命令组,而不仅仅是 dbutils.fsdbutils.secretsdbutils.widgets。 此外,dbutils.notebook 命令组仅限于两个级别的命令,例如 dbutils.notebook.rundbutils.notebook.exit

若要从本地开发计算机或 Azure Databricks 笔记本调用 Databricks 实用工具,请使用 WorkspaceClient 内的 dbutils。 以下代码示例使用默认的 Azure Databricks 笔记本身份验证调用 WorkspaceClient 中的 dbutils,以列出工作区的 DBFS 根中所有对象的路径。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
d = w.dbutils.fs.ls('/')

for f in d:
  print(f.path)

另外,也可以直接调用 dbutils。 但是,只能使用默认的 Azure Databricks 笔记本身份验证。 此代码示例直接调用 dbutils 以列出工作区的 DBFS 根目录中所有对象的路径。

from databricks.sdk.runtime import *

d = dbutils.fs.ls('/')

for f in d:
  print(f.path)

若要访问 Unity Catalog ,请在 WorkspaceClient 中使用 files。 请参阅管理 Unity Catalog 卷中的文件。 不能单独使用(或在 WorkspaceClient 内使用)dbutils 来访问卷。

另请参阅与 dbutils 交互

代码示例

以下代码示例演示如何使用 Databricks SDK for Python 创建和删除群集、运行作业以及列出帐户级组。 这些代码示例使用默认的 Azure Databricks 笔记本身份验证。 有关默认 Azure Databricks 笔记本身份验证的详细信息,请参阅从 Azure Databricks 笔记本内使用 Databricks SDK for Python。 有关笔记本外部的默认身份验证的详细信息,请参阅使用 Azure Databricks 帐户或工作区对 Databricks SDK for Python 进行身份验证

有关其他代码示例,请参阅 GitHub 上的 Databricks SDK for Python 存储库中的示例。 另请参阅:

创建群集

此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,在空闲 15 分钟后自动终止。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

print("Attempting to create cluster. Please wait...")

c = w.clusters.create_and_wait(
  cluster_name             = 'my-cluster',
  spark_version            = '12.2.x-scala2.12',
  node_type_id             = 'Standard_DS3_v2',
  autotermination_minutes  = 15,
  num_workers              = 1
)

print(f"The cluster is now ready at " \
      f"{w.config.host}#setting/clusters/{c.cluster_id}/configuration\n")

永久删除群集

此代码示例从工作区中永久删除具有指定群集 ID 的群集。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

c_id = input('ID of cluster to delete (for example, 1234-567890-ab123cd4): ')

w.clusters.permanent_delete(cluster_id = c_id)

创建作业

此代码示例创建一个在指定群集上运行指定笔记本的 Azure Databricks 作业。 当代码运行时,它会从终端上的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import Task, NotebookTask, Source

w = WorkspaceClient()

job_name            = input("Some short name for the job (for example, my-job): ")
description         = input("Some short description for the job (for example, My job): ")
existing_cluster_id = input("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4): ")
notebook_path       = input("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook): ")
task_key            = input("Some key to apply to the job's tasks (for example, my-key): ")

print("Attempting to create the job. Please wait...\n")

j = w.jobs.create(
  name = job_name,
  tasks = [
    Task(
      description = description,
      existing_cluster_id = existing_cluster_id,
      notebook_task = NotebookTask(
        base_parameters = dict(""),
        notebook_path = notebook_path,
        source = Source("WORKSPACE")
      ),
      task_key = task_key
    )
  ]
)

print(f"View the job at {w.config.host}/#job/{j.job_id}\n")

管理 Unity Catalog 卷中的文件

此代码示例演示了对 WorkspaceClient 中用于访问 Unity Catalog files 功能的各种调用。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Define volume, folder, and file details.
catalog            = 'main'
schema             = 'default'
volume             = 'my-volume'
volume_path        = f"/Volumes/{catalog}/{schema}/{volume}" # /Volumes/main/default/my-volume
volume_folder      = 'my-folder'
volume_folder_path = f"{volume_path}/{volume_folder}" # /Volumes/main/default/my-volume/my-folder
volume_file        = 'data.csv'
volume_file_path   = f"{volume_folder_path}/{volume_file}" # /Volumes/main/default/my-volume/my-folder/data.csv
upload_file_path   = './data.csv'

# Create an empty folder in a volume.
w.files.create_directory(volume_folder_path)

# Upload a file to a volume.
with open(upload_file_path, 'rb') as file:
  file_bytes = file.read()
  binary_data = io.BytesIO(file_bytes)
  w.files.upload(volume_file_path, binary_data, overwrite = True)

# List the contents of a volume.
for item in w.files.list_directory_contents(volume_path):
  print(item.path)

# List the contents of a folder in a volume.
for item in w.files.list_directory_contents(volume_folder_path):
  print(item.path)

# Print the contents of a file in a volume.
resp = w.files.download(volume_file_path)
print(str(resp.contents.read(), encoding='utf-8'))

# Delete a file from a volume.
w.files.delete(volume_file_path)

# Delete a folder from a volume.
w.files.delete_directory(volume_folder_path)

列出帐户级组

此代码示例列出 Azure Databricks 帐户中所有可用组的显示名称。

from databricks.sdk import AccountClient

a = AccountClient()

for g in a.groups.list():
  print(g.display_name)

测试

若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,请使用 Python 模拟库(如 unittest.mock)。

例如,给定以下名为 helpers.py 的文件,其中包含返回有关新群集的信息的 create_cluster 函数:

# helpers.py

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails

def create_cluster(
  w: WorkspaceClient,
  cluster_name:            str,
  spark_version:           str,
  node_type_id:            str,
  autotermination_minutes: int,
  num_workers:             int
) -> ClusterDetails:
  response = w.clusters.create(
    cluster_name            = cluster_name,
    spark_version           = spark_version,
    node_type_id            = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers             = num_workers
  )
  return response

给定以下名为 main.py 的文件,用于调用 create_cluster 函数:

# main.py

from databricks.sdk import WorkspaceClient
from helpers import *

w = WorkspaceClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = create_cluster(
  w = w,
  cluster_name            = 'Test Cluster',
  spark_version           = '<spark-version>',
  node_type_id            = '<node-type-id>',
  autotermination_minutes = 15,
  num_workers             = 1
)

print(response.cluster_id)

以下名为 test_helpers.py 的文件测试 create_cluster 函数是否返回预期的响应。 此测试不会在目标工作区中创建群集,而是模拟 WorkspaceClient 对象,定义模拟对象的设置,然后将模拟对象传递给 create_cluster 函数。 然后,测试将检查函数是否返回新的模拟群集的预期 ID。

# test_helpers.py

from databricks.sdk import WorkspaceClient
from helpers import *
from unittest.mock import create_autospec # Included with the Python standard library.

def test_create_cluster():
  # Create a mock WorkspaceClient.
  mock_workspace_client = create_autospec(WorkspaceClient)

  # Set the mock WorkspaceClient's clusters.create().cluster_id value.
  mock_workspace_client.clusters.create.return_value.cluster_id = '123abc'

  # Call the actual function but with the mock WorkspaceClient.
  # Replace <spark-version> with the target Spark version string.
  # Replace <node-type-id> with the target node type string.
  response = create_cluster(
    w = mock_workspace_client,
    cluster_name            = 'Test Cluster',
    spark_version           = '<spark-version>',
    node_type_id            = '<node-type-id>',
    autotermination_minutes = 15,
    num_workers             = 1
  )

  # Assert that the function returned the mocked cluster ID.
  assert response.cluster_id == '123abc'

若要运行此测试,请从代码项目的根目录运行 pytest 命令,该命令应生成如下所示的测试结果:

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.12.2, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 1 item

test_helpers.py . [100%]
======================== 1 passed ==========================

其他资源

有关详细信息,请参阅: