增量实时表 API 指南

重要

本文内容已过时,将来可能不会更新。 请参阅 Databricks REST API 参考中的增量实时表

增量实时表 API 让你能够创建、编辑、删除、启动和查看管道相关的详细信息。

重要

要访问 Databricks REST API,必须进行身份验证

创建管道

端点 HTTP 方法
2.0/pipelines POST

创建新的增量实时表管道。

示例

此示例可创建新的触发管道。

请求

curl --netrc -X POST \
https://<databricks-instance>/api/2.0/pipelines \
--data @pipeline-settings.json

pipeline-settings.json

{
  "name": "Wikipedia pipeline (SQL)",
  "storage": "/Users/username/data",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "libraries": [
    {
      "notebook": {
        "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
      }
    }
  ],
  "continuous": false
}

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应

{
  "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5"
}

请求结构

请参阅 PipelineSettings

响应结构

字段名称 类型 说明
pipeline_id STRING 新创建的管道的唯一标识符。

编辑管道

端点 HTTP 方法
2.0/pipelines/{pipeline_id} PUT

更新现有管道的设置。

示例

此示例将 target 参数添加到 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc -X PUT \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 \
--data @pipeline-settings.json

pipeline-settings.json

{
  "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
  "name": "Wikipedia pipeline (SQL)",
  "storage": "/Users/username/data",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "libraries": [
    {
      "notebook": {
        "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
      }
    }
  ],
  "target": "wikipedia_quickstart_data",
  "continuous": false
}

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

请求结构

请参阅 PipelineSettings

删除管道

端点 HTTP 方法
2.0/pipelines/{pipeline_id} DELETE

从增量实时表系统中删除管道。

示例

此示例删除 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc -X DELETE \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

启动管道更新

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/updates POST

启动管道的更新。 可以启动对整个管道图的更新,也可以启动对特定表的选择性更新。

示例

启动全面刷新

此示例使用完全刷新开始更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求
curl --netrc -X POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates \
--data '{ "full_refresh": "true" }'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应
{
  "update_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8",
  "request_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8"
}

启动对所选表的更新

此示例启动更新以刷新管道中 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5sales_orders_cleanedsales_order_in_chicago 表:

请求
curl --netrc -X POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates \
--data '{ "refresh_selection": ["sales_orders_cleaned", "sales_order_in_chicago"] }'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应
{
  "update_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8",
  "request_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8"
}

启动对所选表的全面更新

此示例启动对 sales_orders_cleanedsales_order_in_chicago 表的更新,并在 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道中启动对 customerssales_orders_raw 表的完全刷新更新。

请求
curl --netrc -X POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates \
--data '{ "refresh_selection": ["sales_orders_cleaned", "sales_order_in_chicago"], "full_refresh_selection": ["customers", "sales_orders_raw"] }'

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应
{
  "update_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8",
  "request_id": "a1b23c4d-5e6f-78gh-91i2-3j4k5lm67no8"
}

请求结构

字段名称 类型 说明
full_refresh BOOLEAN 是否重新处理所有数据。 如果为 true,则 Delta Live Tables 系统会在运行管道之前重置所有可重置的表。

此字段是可选的。

默认值为 false

如果 full_refesh 为 true 并且设置了 refresh_selectionfull_refresh_selection,则会返回错误。
refresh_selection 一个由 STRING 构成的数组 要更新的表列表。 用途
refresh_selection 启动对管道图中的一组选定表的刷新。

此字段可选。 如果
refresh_selection
full_refresh_selection 都为空,则刷新整个管道图。

如果出现以下情况,则返回错误:

* full_refesh 为 true 且
已设置 refresh_selection
* 一个或多个指定的表在管道图中不存在。
full_refresh_selection 一个由 STRING 构成的数组 完全刷新更新的表列表。 使用 full_refresh_selection 启动对一组选定的表的更新。 在 Delta Live Tables 系统启动更新之前,会重置指定表的状态。

此字段可选。 如果
refresh_selection
full_refresh_selection 都为空,则刷新整个管道图。

如果出现以下情况,则返回错误:

* full_refesh 为 true 且
已设置 refresh_selection
* 一个或多个指定的表在管道图中不存在。
* 一个或多个指定表不可重置。

响应结构

字段名称 类型 说明
update_id STRING 新创建的更新的唯一标识符。
request_id STRING 启动更新的请求的唯一标识符。

获取管道更新请求的状态

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/requests/{request_id} GET

