Databricks SDK for Go

本文介绍如何使用 Databricks SDK for Go 在 Azure Databricks 帐户、工作区和相关资源中自动执行操作。 本文是对 Databricks SDK for Go READMEAPI 参考示例的补充。

注意

此功能现提供 Beta 版本,可用于生产环境。

例如,在 Beta 版本期间,Databricks 建议在项目的 go.mod 文件中将依赖项固定到代码所依赖的 Databricks SDK for Go 的特定次要版本。 有关固定依赖项的详细信息,请参阅管理依赖项

开始之前

在开始使用 Databricks SDK for Go 之前,开发计算机必须满足以下要求:

开始使用 Databricks SDK for Go

  1. 在已安装 Go、已创建现有 Go 代码项目并已配置 Azure Databricks 身份验证的开发计算机上,通过运行 go mod init 命令创建一个 go.mod 文件来跟踪 Go 代码的依赖项,例如:

    go mod init sample
    
  2. 通过运行 go mod edit -require 命令(将 0.8.0 替换为 CHANGELOG 中列出的 Databricks SDK for Go 包的最新版本)来依赖于 Databricks SDK for Go 包:

    go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
    

    go.mod 文件现在应如下所示:

    module sample
    
    go 1.18
    
    require github.com/databricks/databricks-sdk-go v0.8.0
    
  3. 在项目中创建一个 Go 代码文件,用于导入 Databricks SDK for Go。 以下示例位于包含以下内容的名为 main.go 的文件中,它列出 Azure Databricks 工作区中的所有群集:

    package main
    
    import (
      "context"
    
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/service/compute"
    )
    
    func main() {
      w := databricks.Must(databricks.NewWorkspaceClient())
      all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{})
      if err != nil {
        panic(err)
      }
      for _, c := range all {
        println(c.ClusterName)
      }
    }
    
  4. 通过运行 go mod tidy 命令添加任何缺少的模块依赖项:

    go mod tidy
    

    注意

    如果收到错误 go: warning: "all" matched no packages,则表示你忘记了添加用于导入 Databricks SDK for Go 的 Go 代码文件。

  5. 通过运行 go mod vendor 命令,抓取支持生成和测试 main 模块中的包所需的所有包的副本:

    go mod vendor
    
  6. 为开发计算机设置 Azure Databricks 身份验证

  7. 通过运行 go run 命令来运行 Go 代码文件(假设文件名为 main.go):

    go run main.go
    

    注意

    如果不在前面的 w := databricks.Must(databricks.NewWorkspaceClient()) 调用中将 *databricks.Config 设置为参数,Databricks SDK for Go 将使用其默认进程来尝试执行 Azure Databricks 身份验证。 若要替代此默认行为,请参阅使用 Azure Databricks 帐户或工作区对 Databricks SDK for Go 进行身份验证

更新 Databricks SDK for Go

若要更新 Go 项目以使用 CHANGELOG 中列出的 Databricks SDK for Go 包之一,请执行以下操作:

  1. 从项目的根运行 go get 命令,指定要执行更新的 -u 标志,并提供 Databricks SDK for Go 包的名称和目标版本号。 例如,若要更新到版本 0.12.0,请运行以下命令:

    go get -u github.com/databricks/databricks-sdk-go@v0.12.0
    
  2. 通过运行 go mod tidy 命令添加并更新任何缺失和过时的模块依赖项:

    go mod tidy
    
  3. 通过运行 go mod vendor 命令,抓取支持生成和测试 main 模块中的包所需的所有新的和已更新包的副本:

    go mod vendor
    

使用 Azure Databricks 帐户或工作区对 Databricks SDK for Go 进行身份验证

Databricks SDK for Go 实施 Databricks 客户端统一身份验证标准,这是一种整合且一致的体系结构和编程身份验证方法。 此方法有助于使 Azure Databricks 的身份验证设置和自动化更加集中和可预测。 借助此方法,你只需配置 Databricks 身份验证一次,然后即可在多个 Databricks 工具和 SDK 中使用该配置,而无需进一步更改身份验证配置。 有关详细信息,包括更完整的 Go 代码示例,请参阅 Databricks 客户端统一身份验证

