API examples

This article contains examples that demonstrate how to use the Azure Databricks REST API 2.0.

In the following examples, replace <databricks-instance> with the workspace URL of your Azure Databricks deployment.

Authentication

To learn how to authenticate to the REST API, review Authentication using Azure Databricks personal access tokens and Authentication using Azure Active Directory tokens.

The examples in this article assume you are using Azure Databricks personal access tokens. In the following examples, replace <your-token> with your personal access token. The curl examples assume that you store Azure Databricks API credentials under .netrc. The Python examples use Bearer authentication. Although the examples show storing the token in the code, to use credentials safely in Azure Databricks, we recommend that you follow the Secret management user guide.
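
For example, here is a minimal sketch that keeps the token out of the source code by reading it from an environment variable (the DATABRICKS_TOKEN variable name is only an illustration; any secure store works):

import os
import requests

DOMAIN = '<databricks-instance>'
# Assumes the token has been exported as DATABRICKS_TOKEN rather than hard-coded
TOKEN = os.environ['DATABRICKS_TOKEN']

response = requests.get(
  'https://%s/api/2.0/clusters/list' % DOMAIN,
  headers={'Authorization': 'Bearer %s' % TOKEN}
)
print(response.json())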

For examples that use Authentication using Azure Active Directory tokens, see the articles in that section.

Get a gzipped list of clusters

curl -n -H "Accept-Encoding: gzip" https://<databricks-instance>/api/2.0/clusters/list > clusters.gz
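
The same request can be made from Python. Note that requests normally decompresses gzip responses transparently, so this sketch streams the raw bytes to keep the compressed payload (a sketch, not part of the original example):

import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

response = requests.get(
  'https://%s/api/2.0/clusters/list' % DOMAIN,
  headers={'Authorization': 'Bearer %s' % TOKEN, 'Accept-Encoding': 'gzip'},
  stream=True
)
# decode_content=False keeps the gzip-compressed bytes exactly as the server sent them
with open('clusters.gz', 'wb') as f:
  f.write(response.raw.read(decode_content=False))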

Upload a big file into DBFS

The amount of data uploaded by a single API call cannot exceed 1 MB. To upload a file that is larger than 1 MB to DBFS, use the streaming API, which is a combination of create, addBlock, and close.

Here is an example of how to perform this action using Python.

import json
import requests
import base64

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
BASE_URL = 'https://%s/api/2.0/dbfs/' % (DOMAIN)

def dbfs_rpc(action, body):
  """ A helper function to make the DBFS API request, request/response is encoded/decoded as JSON """
  response = requests.post(
    BASE_URL + action,
    headers={'Authorization': 'Bearer %s' % TOKEN },
    json=body
  )
  return response.json()

# Create a handle that will be used to add blocks
handle = dbfs_rpc("create", {"path": "/temp/upload_large_file", "overwrite": True})['handle']
with open('/a/local/file', 'rb') as f:
  while True:
    # A block can be at most 1MB
    block = f.read(1 << 20)
    if not block:
      break
    # The API expects the block as a base64-encoded string
    data = base64.standard_b64encode(block).decode()
    dbfs_rpc("add-block", {"handle": handle, "data": data})
# close the handle to finish uploading
dbfs_rpc("close", {"handle": handle})

Create a Python 3 cluster (Databricks Runtime 5.5 LTS)

Note

Python 3 is the default version of Python in Databricks Runtime 6.0 and above and Databricks Runtime with Conda.

The following example shows how to launch a Python 3 cluster using the Databricks REST API and the requests Python HTTP library:

import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

response = requests.post(
  'https://%s/api/2.0/clusters/create' % (DOMAIN),
  headers={'Authorization': 'Bearer %s' % TOKEN},
  json={
    "cluster_name": "my-cluster",
    "spark_version": "5.5.x-scala2.11",
    "node_type_id": "Standard_D3_v2",
    "spark_env_vars": {
      "PYSPARK_PYTHON": "/databricks/python3/bin/python3",
    }
  }
)

if response.status_code == 200:
  print(response.json()['cluster_id'])
else:
  print("Error launching cluster: %s: %s" % (response.json()["error_code"], response.json()["message"]))

Create a High Concurrency cluster

The following example shows how to launch a High Concurrency mode cluster using the Databricks REST API:

curl -n -X POST -H 'Content-Type: application/json' -d '{
  "cluster_name": "high-concurrency-cluster",
  "spark_version": "6.3.x-scala2.11",
  "node_type_id": "Standard_D3_v2",
  "spark_conf":{
        "spark.databricks.cluster.profile":"serverless",
        "spark.databricks.repl.allowedLanguages":"sql,python,r"
     },
   "custom_tags":{
        "ResourceClass":"Serverless"
     },
       "autoscale":{
        "min_workers":1,
        "max_workers":2
     },
  "autotermination_minutes":10
}' https://<databricks-instance>/api/2.0/clusters/create
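
Cluster creation is asynchronous: the create call returns a cluster_id while the cluster is still starting. The following sketch polls the clusters/get endpoint until the cluster leaves the PENDING state; treat it as an illustration rather than production code:

import time
import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
CLUSTER_ID = '<cluster-id returned by clusters/create>'

while True:
  cluster = requests.get(
    'https://%s/api/2.0/clusters/get' % DOMAIN,
    headers={'Authorization': 'Bearer %s' % TOKEN},
    params={'cluster_id': CLUSTER_ID}
  ).json()
  if cluster['state'] != 'PENDING':
    # Typically RUNNING on success, or TERMINATED/ERROR if startup failed
    print('Cluster is %s' % cluster['state'])
    break
  time.sleep(30)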

Jobs API examples

This section shows how to create Python, spark-submit, and JAR jobs, and how to run the JAR job and view its output.

Create a Python job

This example shows how to create a Python job. It uses the Apache Spark Python Spark Pi estimation.

  1. Download the [Python file](https://docs.microsoft.com/en-us/azure/databricks/_static/examples/pi.py) containing the example and upload it to Databricks File System (DBFS) using the Databricks CLI.

    dbfs cp pi.py dbfs:/docs/pi.py
    
  2. Create the job. The following examples demonstrate how to create a job using Databricks Runtime and Databricks Light.

    Databricks Runtime

    curl -n -X POST -H 'Content-Type: application/json' -d \
    '{
      "name": "SparkPi Python job",
      "new_cluster": {
        "spark_version": "6.2.x-scala2.11",
        "node_type_id": "Standard_D3_v2",
        "num_workers": 2
      },
      "spark_python_task": {
        "python_file": "dbfs:/pi.py",
        "parameters": [
          "10"
        ]
      }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    Databricks Light

    curl -n -X POST -H 'Content-Type: application/json' -d \
    '{
        "name": "SparkPi Python job",
        "new_cluster": {
         "spark_version": "apache-spark-2.4.x-scala2.11",
         "node_type_id": "Standard_D3_v2",
         "num_workers": 2
        },
        "spark_python_task": {
         "python_file": "dbfs:/pi.py",
         "parameters": [
           "10"
         ]
      }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

Create a spark-submit job

This example shows how to create a spark-submit job. It uses the Apache Spark SparkPi example.

  1. Download the [JAR](https://docs.microsoft.com/en-us/azure/databricks/_static/examples/SparkPi-assembly-0.1.jar) containing the example and upload the JAR to Databricks File System (DBFS) using the Databricks CLI.

    dbfs cp SparkPi-assembly-0.1.jar dbfs:/docs/sparkpi.jar
    
  2. Create the job.

    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{
         "name": "SparkPi spark-submit job",
         "new_cluster": {
           "spark_version": "5.2.x-scala2.11",
           "node_type_id": "Standard_DS3_v2",
           "num_workers": 2
           },
        "spark_submit_task": {
           "parameters": [
             "--class",
             "org.apache.spark.examples.SparkPi",
             "dbfs:/docs/sparkpi.jar",
             "10"
          ]
        }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

Create and run a spark-submit job for R scripts

This example shows how to create a spark-submit job to run R scripts.

  1. Upload the R file to Databricks File System (DBFS) using the Databricks CLI.

    dbfs cp your_code.R dbfs:/path/to/your_code.R
    

    If the code uses SparkR, it must first install the package. Databricks Runtime contains the SparkR source code. Install the SparkR package from its local directory as shown in the following example:

    install.packages("/databricks/spark/R/pkg", repos = NULL)
    library(SparkR)
    
    sparkR.session()
    n <- nrow(createDataFrame(iris))
    write.csv(n, "/dbfs/path/to/num_rows.csv")
    

    Databricks Runtime installs the latest version of sparklyr from CRAN. If the code uses sparklyr, you must specify the Spark master URL in spark_connect. To form the Spark master URL, use the SPARK_LOCAL_IP environment variable to get the IP, and use the default port 7077. For example:

    library(sparklyr)
    
    master <- paste("spark://", Sys.getenv("SPARK_LOCAL_IP"), ":7077", sep="")
    sc <- spark_connect(master)
    iris_tbl <- copy_to(sc, iris)
    write.csv(iris_tbl, "/dbfs/path/to/sparklyr_iris.csv")
    
  2. Create the job.

    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{
         "name": "R script spark-submit job",
         "new_cluster": {
           "spark_version": "5.5.x-scala2.11",
           "node_type_id": "Standard_DS3_v2",
           "num_workers": 2
           },
        "spark_submit_task": {
           "parameters": [ "dbfs:/path/to/your_code.R" ]
           }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    This returns a job-id that you can then use to run the job.

  3. Run the job using the job-id.

    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
    

Create and run a JAR job

This example shows how to create and run a JAR job. It uses the Apache Spark SparkPi example.

  1. Download the [JAR](https://docs.microsoft.com/en-us/azure/databricks/_static/examples/SparkPi-assembly-0.1.jar) containing the example.

  2. Upload the JAR to your Azure Databricks instance using the API:

    curl -n \
    -F filedata=@"SparkPi-assembly-0.1.jar" \
    -F path="/docs/sparkpi.jar" \
    -F overwrite=true \
    https://<databricks-instance>/api/2.0/dbfs/put
    

    A successful call returns {}. Otherwise you will see an error message.

  3. Get a list of all Spark versions prior to creating your job.

    curl -n https://<databricks-instance>/api/2.0/clusters/spark-versions
    

    This example uses 5.2.x-scala2.11. See Runtime version strings for more information about Spark cluster versions.

  4. Create the job. The JAR is specified as a library and the main class name is referenced in the Spark JAR task.

    curl -n -X POST -H 'Content-Type: application/json' \
    -d '{
          "name": "SparkPi JAR job",
          "new_cluster": {
            "spark_version": "5.2.x-scala2.11",
            "node_type_id": "Standard_DS3_v2",
            "num_workers": 2
            },
         "libraries": [{"jar": "dbfs:/docs/sparkpi.jar"}],
         "spark_jar_task": {
            "main_class_name":"org.apache.spark.examples.SparkPi",
            "parameters": "10"
            }
    }' https://<databricks-instance>/api/2.0/jobs/create
    

    This returns a job-id that you can then use to run the job.

  5. Run the job using run now:

    curl -n \
    -X POST -H 'Content-Type: application/json' \
    -d '{ "job_id": <job-id> }' https://<databricks-instance>/api/2.0/jobs/run-now
    
  6. Navigate to https://<databricks-instance>/#job/<job-id> and you'll be able to see your job running.

  7. You can also check on it from the API using the information returned from the previous request (a scripted polling sketch follows this list).

    curl -n https://<databricks-instance>/api/2.0/jobs/runs/get?run_id=<run-id> | jq
    

    This should return something like:

    {
      "job_id": 35,
      "run_id": 30,
      "number_in_job": 1,
      "original_attempt_run_id": 30,
      "state": {
        "life_cycle_state": "TERMINATED",
        "result_state": "SUCCESS",
        "state_message": ""
      },
      "task": {
        "spark_jar_task": {
          "jar_uri": "",
          "main_class_name": "org.apache.spark.examples.SparkPi",
          "parameters": [
            "10"
          ],
          "run_as_repl": true
        }
      },
      "cluster_spec": {
        "new_cluster": {
          "spark_version": "5.2.x-scala2.11",
          "node_type_id": "<node-type>",
          "enable_elastic_disk": false,
          "num_workers": 1
        },
        "libraries": [
          {
            "jar": "dbfs:/docs/sparkpi.jar"
          }
        ]
      },
      "cluster_instance": {
        "cluster_id": "0412-165350-type465",
        "spark_context_id": "5998195893958609953"
      },
      "start_time": 1523552029282,
      "setup_duration": 211000,
      "execution_duration": 33000,
      "cleanup_duration": 2000,
      "trigger": "ONE_TIME",
      "creator_user_name": "...",
      "run_name": "SparkPi JAR job",
      "run_page_url": "<databricks-instance>/?o=3901135158661429#job/35/run/1",
      "run_type": "JOB_RUN"
    }
    
  8. To view the job output, visit the job run details page.

    Executing command, time = 1523552263909.
    Pi is roughly 3.13973913973914
    
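
Rather than re-running the request in step 7 by hand, you can poll the run until it reaches a terminal life cycle state. Here is a minimal sketch, assuming the same Bearer token setup as the earlier Python examples:

import time
import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'
RUN_ID = '<run-id>'

while True:
  run = requests.get(
    'https://%s/api/2.0/jobs/runs/get' % DOMAIN,
    headers={'Authorization': 'Bearer %s' % TOKEN},
    params={'run_id': RUN_ID}
  ).json()
  life_cycle_state = run['state']['life_cycle_state']
  if life_cycle_state in ('TERMINATED', 'SKIPPED', 'INTERNAL_ERROR'):
    # result_state is available once the run has finished
    print(life_cycle_state, run['state'].get('result_state'))
    break
  time.sleep(30)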

Create cluster enabled for table access control example

To create a cluster enabled for table access control, specify the following spark_conf property in your request body:

curl -n -X POST https://<databricks-instance>/api/2.0/clusters/create -d'
{
  "cluster_name": "my-cluster",
  "spark_version": "5.2.x-scala2.11",
  "node_type_id": "Standard_DS3_v2",
  "spark_conf": {
    "spark.databricks.acl.dfAclsEnabled":true,
    "spark.databricks.repl.allowedLanguages": "python,sql"
  },
  "num_workers": 1,
  "custom_tags":{
     "costcenter":"Tags",
     "applicationname":"Tags1"
  }
}'

Cluster log delivery examples

While you can view the Spark driver and executor logs in the Spark UI, Azure Databricks can also deliver the logs to DBFS destinations. See the following examples.

Create a cluster with logs delivered to a DBFS location

The following cURL command creates a cluster named cluster_log_dbfs and requests Azure Databricks to send its logs to dbfs:/logs with the cluster ID as the path prefix.

curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "cluster_name": "cluster_log_dbfs",
  "spark_version": "5.2.x-scala2.11",
  "node_type_id": "Standard_D3_v2",
  "num_workers": 1,
  "cluster_log_conf": {
    "dbfs": {
      "destination": "dbfs:/logs"
    }
  }
}' https://<databricks-instance>/api/2.0/clusters/create

The response should contain the cluster ID:

{"cluster_id":"1111-223344-abc55"}

After cluster creation, Azure Databricks syncs log files to the destination every 5 minutes. It uploads driver logs to dbfs:/logs/1111-223344-abc55/driver and executor logs to dbfs:/logs/1111-223344-abc55/executor.
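
Once log files start arriving, you can enumerate them with the DBFS list endpoint. A sketch, using the cluster ID from the response above:

import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

listing = requests.get(
  'https://%s/api/2.0/dbfs/list' % DOMAIN,
  headers={'Authorization': 'Bearer %s' % TOKEN},
  params={'path': '/logs/1111-223344-abc55/driver'}
).json()
# Each entry has a path, an is_dir flag, and a file_size in bytes
for entry in listing.get('files', []):
  print(entry['path'], entry['file_size'])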

Check log delivery status

You can retrieve cluster information with log delivery status via the API:

curl -n -H 'Content-Type: application/json' -d \
'{
  "cluster_id": "1111-223344-abc55"
}' https://<databricks-instance>/api/2.0/clusters/get

If the latest batch of log upload was successful, the response should contain only the timestamp of the last attempt:

{
  "cluster_log_status": {
    "last_attempted": 1479338561
  }
}

In case of errors, the error message appears in the response:

{
  "cluster_log_status": {
    "last_attempted": 1479338561,
    "last_exception": "Exception: Access Denied ..."
  }
}

Workspace examples

Here are some examples of using the Workspace API to list, get information about, create, delete, export, and import workspace objects.

List a notebook or a folder

The following cURL command lists a path in the workspace.

curl -n -X GET -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/list

The response should contain a list of statuses:

{
  "objects": [
    {
     "object_type": "DIRECTORY",
     "path": "/Users/user@example.com/folder"
    },
    {
     "object_type": "NOTEBOOK",
     "language": "PYTHON",
     "path": "/Users/user@example.com/notebook1"
    },
    {
     "object_type": "NOTEBOOK",
     "language": "SCALA",
     "path": "/Users/user@example.com/notebook2"
    }
  ]
}

If the path is a notebook, the response contains an array with the status of the input notebook.
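
The list call is not recursive. To walk an entire folder tree, you can recurse over DIRECTORY entries yourself; here is a minimal sketch built on the same workspace/list endpoint:

import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

def walk_workspace(path):
  """Recursively print every object under the given workspace path."""
  listing = requests.get(
    'https://%s/api/2.0/workspace/list' % DOMAIN,
    headers={'Authorization': 'Bearer %s' % TOKEN},
    params={'path': path}
  ).json()
  for obj in listing.get('objects', []):
    print(obj['object_type'], obj['path'])
    if obj['object_type'] == 'DIRECTORY':
      walk_workspace(obj['path'])

walk_workspace('/Users/user@example.com/')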

Get information about a notebook or a folder

The following cURL command gets the status of a path in the workspace.

curl -n -X GET -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/"
}' https://<databricks-instance>/api/2.0/workspace/get-status

The response should contain the status of the input path:

{
  "object_type": "DIRECTORY",
  "path": "/Users/user@example.com"
}

Create a folder

The following cURL command creates a folder. It creates the folder recursively, like mkdir -p. If the folder already exists, it does nothing and succeeds.

curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new/folder"
}' https://<databricks-instance>/api/2.0/workspace/mkdirs

If the request succeeds, an empty JSON string is returned.

Delete a notebook or folder

The following cURL command deletes a notebook or folder. You can enable recursive to recursively delete a non-empty folder.

curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new/folder",
  "recursive": "false"
}' https://<databricks-instance>/api/2.0/workspace/delete

If the request succeeds, an empty JSON string is returned.

Export a notebook or folder

The following cURL command exports a notebook. Notebooks can be exported in the following formats: SOURCE, HTML, JUPYTER, DBC. A folder can be exported only as DBC.

curl -n -X GET -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/notebook",
  "format": "SOURCE"
}' https://<databricks-instance>/api/2.0/workspace/export

The response contains base64 encoded notebook content.

{
  "content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg=="
}
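
Because the content field is base64 encoded, decode it before writing it to a local file. A sketch (the local file name is only an illustration):

import base64
import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

exported = requests.get(
  'https://%s/api/2.0/workspace/export' % DOMAIN,
  headers={'Authorization': 'Bearer %s' % TOKEN},
  params={'path': '/Users/user@example.com/notebook', 'format': 'SOURCE'}
).json()
with open('notebook-source.txt', 'wb') as f:
  f.write(base64.b64decode(exported['content']))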

Alternatively, you can download the exported notebook directly.

curl -n -X GET "https://<databricks-instance>/api/2.0/workspace/export?format=SOURCE&direct_download=true&path=/Users/user@example.com/notebook"

The response will be the exported notebook content.

Import a notebook or directory

The following cURL command imports a notebook into the workspace. Multiple formats (SOURCE, HTML, JUPYTER, DBC) are supported. If the format is SOURCE, you must specify language. The content parameter contains base64 encoded notebook content. You can enable overwrite to overwrite the existing notebook.

curl -n -X POST -H 'Content-Type: application/json' -d \
'{
  "path": "/Users/user@example.com/new-notebook",
  "format": "SOURCE",
  "language": "SCALA",
  "content": "Ly8gRGF0YWJyaWNrcyBub3RlYm9vayBzb3VyY2UKcHJpbnQoImhlbGxvLCB3b3JsZCIpCgovLyBDT01NQU5EIC0tLS0tLS0tLS0KCg==",
  "overwrite": "false"
}' https://<databricks-instance>/api/2.0/workspace/import

If the request succeeds, an empty JSON string is returned.
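
If you are importing from a script rather than from the command line, you can base64 encode the notebook yourself and call the same endpoint. A sketch, reusing the notebook.scala file from the multipart example below:

import base64
import requests

DOMAIN = '<databricks-instance>'
TOKEN = '<your-token>'

with open('notebook.scala', 'rb') as f:
  content = base64.standard_b64encode(f.read()).decode()

response = requests.post(
  'https://%s/api/2.0/workspace/import' % DOMAIN,
  headers={'Authorization': 'Bearer %s' % TOKEN},
  json={
    'path': '/Users/user@example.com/new-notebook',
    'format': 'SOURCE',
    'language': 'SCALA',
    'content': content,
    'overwrite': False
  }
)
# An empty JSON object indicates success
print(response.json())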

Alternatively, you can import a notebook via multipart form post.

curl -n -X POST https://<databricks-instance>/api/2.0/workspace/import \
       -F path="/Users/user@example.com/new-notebook" -F format=SOURCE -F language=SCALA -F overwrite=true -F content=@notebook.scala