获取与 request_id 关联的管道更新的状态和信息,其中 request_id 是启动管道更新的请求的唯一标识符。 如果重试或重启更新,新更新将继承 request_id。

示例

对于 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道,此示例返回与请求 ID a83d9f7c-d798-4fd5-aa39-301b6e6f4429 关联的更新的状态和信息:

请求

curl --netrc -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/requests/a83d9f7c-d798-4fd5-aa39-301b6e6f4429

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应

{
   "status": "TERMINATED",
   "latest_update":{
     "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
     "update_id": "90da8183-89de-4715-b5a9-c243e67f0093",
     "config":{
       "id": "aae89b88-e97e-40c4-8e1a-1b7ac76657e8",
       "name": "Retail sales (SQL)",
       "storage": "/Users/username/data",
       "configuration":{
         "pipelines.numStreamRetryAttempts": "5"
       },
       "clusters":[
         {
           "label": "default",
           "autoscale":{
             "min_workers": 1,
             "max_workers": 5,
             "mode": "ENHANCED"
           }
         }
       ],
       "libraries":[
         {
           "notebook":{
             "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
           }
         }
       ],
       "continuous": false,
       "development": true,
       "photon": true,
       "edition": "advanced",
       "channel": "CURRENT"
     },
     "cause": "API_CALL",
     "state": "COMPLETED",
     "cluster_id": "1234-567891-abcde123",
     "creation_time": 1664304117145,
     "full_refresh": false,
     "request_id": "a83d9f7c-d798-4fd5-aa39-301b6e6f4429"
   }
}

响应结构

字段名称 类型 说明
status STRING 管道更新请求的状态。 其中一个

* ACTIVE:此请求的更新正在运行,或者可能会在新更新中重试。
* TERMINATED:请求已终止,不会进行重试或重启。
pipeline_id STRING 管道的唯一标识符。
update_id STRING 更新的唯一标识符。
config PipelineSettings 管道设置。
cause STRING 更新的触发器。 API_CALL
RETRY_ON_FAILURESERVICE_UPGRADESCHEMA_CHANGE,
JOB_TASKUSER_ACTION 中的一项。
state STRING 更新的状态。 QUEUEDCREATED
WAITING_FOR_RESOURCESINITIALIZINGRESETTING,
SETTING_UP_TABLESRUNNINGSTOPPINGCOMPLETED,
FAILEDCANCELED 中的一项。
cluster_id STRING 运行更新的群集的标识符。
creation_time INT64 创建更新时的时间戳。
full_refresh BOOLEAN 此更新是否在运行之前重置所有表
refresh_selection 一个由 STRING 构成的数组 无需完全刷新即可更新的表列表。
full_refresh_selection 一个由 STRING 构成的数组 完全刷新更新的表列表。
request_id STRING 启动更新的请求的唯一标识符。 这是更新请求返回的值。 如果重试或重启更新,新更新将继承 request_id。 但 update_id 将有所不同。

停止任何活动管道更新

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/stop POST

停止任何活动管道更新。 如果没有正在运行的更新,则此请求为无操作。

对于连续管道,暂停管道执行。 当前处理的表完成刷新,但下游表没有刷新。 在下一次管道更新时,Delta Live Tables 对未完成处理的表执行选定刷新,并恢复剩余管道 DAG 的处理。

对于触发的管道,停止管道执行。 当前处理的表完成刷新,但下游表没有刷新。 在下一次管道更新时,Delta Live Tables 会刷新所有表。

示例

此示例停止更新 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道:

请求

curl --netrc -X POST \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/stop

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

列出管道事件

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/events GET

检索管道的事件。

示例

此示例最多检索 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道的 5 个事件。

请求

curl --netrc -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/events?max_results=5

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

请求结构

字段名称 类型 说明
page_token STRING 先前调用返回的页面令牌。 此字段与此请求中除 max_results 以外的所有字段互斥。 如果在设置此字段时设置了除 max_results 以外的任何字段,则会返回错误。

此字段是可选的。
max_results INT32 要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results

此字段是可选的。

默认值为 25。

最大值为 100。 如果
max_results 的值大于 100,则返回错误。
order_by STRING 一个指示结果的排序顺序的字符串(例如,["timestamp asc"])。

排序顺序可以是升序,也可以是降序。 默认情况下,事件按时间戳的降序返回。

此字段是可选的。
filter STRING 用于选择结果子集的条件,使用类似于 SQL 的语法表示。 支持的筛选器包括:

* level='INFO'(或 WARNERROR
* level in ('INFO', 'WARN')
* id='[event-id]'
* timestamp > 'TIMESTAMP'(或 >=<<==

支持复合表达式,例如:
level in ('ERROR', 'WARN') AND timestamp> '2021-07-22T06:37:33.083Z'

此字段是可选的。

响应结构

字段名称 类型 说明
events 管道事件的数组。 匹配请求条件的事件列表。
next_page_token STRING 如果存在,则为用于获取下一页事件的令牌。
prev_page_token STRING 如果存在,则为用于获取上一页事件的令牌。

获取管道详细信息

端点 HTTP 方法
2.0/pipelines/{pipeline_id} GET

获取有关管道的详细信息,包括管道设置和最新更新。

示例

此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道的详细信息:

请求

curl --netrc -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应

{
  "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
  "spec": {
    "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
    "name": "Wikipedia pipeline (SQL)",
    "storage": "/Users/username/data",
    "clusters": [
      {
        "label": "default",
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "libraries": [
      {
        "notebook": {
          "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
        }
      }
    ],
    "target": "wikipedia_quickstart_data",
    "continuous": false
  },
  "state": "IDLE",
  "cluster_id": "1234-567891-abcde123",
  "name": "Wikipedia pipeline (SQL)",
  "creator_user_name": "username",
  "latest_updates": [
    {
      "update_id": "8a0b6d02-fbd0-11eb-9a03-0242ac130003",
      "state": "COMPLETED",
      "creation_time": "2021-08-13T00:37:30.279Z"
    },
    {
      "update_id": "a72c08ba-fbd0-11eb-9a03-0242ac130003",
      "state": "CANCELED",
      "creation_time": "2021-08-13T00:35:51.902Z"
    },
    {
      "update_id": "ac37d924-fbd0-11eb-9a03-0242ac130003",
      "state": "FAILED",
      "creation_time": "2021-08-13T00:33:38.565Z"
    }
  ],
  "run_as_user_name": "username"
}

响应结构

字段名称 类型 说明
pipeline_id STRING 管道的唯一标识符。
spec PipelineSettings 管道设置。
state STRING 管道的状态。 IDLERUNNING 中的一项。

如果 state = RUNNING,则至少有一个活动的更新。
cluster_id STRING 运行管道的群集的标识符。
name STRING 此管道的用户友好名称。
creator_user_name STRING 管道创建者的用户名。
latest_updates UpdateStateInfo 的数组 管道最近更新的状态,按从新到旧的更新顺序排序。
run_as_user_name STRING 管道运行时使用的用户名。

获取更新详细信息

端点 HTTP 方法
2.0/pipelines/{pipeline_id}/updates/{update_id} GET

获取管道更新的详细信息。

示例

此示例将获取 ID 为 a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5 的管道更新 9a84f906-fc51-11eb-9a03-0242ac130003 的详细信息:

请求

curl --netrc -X GET \
https://<databricks-instance>/api/2.0/pipelines/a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5/updates/9a84f906-fc51-11eb-9a03-0242ac130003

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应

{
  "update": {
    "pipeline_id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
    "update_id": "9a84f906-fc51-11eb-9a03-0242ac130003",
    "config": {
      "id": "a12cd3e4-0ab1-1abc-1a2b-1a2bcd3e4fg5",
      "name": "Wikipedia pipeline (SQL)",
      "storage": "/Users/username/data",
      "configuration": {
        "pipelines.numStreamRetryAttempts": "5"
      },
      "clusters": [
        {
          "label": "default",
          "autoscale": {
            "min_workers": 1,
            "max_workers": 5,
            "mode": "ENHANCED"
          }
        }
      ],
      "libraries": [
        {
          "notebook": {
            "path": "/Users/username/DLT Notebooks/Delta Live Tables quickstart (SQL)"
          }
        }
      ],
      "target": "wikipedia_quickstart_data",
      "continuous": false,
      "development": false
    },
    "cause": "API_CALL",
    "state": "COMPLETED",
    "creation_time": 1628815050279,
    "full_refresh": true,
    "request_id": "a83d9f7c-d798-4fd5-aa39-301b6e6f4429"
  }
}

响应结构

字段名称 类型 说明
pipeline_id STRING 管道的唯一标识符。
update_id STRING 此更新的唯一标识符。
config PipelineSettings 管道设置。
cause STRING 更新的触发器。 API_CALL
RETRY_ON_FAILURESERVICE_UPGRADE.
state STRING 更新的状态。 QUEUEDCREATED
WAITING_FOR_RESOURCESINITIALIZINGRESETTING,
SETTING_UP_TABLESRUNNINGSTOPPINGCOMPLETED,
FAILEDCANCELED 中的一项。
cluster_id STRING 运行管道的群集的标识符。
creation_time INT64 创建更新时的时间戳。
full_refresh BOOLEAN 这是否是一次完全刷新。 如果是,则在运行更新之前重置所有管道表。

列出管道

端点 HTTP 方法
2.0/pipelines/ GET

列出增量实时表系统中定义的管道。

示例

此示例检索名称包含 quickstart 的管道的详细信息:

请求

curl --netrc -X GET \
https://<databricks-instance>/api/2.0/pipelines?filter=name%20LIKE%20%27%25quickstart%25%27

将:

  • <databricks-instance> 替换为 Azure Databricks <databricks-instance>(例如 adb-1234567890123456.7.databricks.azure.cn)。

此示例使用 .netrc 文件。

响应

{
  "statuses": [
    {
      "pipeline_id": "e0f01758-fc61-11eb-9a03-0242ac130003",
      "state": "IDLE",
      "name": "DLT quickstart (Python)",
      "latest_updates": [
        {
          "update_id": "ee9ae73e-fc61-11eb-9a03-0242ac130003",
          "state": "COMPLETED",
          "creation_time": "2021-08-13T00:34:21.871Z"
        }
      ],
      "creator_user_name": "username"
    },
    {
      "pipeline_id": "f4c82f5e-fc61-11eb-9a03-0242ac130003",
      "state": "IDLE",
      "name": "My DLT quickstart example",
      "creator_user_name": "username"
    }
  ],
  "next_page_token": "eyJ...==",
  "prev_page_token": "eyJ..x9"
}

请求结构

字段名称 类型 说明
page_token STRING 先前调用返回的页面令牌。

此字段是可选的。
max_results INT32 要在单个页面中返回的最大条目数。 即使有更多的可用事件,系统在响应中返回的事件也可能少于 max_results

此字段是可选的。

默认值为 25。

最大值为 100。 如果
max_results 的值大于 100,则返回错误。
order_by 一个由 STRING 构成的数组 指定结果顺序的字符串列表,例如,
["name asc"]。 支持的 order_by 字段为 id
name。 默认值为 id asc

此字段是可选的。
filter STRING 根据指定条件选择结果的子集。

支持的筛选器包括:

"notebook='<path>'",用以选择引用所提供笔记本路径的管道。

name LIKE '[pattern]',用以选择名称与 pattern 匹配的管道。 支持通配符,例如:
name LIKE '%shopping%'

不支持复合筛选器。

此字段是可选的。

响应结构

字段名称 类型 说明
statuses PipelineStateInfo 的数组 匹配请求条件的事件列表。
next_page_token STRING 如果存在,则为用于获取下一页事件的令牌。
prev_page_token STRING 如果存在,则为用于获取上一页事件的令牌。

数据结构

本节内容:

ABFSSStorageInfo

Azure Data Lake Storage (ADLS) 存储信息。

字段名称 类型 说明
destination STRING 文件目标。 示例: abfss://...

ClusterLogConf

群集日志的路径。

字段名称 类型 说明
dbfs DbfsStorageInfo 群集日志的 DBFS 位置。 必须提供目标。 例如,应用于对象的
{ "dbfs" : { "destination" : "dbfs:/home/cluster_log" } }

DbfsStorageInfo

DBFS 存储信息。

字段名称 类型 说明
destination STRING DBFS 目标。 示例: dbfs:/my/path

FileStorageInfo

文件存储信息。

注意

此位置类型只适用于使用 Databricks 容器服务设置的群集。

字段名称 类型 说明
destination STRING 文件目标。 示例: file:/my/file.sh

InitScriptInfo

初始化脚本的路径。

若要了解如何将初始化脚本与 Databricks 容器服务配合使用,请参阅使用初始化脚本

注意

文件存储类型(字段名称:file)只适用于使用 Databricks 容器服务设置的群集。 请参阅 FileStorageInfo

字段名称 类型 说明
workspace OR
dbfs(已弃用)

OR
abfss
WorkspaceStorageInfo

DbfsStorageInfo(已弃用)

ABFSSStorageInfo
init 脚本的工作区位置。 必须提供目标。 例如,应用于对象的
{ "workspace" : { "destination" : "/Users/someone@domain.com/init_script.sh" } }

(已弃用)init 脚本的 DBFS 位置。 必须提供目标。 例如,应用于对象的
{ "dbfs" : { "destination" : "dbfs:/home/init_script" } }

init 脚本的 Azure Data Lake Storage (ADLS) 位置。 必须提供目标。 例如,{ "abfss": { "destination" : "abfss://..." } }

KeyValue

指定配置参数的键值对。

字段名称 类型 说明
key STRING 配置属性名称。
value STRING 配置属性值。

NotebookLibrary

包含管道代码的笔记本的规范。

字段名称 类型 说明
path STRING 笔记本的绝对路径。

此字段为必需字段。

PipelinesAutoScale

属性,定义某个自动缩放群集。

字段名称 类型 说明
min_workers INT32 群集在未充分利用时可纵向缩减到的最小工作器数。 此数量也是群集在创建后将会具有的初始工作器的数量。
max_workers INT32 群集在负载过高时可纵向扩展到的最大工作器数。 max_workers 必须严格大于 min_workers。
mode STRING 群集的自动缩放模式:

* ENHANCED,可使用增强型自动缩放
* LEGACY,可使用群集自动缩放功能

PipelineLibrary

管道依赖项的规范。

字段名称 类型 说明
notebook NotebookLibrary 定义增量实时表数据集的笔记本的路径。 路径必须在 Databricks 工作区中,例如:
{ "notebook" : { "path" : "/my-pipeline-notebook-path" } }

PipelinesNewCluster

管道群集规范。

增量实时表系统设置以下属性。 用户无法配置以下属性:

  • spark_version
字段名称 类型 说明
label STRING 群集规范的标签,无论是
配置默认群集的 default,还是
配置维护群集的 maintenance

此字段是可选的。 默认值为 default
spark_conf KeyValue 一个对象,其中包含一组可选的由用户指定的 Spark 配置键值对。 你也可以分别通过以下属性,将额外 JVM 选项的字符串传入到驱动程序和执行程序:
spark.driver.extraJavaOptionsspark.executor.extraJavaOptions

示例 Spark 配置:
{"spark.speculation": true, "spark.streaming.ui.retainedBatches": 5}
{"spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails"}
node_type_id STRING 此字段通过单个值对提供给此群集中的每个 Spark 节点的资源进行编码。 例如,可以针对内存密集型或计算密集型的工作负荷来预配和优化 Spark 节点。通过使用群集 API 调用可以检索可用节点类型的列表。
driver_node_type_id STRING Spark 驱动程序的节点类型。 此字段为可选;如果未设置,驱动程序节点类型将会被设置为与上面定义的 node_type_id 相同的值。
ssh_public_keys 一个由 STRING 构成的数组 将会添加到此群集中各个 Spark 节点的 SSH 公钥内容。 对应的私钥可用于在端口 2200上使用用户名 ubuntu 登录。 最多可以指定 10 个密钥。
custom_tags KeyValue 一个对象,其中包含群集资源的一组标记。 Databricks 会使用这些标记以及 default_tags 来标记所有的群集资源。

注意

* 旧版节点类型(如计算优化和内存优化)不支持标记
* Azure Databricks 最多允许 45 个自定义标记。
cluster_log_conf ClusterLogConf 用于将 Spark 日志传递到长期存储目标的配置。 对于一个群集,只能指定一个目标。 如果提供此配置,日志将会发送到目标,发送的时间间隔为
5 mins。 驱动程序日志的目标是 <destination>/<cluster-ID>/driver,而执行程序日志的目标是 <destination>/<cluster-ID>/executor
spark_env_vars KeyValue 一个对象,其中包含一组可选的由用户指定的环境变量键值对。 在启动驱动程序和工作器时,(X,Y) 形式的键值对会按原样导出(即
export X='Y')。

要额外指定一组 SPARK_DAEMON_JAVA_OPTS,Databricks 建议将其追加到 $SPARK_DAEMON_JAVA_OPTS,如以下示例中所示。 这样就确保了还会包含所有默认的 Azure Databricks 托管环境变量。

Spark 环境变量示例:
{"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"}
{"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"}
init_scripts 一个由 InitScriptInfo 构成的数组 用于存储初始化脚本的配置。 可以指定任意数量的目标。 这些脚本会按照所提供的顺序依次执行。 如果指定了 cluster_log_conf,初始化脚本日志将会发送到
<destination>/<cluster-ID>/init_scripts
instance_pool_id STRING 群集所属的实例池的可选 ID。 请参阅池配置参考
driver_instance_pool_id STRING 要用于驱动程序节点的实例池的可选 ID。 另外还必须指定
instance_pool_id。 请参阅实例池 API
policy_id STRING 群集策略 ID。
num_workers OR autoscale INT32InitScriptInfo 如果是 num_workers,则此项为此群集应该具有的工作器节点数。 一个群集有一个 Spark 驱动程序和 num_workers 个执行程序用于总共 (num_workers + 1) 个 Spark 节点。

在读取群集的属性时,此字段反映的是所需的工作器数,而不是实际的工作器数。 例如,如果将群集的大小从 5 个工作器重设为 10 个工作器,此字段会更新,以反映 10 个工作器的目标大小,而执行程序中列出的工作器将会随着新节点的预配,逐渐从 5 个增加到 10 个。

如果是 autoscale,则会需要参数,以便根据负载自动纵向扩展或缩减群集。

此字段可选。
apply_policy_default_values BOOLEAN 是否对缺失的群集属性使用策略默认值。

PipelineSettings

管道部署的设置。

字段名称 类型 说明
id STRING 此管道的唯一标识符。

标识符是由增量实时表系统创建的,并且在创建管道时不得提供。
name STRING 此管道的用户友好名称。

此字段是可选的。

默认情况下,管道名称必须唯一。 若要使用重复的名称,请在管道配置中将 allow_duplicate_names 设置为 true
storage STRING 用于存储管道创建的检查点和表的 DBFS 目录的路径。

此字段是可选的。

如果此字段为空,则系统将使用默认位置。
configuration STRING:STRING 的映射 要添加到将运行管道的群集的 Spark 配置中的键值对的列表。

此字段是可选的。

必须将元素设置为“键:值”对的格式。
clusters PipelinesNewCluster 的数组 要运行管道的群集的规范数组。

此字段是可选的。

如果未指定此字段,系统将为管道选择默认群集配置。
libraries PipelineLibrary 的数组 包含管道代码的笔记本和运行管道所需的任何依赖项。
target STRING 用于保存管道输出数据的数据库名称。

有关详细信息,请参阅将数据从增量实时表发布到 Hive 元存储
continuous BOOLEAN 这是否为连续管道。

此字段是可选的。

默认值为 false
development BOOLEAN 是否在开发模式下运行管道。

此字段是可选的。

默认值为 false
photon BOOLEAN 是否为此管道启用 Photon 加速。

此字段是可选的。

默认值为 false
channel STRING Delta Live Tables 发布通道指定用于此管道的运行时版本。 支持的值是:

* preview,用于测试管道,并对 Delta Live Tables 运行时进行即时更改。
* current,使用当前的 Delta Live Tables 运行时版本。

此字段是可选的。

默认值为 current
edition STRING 运行管道的 Delta Live Tables 产品版本:

* CORE 支持流式引入工作负载。
* PRO 还支持流式引入工作负载,并添加了对变更数据捕获 (CDC) 处理的支持。
* ADVANCED 支持 PRO 版本的所有功能,并增加了对需要 Delta Live Tables 期望以强制执行数据质量约束的工作负载的支持。

此字段是可选的。

默认值为 advanced

PipelineStateInfo

管道的状态、最新更新的状态以及有关关联资源的信息。

字段名称 类型 说明
state STRING 管道的状态。 IDLERUNNING 中的一项。
pipeline_id STRING 管道的唯一标识符。
cluster_id STRING 运行管道的群集的唯一标识符。
name STRING 管道的用户友好名称。
latest_updates UpdateStateInfo 的数组 管道最近更新的状态,按从新到旧的更新顺序排序。
creator_user_name STRING 管道创建者的用户名。
run_as_user_name STRING 管道运行时使用的用户名。 这是从管道所有者派生的只读值。

UpdateStateInfo

管道更新的当前状态。

字段名称 类型 说明
update_id STRING 此更新的唯一标识符。
state STRING 更新的状态。 QUEUEDCREATED
WAITING_FOR_RESOURCESINITIALIZINGRESETTING,
SETTING_UP_TABLESRUNNINGSTOPPINGCOMPLETED,
FAILEDCANCELED 中的一项。
creation_time STRING 创建此更新时的时间戳。

WorkspaceStorageInfo

工作区存储信息。

字段名称 类型 说明
destination STRING 文件目标。 示例: /Users/someone@domain.com/init_script.sh