使用 Databricks SDK for Go 初始化 Databricks 身份验证的一些可用编码模式包括:

  • 通过以下操作之一使用 Databricks 默认身份验证:

    • 使用必填字段为目标 Databricks 身份验证类型创建或标识自定义 Databricks 配置文件。 然后将 DATABRICKS_CONFIG_PROFILE 环境变量设置为该自定义配置文件的名称。
    • 为目标 Databricks 身份验证类型设置所需的环境变量。

    然后实例化一个具有 Databricks 默认身份验证的 WorkspaceClient 对象,如下所示:

    import (
      "github.com/databricks/databricks-sdk-go"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient())
    
  • 支持但不建议对必填字段进行硬编码,因为这可能会透露代码中的敏感信息,例如 Azure Databricks 个人访问令牌。 以下示例对用于 Databricks 令牌身份验证的 Azure Databricks 主机和访问令牌值进行硬编码:

    import (
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/config"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{
      Host:  "https://...",
      Token: "...",
    }))
    

另请参阅 Databricks SDK for Go README 中的身份验证

示例

以下代码示例演示如何使用 Databricks SDK for Go 创建和删除群集、运行作业以及列出帐户用户。 这些代码示例使用 Databricks SDK for Go 的默认 Azure Databricks 身份验证过程。

有关其他代码示例,请参阅 GitHub 上的 Databricks SDK for Go 存储库中的 examples 文件夹。

创建群集

此代码示例创建使用最新可用 Databricks Runtime 长期支持 (LTS) 版本和最小可用群集节点类型的、包含本地磁盘的群集。 此群集有一个工作器,在空闲 15 分钟后自动终止。 CreateAndWait 方法调用导致代码暂停,直到新群集在工作区中运行。

package main

import (
  "context"
  "fmt"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/compute"
)

func main() {
  const clusterName            = "my-cluster"
  const autoTerminationMinutes = 15
  const numWorkers             = 1

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  // Get the full list of available Spark versions to choose from.
  sparkVersions, err := w.Clusters.SparkVersions(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the latest Long Term Support (LTS) version.
  latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
    Latest:          true,
    LongTermSupport: true,
  })

  if err != nil {
    panic(err)
  }

  // Get the list of available cluster node types to choose from.
  nodeTypes, err := w.Clusters.ListNodeTypes(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the smallest available cluster node type.
  smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
    LocalDisk: true,
  })

  if err != nil {
    panic(err)
  }

  fmt.Println("Now attempting to create the cluster, please wait...")

  runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
    ClusterName:            clusterName,
    SparkVersion:           latestLTS,
    NodeTypeId:             smallestWithLocalDisk,
    AutoterminationMinutes: autoTerminationMinutes,
    NumWorkers:             numWorkers,
  })

  if err != nil {
    panic(err)
  }

  switch runningCluster.State {
  case compute.StateRunning:
    fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
      w.Config.Host,
      runningCluster.ClusterId,
    )
  default:
    fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
  }

  // Output:
  //
  // Now attempting to create the cluster, please wait...
  // The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}

永久删除群集

此代码示例从工作区中永久删除具有指定群集 ID 的群集。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/clusters"
)

func main() {
  // Replace with your cluster's ID.
  const clusterId = "1234-567890-ab123cd4"

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
    ClusterId: clusterId,
  })

  if err != nil {
    panic(err)
  }
}

运行作业

此代码示例创建一个在指定群集上运行指定笔记本的 Azure Databricks 作业。 当代码运行时,它会从终端上的用户获取现有笔记本的路径、现有群集 ID 和相关作业设置。 RunNowAndWait 方法调用导致代码暂停,直到新作业在工作区中完成运行。

package main

import (
  "bufio"
  "context"
  "fmt"
  "os"
  "strings"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/jobs"
)

func main() {
  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  nt := jobs.NotebookTask{
    NotebookPath: askFor("Workspace path of the notebook to run:"),
  }

  jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
    Name: askFor("Some short name for the job:"),
    Tasks: []jobs.JobTaskSettings{
      {
        Description:       askFor("Some short description for the job:"),
        TaskKey:           askFor("Some key to apply to the job's tasks:"),
        ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
        NotebookTask:      &nt,
      },
    },
  })

  if err != nil {
    panic(err)
  }

  fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
    w.Config.Host,
    jobToRun.JobId,
  )

  runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
    JobId: jobToRun.JobId,
  })

  if err != nil {
    panic(err)
  }

  jobRun, err := runningJob.Get()

  if err != nil {
    panic(err)
  }

  fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
    w.Config.Host,
    jobRun.JobId,
    jobRun.RunId,
  )

  // Output:
  //
  // Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
  // View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}

// Get job settings from the user.
func askFor(prompt string) string {
  var s string
  r := bufio.NewReader(os.Stdin)
  for {
    fmt.Fprint(os.Stdout, prompt+" ")
    s, _ = r.ReadString('\n')
    if s != "" {
      break
    }
  }
  return strings.TrimSpace(s)
}

管理 Unity Catalog 卷中的文件

此代码示例演示了对 WorkspaceClient 中用于访问 Unity Catalog files 功能的各种调用。

package main

import (
  "context"
  "io"
  "os"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
  w := databricks.Must(databricks.NewWorkspaceClient())

  catalog          := "main"
  schema           := "default"
  volume           := "my-volume"
  volumePath       := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
  volumeFolder     := "my-folder"
  volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
  volumeFile       := "data.csv"
  volumeFilePath   := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
  uploadFilePath   := "./data.csv"

  // Create an empty folder in a volume.
  err := w.Files.CreateDirectory(
    context.Background(),
    files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
  )
  if err != nil {
    panic(err)
  }

  // Upload a file to a volume.
  fileUpload, err := os.Open(uploadFilePath)
  if err != nil {
    panic(err)
  }
  defer fileUpload.Close()

  w.Files.Upload(
    context.Background(),
    files.UploadRequest{
      Contents:  fileUpload,
      FilePath:  volumeFilePath,
      Overwrite: true,
    },
  )

  // List the contents of a volume.
  items := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
  )

  for {
    if items.HasNext(context.Background()) {
      item, err := items.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)

    } else {
      break
    }
  }

  // List the contents of a folder in a volume.
  itemsFolder := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
  )

  for {
    if itemsFolder.HasNext(context.Background()) {
      item, err := itemsFolder.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)
    } else {
      break
    }
  }

  // Print the contents of a file in a volume.
  file, err := w.Files.DownloadByFilePath(
    context.Background(),
    volumeFilePath,
  )
  if err != nil {
    panic(err)
  }

  bufDownload := make([]byte, file.ContentLength)

  for {
    file, err := file.Contents.Read(bufDownload)
    if err != nil && err != io.EOF {
      panic(err)
    }
    if file == 0 {
      break
    }

    println(string(bufDownload[:file]))
  }

  // Delete a file from a volume.
  w.Files.DeleteByFilePath(
    context.Background(),
    volumeFilePath,
  )

  // Delete a folder from a volume.
  w.Files.DeleteDirectory(
    context.Background(),
    files.DeleteDirectoryRequest{
      DirectoryPath: volumeFolderPath,
    },
  )
}

列出帐户用户

此代码示例列出某个 Azure Databricks 帐户中的可用用户。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/iam"
)

func main() {
  a := databricks.Must(databricks.NewAccountClient())
  all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
  if err != nil {
    panic(err)
  }
  for _, u := range all {
    println(u.UserName)
  }
}

其他资源

有关详细信息,请参阅: