共用方式為

将任务添加到 Databricks 资产捆绑包中的作业

本页提供有关如何在 Databricks 资产捆绑包中定义作业任务的信息。 有关任务作业的信息,请参阅 在 Lakeflow 中配置和编辑任务

重要

建议不要将作业 git_source 字段和任务 source 字段设置为 GIT,因为本地相对路径可能不会指向 Git 存储库中的相同内容。 捆绑包要求部署的作业具有与部署作业的本地副本相同的文件。

相反,会在本地克隆存储库并在此存储库中设置捆绑包项目,以便任务源是工作区。

配置任务

tasks键中为捆绑包中的作业定义任务。 可用任务类型的任务配置示例位于 “任务设置” 部分。 有关在捆绑包中定义作业的信息,请参阅 作业

提示

若要使用 Databricks CLI 快速生成现有作业的资源配置,可以使用 bundle generate job 命令。 请参阅捆绑包命令

若要设置任务值,大多数作业任务类型都具有特定于任务的参数,但你也可以定义传递给任务的 作业参数 。 作业参数支持动态值引用,这些参数支持传递特定于任务之间作业运行的值。 有关如何按任务类型传递任务值的完整信息,请参阅 按任务类型的详细信息

还可以使用目标工作区的设置替代常规作业任务设置。 请参阅 “使用目标设置进行替代”

以下示例配置定义包含两个笔记本任务的作业,并将任务值从第一个任务传递到第二个任务。

resources:
  jobs:
    pass_task_values_job:
      name: pass_task_values_job
      tasks:
        # Output task
        - task_key: output_value
          notebook_task:
            notebook_path: ../src/output_notebook.ipynb

        # Input task
        - task_key: input_value
          depends_on:
            - task_key: output_value
          notebook_task:
            notebook_path: ../src/input_notebook.ipynb
            base_parameters:
              received_message: '{{tasks.output_value.values.message}}'

output_notebook.ipynb包含以下代码,用于设置message键的任务值:

# Databricks notebook source
# This first task sets a simple output value.

message = "Hello from the first task"

# Set the message to be used by other tasks
dbutils.jobs.taskValues.set(key="message", value=message)

print(f"Produced message: {message}")

input_notebook.ipynb 检索在任务配置中设置的参数 received_message 的值。

# This notebook receives the message as a parameter.

dbutils.widgets.text("received_message", "")
received_message = dbutils.widgets.get("received_message")

print(f"Received message: {received_message}")

任务设置

本部分包含每个作业任务类型的设置和示例。

条件任务

你可以通过 condition_task 向作业添加具有 if/else 条件逻辑的任务。 该任务评估可用于控制其他任务执行的条件。 条件任务不需要群集来执行,也不支持重试或通知。 有关 if/else 条件任务的详细信息,请参阅 使用 If/else 任务向作业添加分支逻辑

以下键可用于条件任务。 有关相应的 REST API 对象定义,请参阅 condition_task

Key 类型 Description
left String 必填。 条件的左作数。 可以是字符串值、作业状态或动态值引用,例如 {{job.repair_count}}{{tasks.task_key.values.output}}
op String 必填。 要用于比较的运算符。 有效值为:EQUAL_TO、、NOT_EQUALGREATER_THANGREATER_THAN_OR_EQUALLESS_THAN、 。 LESS_THAN_OR_EQUAL
right String 必填。 条件的右作数。 可以是字符串值、作业状态或动态值引用。

例子

以下示例包含条件任务和笔记本任务,其中笔记本任务仅在作业修复数小于 5 时执行。

resources:
  jobs:
    my-job:
      name: my-job
      tasks:
        - task_key: condition_task
          condition_task:
            op: LESS_THAN
            left: '{{job.repair_count}}'
            right: '5'
        - task_key: notebook_task
          depends_on:
            - task_key: condition_task
              outcome: 'true'
          notebook_task:
            notebook_path: ../src/notebook.ipynb

控制面板任务

使用此任务刷新仪表板并将快照发送到订阅者。 有关捆绑包中的仪表板的详细信息,请参阅 仪表板

以下密钥可用于 仪表板任务。 有关相应的 REST API 对象定义,请参阅 dashboard_task

Key 类型 Description
dashboard_id String 必填。 需要刷新的仪表板的标识符。 仪表板必须已存在。
subscription 地图 用于发送仪表板快照的订阅配置。 每个订阅对象可以指定仪表板刷新完成后,发送快照的目标位置设置。 请参阅 订阅
warehouse_id String 用于执行计划仪表板的仓库 ID。 如果未指定,将使用仪表板的默认仓库。

例子

以下示例将仪表板中的任务添加到作业。 运行作业时,将刷新具有指定 ID 的仪表板。

resources:
  jobs:
    my-dashboard-job:
      name: my-dashboard-job
      tasks:
        - task_key: my-dashboard-task
          dashboard_task:
            dashboard_id: 11111111-1111-1111-1111-111111111111

dbt 任务

使用此任务运行一个或多个 dbt 命令。 有关 dbt 的详细信息,请参阅 “连接到 dbt Cloud”。

以下键可用于 dbt 任务。 有关相应的 REST API 对象定义,请参阅 dbt_task

Key 类型 Description
catalog String 要使用的目录的名称。 仅当指定了warehouse_id 时,才能指定目录值。 此字段需要 dbt-databricks >= 1.1.1。
commands 序列 必填。 要按顺序执行的 dbt 命令的列表。 每个命令必须是完整的 dbt 命令(例如,、dbt depsdbt seeddbt rundbt test)。 最多可提供 10 个命令。
profiles_directory String 包含 dbt profiles.yml 文件的目录的路径。 仅在没有指定 warehouse_id 时才能指定。 如果未指定warehouse_id,并且此文件夹未设置,将使用根目录。
project_directory String 包含 dbt 项目的目录的路径。 如果未指定,则默认为存储库或工作区目录的根目录。 对于 Databricks 工作区中存储的项目,路径必须是绝对路径,以斜杠开头。 对于远程存储库中的项目,路径必须为相对路径。
schema String 要写入的架构。 仅当提供了warehouse_id时才使用此参数。 如果未提供,则使用默认架构。
source String dbt 项目的位置类型。 有效值为 WORKSPACEGIT。 设置为 WORKSPACE 时,将从 Databricks 工作区检索项目。 设置为 GIT 时,将从在 git_source 中定义的 Git 存储库中检索项目。 如果为空,则任务使用GIT,如果定义了git_source,则使用git_source,否则使用。
warehouse_id String 用于运行 dbt 命令的 SQL 仓库的 ID。 如果未指定,将使用默认仓库。

例子

以下示例向作业添加 dbt 任务。 此 dbt 任务使用指定的 SQL 仓库来运行指定的 dbt 命令。

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID

提示

Databricks 资产捆绑包还包含一个 dbt-sql 项目模板,可用于定义具有 dbt 任务的作业,以及已部署 dbt 作业的 dbt 配置文件。 有关 Databricks 资产捆绑包模板的信息,请参阅 默认捆绑模板

resources:
  jobs:
    my-dbt-job:
      name: my-dbt-job
      tasks:
        - task_key: my-dbt-task
          dbt_task:
            commands:
              - 'dbt deps'
              - 'dbt seed'
              - 'dbt run'
            project_directory: /Users/someone@example.com/Testing
            warehouse_id: 1a111111a1111aa1
          libraries:
            - pypi:
                package: 'dbt-databricks>=1.0.0,<2.0.0'

For each 任务

你可以通过 for_each_task 向作业添加包含 for each 循环的任务。 该任务为提供的每个输入执行嵌套任务。 如需获取有关 for_each_task的详细信息,请参阅 。使用 For each 任务在循环中运行另一个任务

以下键可用于for_each_task。 有关相应的 REST API 对象定义,请参阅 for_each_task

Key 类型 Description
concurrency 整数 可以并发运行的最大任务迭代数。 如果未指定,则所有迭代可能会并行运行,但受群集和工作区限制的约束。
inputs String 必填。 循环的输入数据。 可以是 JSON 字符串,也可以是对数组参数的引用。 数组中的每个元素都将传递给嵌套任务的一次迭代。
task 地图 必填。 要为每个输入执行的嵌套任务定义。 此对象包含完整的任务规范,包括 task_key 和任务类型(例如 notebook_taskpython_wheel_task 等等)。

例子

以下示例向作业添加一个 for_each_task,在该作业中循环访问另一个任务的值并将其处理。

resources:
  jobs:
    my_job:
      name: my_job
      tasks:
        - task_key: generate_countries_list
          notebook_task:
            notebook_path: ../src/generate_countries_list.ipnyb
        - task_key: process_countries
          depends_on:
            - task_key: generate_countries_list
          for_each_task:
            inputs: '{{tasks.generate_countries_list.values.countries}}'
            task:
              task_key: process_countries_iteration
              notebook_task:
                notebook_path: ../src/process_countries_notebook.ipnyb

JAR 任务

使用此任务运行 JAR。 可以引用本地 JAR 库或工作区、Unity Catalog 卷或外部云存储位置中的 JAR 库。 请参阅 JAR 文件(Java 或 Scala)。

若要详细了解如何在标准访问模式下在启用了 Unity Catalog 的群集上编译和部署 Scala JAR 文件,请参阅在 Unity Catalog 群集上部署 Scala JAR

以下密钥可用于 JAR 任务。 有关相应的 REST API 对象定义,请参阅 jar_task

Key 类型 Description
jar_uri String 已弃用。 要执行的 JAR 的 URI。 支持 DBFS 和云存储路径。 此字段已弃用,不应使用。 请改用 libraries 字段指定 JAR 依赖项。
main_class_name String 必填。 类的全名,包含要执行的主要方法。 此类必须包含在作为库提供的 JAR 中。 代码必须使用SparkContext.getOrCreate来获取 Spark 上下文,否则作业将失败。
parameters 序列 传递给主方法的参数。 使用任务参数变量设置包含作业运行信息的参数。

例子

以下示例向作业添加 JAR 任务。 JAR 文件的路径指向一个卷位置。

resources:
  jobs:
    my-jar-job:
      name: my-jar-job
      tasks:
        - task_key: my-jar-task
          spark_jar_task:
            main_class_name: org.example.com.Main
          libraries:
            - jar: /Volumes/main/default/my-volume/my-project-0.1.0-SNAPSHOT.jar

笔记本任务

使用此任务运行笔记本。 请参阅作业的笔记本任务

以下密钥可用于笔记本任务。 有关相应的 REST API 对象定义,请参阅 notebook_task

Key 类型 Description
base_parameters 地图 用于这项作业每次运行的基本参数。
  • 如果通过调用 jobs 或即时运行来启动运行,并指定参数,则将两个参数映射合并。
  • 如果在base_parametersrun-now中指定了同一个键,则使用来自run-now的值。 使用任务参数变量设置包含作业运行信息的参数。
  • 若笔记本参数未在作业的base_parameters或重写参数run-now中指定,则将使用笔记本中的默认值。 在笔记本中使用 dbutils.widgets.get 检索这些参数。
notebook_path String 必填。 Databricks 工作区或远程存储库中的笔记本路径,例如 /Users/user.name@databricks.com/notebook_to_run。 对于存储在 Databricks 工作区中的笔记本,路径必须是绝对路径,以斜杠开头。 对于存储在远程存储库中的笔记本,路径必须相对。
source String 笔记本的位置类型。 有效值为 WORKSPACEGIT。 当设置为WORKSPACE时,将从本地 Databricks 工作区检索笔记本。 当设置为GIT时,将从git_source定义的 Git 存储库中检索笔记本。 如果值为空,任务将在定义GIT时使用GIT,否则使用WORKSPACE
warehouse_id String 运行笔记本所需的仓库ID。 不支持经典 SQL 仓库。 请改用无服务器或 pro SQL 仓库。 请注意,SQL 仓库仅支持 SQL 单元格。 如果笔记本包含非 SQL 单元格,则运行将失败,因此,如果需要在单元格中使用 Python(或其他),请使用无服务器。

例子

以下示例将笔记本任务添加到作业,并设置名为 my_job_run_id 的作业参数。 要部署的笔记本的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取笔记本。

resources:
  jobs:
    my-notebook-job:
      name: my-notebook-job
      tasks:
        - task_key: my-notebook-task
          notebook_task:
            notebook_path: ./my-notebook.ipynb
      parameters:
        - name: my_job_run_id
          default: '{{job.run_id}}'

管道任务

你可以用这个任务来运行一个管道。 请参阅 Lakeflow 声明性管道

以下密钥可用于 管道任务。 有关相应的 REST API 对象定义,请参阅 pipeline_task

Key 类型 Description
full_refresh 布尔 如果为真,将触发管道的完全刷新,这将全量重算管道中的所有数据集。 如果是 false 或省略,则只处理增量数据。 有关详细信息,请参阅 管道刷新语义
pipeline_id String 必填。 要运行的管道的 ID。 管道必须已存在。

例子

以下示例将流水线任务添加到作业中。 此任务运行指定的管道。

提示

可以通过以下方法来查找管道的 ID:在工作区中打开管道,并在管道设置页的“管道详细信息”选项卡上复制“管道 ID”值。

resources:
  jobs:
    my-pipeline-job:
      name: my-pipeline-job
      tasks:
        - task_key: my-pipeline-task
          pipeline_task:
            pipeline_id: 11111111-1111-1111-1111-111111111111

Python 脚本任务

使用此任务运行 Python 文件。

以下密钥可用于 Python 脚本任务。 有关相应的 REST API 对象定义,请参阅 python_task

Key 类型 Description
parameters 序列 要传递给 Python 文件的参数。 使用任务参数变量设置包含作业运行信息的参数。
python_file String 必填。 要执行的 Python 文件的 URI,例如 /Users/someone@example.com/my-script.py。 对于存储在 Databricks 工作区中的 python 文件,路径必须是绝对路径,以 /开头。 对于存储在远程存储库中的文件,路径必须是相对路径。 此字段不支持动态值引用,如变量。
source String Python 文件的位置类型。 有效值为 WORKSPACEGIT。 当设置为 WORKSPACE 时,将从本地 Databricks 工作区中检索该文件。 设置为 GIT 时,将从 git_source 定义的 Git 存储库中检索文件。 如果值为空,任务将在定义GIT时使用GIT,否则使用WORKSPACE

例子

以下示例向作业添加 Python 脚本任务。 要部署的 Python 文件的路径是相对于声明了此任务的配置文件而言。 该任务从其在 Azure Databricks 工作区中的部署位置获取 Python 文件。

resources:
  jobs:
    my-python-script-job:
      name: my-python-script-job

      tasks:
        - task_key: my-python-script-task
          spark_python_task:
            python_file: ./my-script.py

Python wheel 任务

使用此任务运行 Python 滚轮。 请参阅 使用 Databricks 资产捆绑包生成 Python 滚轮文件

以下键可用于 Python 滚轮任务。 有关相应的 REST API 对象定义,请参阅 python_wheel_task

Key 类型 Description
entry_point String 必填。 要执行的命名入口点:函数或类。 如果包的元数据中不存在,系统会直接从包中执行该函数,使用$packageName.$entryPoint()
named_parameters 地图 要传递给 Python wheel 任务的命名参数(也称为 关键字参数)。 命名参数是包含字符串键和字符串值的键值对。 parametersnamed_parameters 不能同时指定。 如果指定了 named_parameters,则会将 parameters 作为关键字参数传递给入口点函数。
package_name String 必填。 要执行的 Python 包的名称。 所有依赖项都必须安装在环境中。 这不会检查或安装任何包依赖项。
parameters 序列 要传递给 Python Wheel 任务的参数,也称为 位置参数。 每个参数都是一个字符串。 如果指定, named_parameters 则不得指定。

例子

以下示例向作业添加 Python wheel 任务。 要部署的 Python wheel 文件的路径是相对于声明了此任务的配置文件而言。 请参阅 Databricks 资产捆绑包的库依赖项

resources:
  jobs:
    my-python-wheel-job:
      name: my-python-wheel-job
      tasks:
        - task_key: my-python-wheel-task
          python_wheel_task:
            entry_point: run
            package_name: my_package
          libraries:
            - whl: ./my_package/dist/my_package-*.whl

运行作业任务

使用此任务运行另一个作业。

以下密钥可用于 运行作业任务。 有关相应的 REST API 对象定义,请参阅 run_job_task

Key 类型 Description
job_id 整数 必填。 要运行的作业的 ID。 作业必须已存在于工作区中。
job_parameters 地图 要传递给所运行的作业的作业级别参数。 这些参数可在作业任务中访问。
pipeline_params 地图 管道任务的参数。 仅当运行的作业包含管道任务时才使用。 可以包括 full_refresh 以触发管道的完全刷新。

例子

以下示例在第二个作业中包含了运行第一个作业的运行作业任务。

此示例使用替换来检索要运行的作业的 ID。 若要从 UI 获取作业的 ID,请在工作区中打开作业,并从作业“设置”页的“作业详细信息”选项卡中的“作业 ID”值中复制该 ID

resources:
  jobs:
    my-first-job:
      name: my-first-job
      tasks:
        - task_key: my-first-job-task
          new_cluster:
            spark_version: '13.3.x-scala2.12'
            node_type_id: 'i3.xlarge'
            num_workers: 2
          notebook_task:
            notebook_path: ./src/test.py
    my_second_job:
      name: my-second-job
      tasks:
        - task_key: my-second-job-task
          run_job_task:
            job_id: ${resources.jobs.my-first-job.id}

SQL 任务

使用此任务运行 SQL 文件、查询或警报。

以下密钥可用于 SQL 任务。 有关相应的 REST API 对象定义,请参阅 sql_task

Key 类型 Description
alert 地图 用于运行 SQL 警报的配置。 包含:
  • alert_id (字符串):必需。 要运行的 SQL 警报的规范标识符。
  • pause_subscriptions (布尔值):是否暂停警报订阅服务。
  • subscriptions (Sequence): 订阅配置列表。
dashboard 地图 用于刷新 SQL 仪表板的配置。 包含:
  • dashboard_id (字符串):必需。 要刷新的 SQL 仪表板的唯一规范标识符。
  • custom_subject (字符串):发送给仪表板订阅者的电子邮件的自定义主题。
  • pause_subscriptions (布尔):是否暂停仪表板订阅。
  • subscriptions (Sequence): 订阅配置列表。
file 地图 用于运行 SQL 文件的配置。 包含:
  • path (字符串):必需。 工作区或远程存储库中 SQL 文件的路径。 对于存储在 Databricks 工作区中的文件,路径必须是绝对路径,以斜杠开头。 对于存储在远程存储库中的文件,路径必须是相对路径。
  • source (字符串):SQL 文件的位置类型。 有效值为 WORKSPACEGIT
parameters 地图 此任务每次运行所需的参数。 SQL 查询和文件可以通过通过语法 {{parameter_key}}引用这些参数来使用这些参数。 使用任务参数变量设置包含作业运行信息的参数。
query 地图 用于运行 SQL 查询的配置。 包含:
  • query_id (字符串):必需。 要运行的 SQL 查询的规范标识符。
warehouse_id String 必填。 用于运行 SQL 任务的 SQL 仓库的 ID。 SQL 仓库必须已存在。

例子

提示

若要获取 SQL 仓库的 ID,请打开 SQL 仓库的设置页,然后复制“概述”选项卡上“名称”字段中仓库名称后面括号中的 ID

以下示例向作业添加 SQL 文件任务。 此 SQL 文件任务使用指定的 SQL 仓库来运行指定的 SQL 文件。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-file-job
      tasks:
        - task_key: my-sql-file-task
          sql_task:
            file:
              path: /Users/someone@example.com/hello-world.sql
              source: WORKSPACE
            warehouse_id: 1a111111a1111aa1

以下示例将 SQL 警报任务添加到作业。 此 SQL 警报任务使用指定的 SQL 仓库刷新指定的 SQL 警报。

resources:
  jobs:
    my-sql-file-job:
      name: my-sql-alert-job
      tasks:
        - task_key: my-sql-alert-task
          sql_task:
            warehouse_id: 1a111111a1111aa1
            alert:
              alert_id: 11111111-1111-1111-1111-111111111111

以下示例将 SQL 查询任务添加到作业。 此 SQL 查询任务使用指定的 SQL 仓库来运行指定的 SQL 查询。

resources:
  jobs:
    my-sql-query-job:
      name: my-sql-query-job
      tasks:
        - task_key: my-sql-query-task
          sql_task:
            warehouse_id: 1a111111a1111aa1
            query:
              query_id: 11111111-1111-1111-1111-111111111111

其他任务设置

以下任务设置允许为所有任务配置行为。 有关相应的 REST API 对象定义,请参阅 任务

Key 类型 Description
compute_key String 用于此任务的计算资源的关键。 如果指定了new_cluster,则不能指定existing_cluster_idjob_cluster_key
depends_on 序列 任务依赖项的可选列表。 每个项都包含:
  • task_key (字符串):必需。 此任务依赖的关键。
  • outcome(字符串):只能指定给 condition_task。 如果指定,依赖任务只有在条件评估为指定结果(truefalse)时才会运行。
description String 任务的可选说明。
disable_auto_optimization 布尔 是否为此任务禁用自动优化。 如果为 true,将禁用自适应查询执行等自动优化。
email_notifications 地图 一组可选的电子邮件地址,用于在运行开始、完成或失败时通知。 每个项都包含:
  • on_start (序列):运行启动时要通知的电子邮件地址列表。
  • on_success (序列):运行成功完成时要通知的电子邮件地址列表。
  • on_failure (序列):运行失败时要通知的电子邮件地址列表。
  • on_duration_warning_threshold_exceeded (序列):运行持续时间超过阈值时要通知的电子邮件地址列表。
  • on_streaming_backlog_suceeded (序列):当任何流的积压阈值被超过时,需要通知的电子邮件地址列表。
environment_key String 定义在作业 environments 配置中的环境键。 用于指定环境特定的设置。 使用无服务器计算时,Python 脚本、Python 滚轮和 dbt 任务需要此字段。
existing_cluster_id String 此任务将使用已存在的群集的 ID 进行所有运行。
health 地图 此任务的健康监测的可选规范说明,其中包括一个 rules 键,这是要评估的健康规则列表。
job_cluster_key String 作业配置中定义的作业群集的 job_clusters 密钥。
libraries 序列 要安装在将执行任务的群集上的库的可选列表。 每个库都指定为具有键(如jareggwhlpypimavencranrequirements)的映射。
max_retries 整数 重试任务时的可选最大次数,如果任务失败。 如果未指定,则不会重试任务。
min_retry_interval_millis 整数 在失败运行开始和后续重试运行之间的可选最小间隔(以毫秒为单位)。 如果未指定,则默认值为 0(立即重试)。
new_cluster 地图 为每次运行此任务创建的新群集的规范。 请参阅 群集
notification_settings 地图 此任务的可选通知设置。 每个项都包含:
  • no_alert_for_skipped_runs (布尔):如果为 true,则不发送跳过运行的通知。
  • no_alert_for_canceled_runs (布尔):如果为 true,则不发送已取消运行的通知。
  • alert_on_last_attempt (布尔):如果为 true,则仅在上次重试尝试时发送通知。
retry_on_timeout 布尔 一个可选策略,用于指定在任务超时时是否重试任务。如果未指定,则默认为 false。
run_if String 一个可选值,该值指示任务应运行的条件。 有效值为:
  • ALL_SUCCESS (默认值):如果所有依赖项都成功,则运行。
  • AT_LEAST_ONE_SUCCESS:如果至少一个依赖项成功,则运行。
  • NONE_FAILED:在依赖项未失败的情况下运行。
  • ALL_DONE:在所有依赖项完成时运行,而不考虑结果。
  • AT_LEAST_ONE_FAILED:如果至少一个依赖项失败,则运行。
  • ALL_FAILED:如果所有依赖项都失败,则运行。
task_key String 必填。 任务的唯一名称。 此字段用于从其他任务中使用 depends_on 字段引用此任务。
timeout_seconds 整数 每次运行此任务时可选的超时设置。 值为 0 表示无超时。 如果未设置,将使用群集配置的默认超时。
webhook_notifications 地图 一组可选的系统目标,用于在运行开始、完成或失败时通知。 每个项都包含:
  • on_start (序列):运行启动时通知目标的列表。
  • on_success (序列):运行完成后通知目标的列表。
  • on_failure (序列):运行失败时的通知目标列表。
  • on_duration_warning_threshold_exceeded (序列):运行持续时间超过阈值时的通知目标列表。
  • on_streaming_backlog_suceeded (序列):当任何流的积压阈值被超过时,需要通知的电子邮件地址列表。