Databricks SDK for R

注意

本文介绍 Databricks Labs 提供的 Databricks SDK for R,它处于试验状态。 若要提供反馈、提出问题和报告问题,请使用 GitHub 上 Databricks SDK for R 存储库中的“问题”选项卡。

本文介绍如何使用 Databricks SDK for R 在 Azure Databricks 工作区中自动执行 Azure Databricks 操作。本文补充了 Databricks SDK for R 文档

注意

Databricks SDK for R 不支持 Azure Databricks 帐户中的操作自动化。 若要调用帐户级操作,请使用其他 Databricks SDK,例如:

开始之前

在开始使用 Databricks SDK for R 之前,开发计算机必须满足以下要求:

  • 你要自动化的目标 Azure Databricks 工作区的 Azure Databricks 个人访问令牌

    注意

    Databricks SDK for R 仅支持 Azure Databricks 个人访问令牌身份验证。

  • R 或者 R 兼容的集成开发环境 (IDE)。 Databricks 建议使用 RStudio Desktop,并在本文的说明中使用它。

开始使用 Databricks SDK for R

  1. 使你的 Azure Databricks 工作区 URL 和个人访问令牌可用于你的 R 项目的脚本。 例如,可以将以下内容添加到 R 项目的 .Renviron 文件中。 将 <your-workspace-url> 替换为每工作区 URL,例如 https://adb-1234567890123456.7.databricks.azure.cn。 将 <your-personal-access-token> 替换为你的 Azure Databricks 个人访问令牌,例如 dapi12345678901234567890123456789012

    DATABRICKS_HOST=<your-workspace-url>
    DATABRICKS_TOKEN=<your-personal-access-token>
    

    要创建 Azure Databricks 个人访问令牌,请遵循适用于工作区用户的 Azure Databricks 个人访问令牌中的步骤。

    有关提供 Azure Databricks 工作区 URL 和个人访问令牌的其他方法,请参阅 GitHub 中 Databricks SDK for R 存储库中的身份验证

    重要

    请勿将 .Renviron 文件添加到版本控制系统,因为这有公开敏感信息的风险,例如 Azure Databricks 个人访问令牌。

  2. 安装 Databricks SDK for R 包。 例如,在 RStudio Desktop 中,在“控制台”视图(“视图 > 将焦点移动到控制台”)中,运行以下命令,一次一个:

    install.packages("devtools")
    library(devtools)
    install_github("databrickslabs/databricks-sdk-r")
    

    注意

    Databricks SDK for R 包在 CRAN 上不可用。

  3. 添加代码以引用 Databricks SDK for R 并列出 Azure Databricks 工作区中的所有群集。 例如,在项目的 main.r 文件中,代码可能如下所示:

    require(databricks)
    
    client <- DatabricksClient()
    
    list_clusters(client)[, "cluster_name"]
    
  4. 运行脚本。 例如,在 RStudio Desktop 中,在项目的 main.r 文件处于活动状态的脚本编辑器中,单击“源”>“源”或“带回显的源”

  5. 此时会显示群集列表。 例如,在 RStudio Desktop 中,它位于控制台视图中。

代码示例

以下代码示例演示如何使用 Databricks SDK for R 创建和删除群集以及创建作业。

创建群集

此代码示例使用指定的 Databricks Runtime 版本和群集节点类型创建群集。 此群集有一个工作器,群集在空闲 15 分钟后自动终止。

require(databricks)

client <- DatabricksClient()

response <- create_cluster(
  client = client,
  cluster_name = "my-cluster",
  spark_version = "12.2.x-scala2.12",
  node_type_id = "Standard_DS3_v2",
  autotermination_minutes = 15,
  num_workers = 1
)

# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]

# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
  host <- paste(host, "/", sep = "")
}

print(paste(
  "View the cluster at ",
  host,
  "#setting/clusters/",
  response$cluster_id,
  "/configuration",
  sep = "")
)

永久删除群集

此代码示例从工作区中永久删除具有指定群集 ID 的群集。

require(databricks)

client <- DatabricksClient()

cluster_id <- readline("ID of the cluster to delete (for example, 1234-567890-ab123cd4):")

delete_cluster(client, cluster_id)

创建作业

此代码示例创建一个可用于在指定群集上运行指定笔记本的 Azure Databricks 作业。 此代码运行时,它会从控制台的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。

require(databricks)

client <- DatabricksClient()

job_name <- readline("Some short name for the job (for example, my-job):")
description <- readline("Some short description for the job (for example, My job):")
existing_cluster_id <- readline("ID of the existing cluster in the workspace to run the job on (for example, 1234-567890-ab123cd4):")
notebook_path <- readline("Workspace path of the notebook to run (for example, /Users/someone@example.com/my-notebook):")
task_key <- readline("Some key to apply to the job's tasks (for example, my-key):")

print("Attempting to create the job. Please wait...")

notebook_task <- list(
  notebook_path = notebook_path,
  source = "WORKSPACE"
)

job_task <- list(
  task_key = task_key,
  description = description,
  existing_cluster_id = existing_cluster_id,
  notebook_task = notebook_task
)

response <- create_job(
  client,
  name = job_name,
  tasks = list(job_task)
)

# Get the workspace URL to be used in the following results message.
get_client_debug <- strsplit(client$debug_string(), split = "host=")
get_host <- strsplit(get_client_debug[[1]][2], split = ",")
host <- get_host[[1]][1]

# Make sure the workspace URL ends with a forward slash.
if (endsWith(host, "/")) {
} else {
  host <- paste(host, "/", sep = "")
}

print(paste(
  "View the job at ",
  host,
  "#job/",
  response$job_id,
  sep = "")
)

日志记录

可以使用常用 logging 包来记录消息。 此包支持多个日志记录级别和自定义日志格式。 可以使用此包将消息记录到控制台或文件。 若要记录消息,请执行以下操作:

  1. 安装 logging 包。 例如,在 RStudio Desktop 中的“控制台”视图中(“视图”>“将焦点移动到控制台”),运行以下命令:

    install.packages("logging")
    library(logging)
    
  2. 启动日志记录包,设置记录消息的位置,并设置日志记录级别。 例如,以下代码将所有 ERROR 消息记录到 results.log 文件中。

    basicConfig()
    addHandler(writeToFile, file="results.log")
    setLevel("ERROR")
    
  3. 根据需要记录消息。 例如,如果代码无法进行身份验证或列出可用群集的名称,则以下代码会记录任何错误。

    require(databricks)
    require(logging)
    
    basicConfig()
    addHandler(writeToFile, file="results.log")
    setLevel("ERROR")
    
    tryCatch({
      client <- DatabricksClient()
    }, error = function(e) {
      logerror(paste("Error initializing DatabricksClient(): ", e$message))
      return(NA)
    })
    
    tryCatch({
      list_clusters(client)[, "cluster_name"]
    }, error = function(e) {
      logerror(paste("Error in list_clusters(client): ", e$message))
      return(NA)
    })
    

测试

若要测试代码,可以使用 R 测试框架(如 testthat)。 若要在不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态的情况下在模拟条件下测试代码,可以使用 R 模拟库(如 mockery)。

例如,给定以下名为 helpers.r 的文件,其中包含返回有关新群集的信息的 createCluster 函数:

library(databricks)

createCluster <- function(
  databricks_client,
  cluster_name,
  spark_version,
  node_type_id,
  autotermination_minutes,
  num_workers
) {
  response <- create_cluster(
    client = databricks_client,
    cluster_name = cluster_name,
    spark_version = spark_version,
    node_type_id = node_type_id,
    autotermination_minutes = autotermination_minutes,
    num_workers = num_workers
  )
  return(response)
}

给定以下名为 main.R 的文件,用于调用 createCluster 函数:

library(databricks)
source("helpers.R")

client <- DatabricksClient()

# Replace <spark-version> with the target Spark version string.
# Replace <node-type-id> with the target node type string.
response = createCluster(
  databricks_client = client,
  cluster_name = "my-cluster",
  spark_version = "<spark-version>",
  node_type_id = "<node-type-id>",
  autotermination_minutes = 15,
  num_workers = 1
)

print(response$cluster_id)

以下名为 test-helpers.py 的文件测试 createCluster 函数是否返回预期的响应。 此测试不会在目标工作区中创建群集,而是模拟 DatabricksClient 对象,定义模拟对象的设置,然后将模拟对象传递给 createCluster 函数。 然后,测试将检查函数是否返回新的模拟群集的预期 ID。

# install.packages("testthat")
# install.pacakges("mockery")
# testthat::test_file("test-helpers.R")
lapply(c("databricks", "testthat", "mockery"), library, character.only = TRUE)
source("helpers.R")

test_that("createCluster mock returns expected results", {
  # Create a mock response.
  mock_response <- list(cluster_id = "abc123")

  # Create a mock function for create_cluster().
  mock_create_cluster <- mock(return_value = mock_response)

  # Run the test with the mock function.
  with_mock(
    create_cluster = mock_create_cluster,
    {
      # Create a mock Databricks client.
      mock_client <- mock()

      # Call the function with the mock client.
      # Replace <spark-version> with the target Spark version string.
      # Replace <node-type-id> with the target node type string.
      response <- createCluster(
        databricks_client = mock_client,
        cluster_name = "my-cluster",
        spark_version = "<spark-version>",
        node_type_id = "<node-type-id>",
        autotermination_minutes = 15,
        num_workers = 1
      )

      # Check that the function returned the correct mock response.
      expect_equal(response$cluster_id, "abc123")
    }
  )
})

其他资源

有关详细信息,请参阅: