共用方式為

在工作流中运行 Lakeflow 声明性管道

可以使用 Lakeflow 作业、Apache Airflow 或 Azure 数据工厂将 Lakeflow 声明性管道作为数据处理工作流的一部分运行。

职位

可在 Databricks 作业中协调多个任务以实现数据处理工作流。 若要在作业中包含管道,请在创建作业时使用 管道 任务。 请参阅 作业的管道任务

Apache Airflow

Apache Airflow是一种用于管理和计划数据工作流的开源解决方案。 Airflow 将工作流表示为操作的有向无环图 (DAG)。 在 Python 文件中定义工作流,Airflow 管理计划并执行。 有关在 Azure Databricks 上安装和使用 Airflow 的信息,请参阅 使用 Apache Airflow 编排 Lakeflow 作业

若要在 Airflow 工作流中运行管道,请使用 DatabricksSubmitRunOperator

要求

要使用 Lakeflow 声明式管道的 Airflow 支持,需满足以下要求:

示例:

以下示例创建一个 Airflow DAG,使用标识符 8279d543-063c-4d63-9926-dae38e35ce8b来触发对管道的更新。

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID 替换为与你的工作区的 Airflow 连接 的标识符。

将此示例保存在 airflow/dags 目录中,并使用 Airflow UI 来查看和触发 DAG。 使用 Lakeflow 声明性管道 UI 查看管道更新的详细信息。

Azure 数据工厂

注释

Lakeflow 声明性管道和 Azure 数据工厂都包含用于配置发生故障时重试次数的选项。 如果在管道以及调用该管道的 Azure 数据工厂活动中配置了重试值,那么重试次数是 Azure 数据工厂重试值与 Lakeflow 声明性管道重试值的乘积。

例如,如果管道更新失败,则 Lakeflow 声明性管道默认重试更新最多五次。 如果 Azure 数据工厂重试设置为 3 次,并且管道使用默认的 5 次重试,则失败的管道最多可以重试 15 次。 为避免管道更新失败时重试次数过多,Databricks 建议在配置管道或调用管道的 Azure 数据工厂活动时限制重试次数。

若要更改管道的重试配置,请在配置管道时使用 pipelines.numUpdateRetryAttempts 设置。

Azure 数据工厂是一项基于云的 ETL 服务,可用于协调数据集成和转换工作流。 Azure 数据工厂直接支持在工作流中运行 Azure Databricks 任务,包括笔记本、JAR 任务和 Python 脚本。 还可以通过从 Azure 数据工厂 Web 活动调用 Lakeflow 声明性管道 API,将管道包含在工作流中。 例如,从 Azure 数据工厂触发管道更新:

  1. 创建数据工厂或打开现有数据工厂。

  2. 创建完成后,打开数据工厂的页面,然后单击“打开 Azure 数据工厂工作室”磁贴。 Azure 数据工厂用户界面随即显示。

  3. 在 Azure 数据工厂工作室用户界面的“新建”下拉菜单中选择“管道”,以创建一个新的 Azure 数据工厂管道。

  4. 在“活动”工具箱中,展开“常规”并将“Web”活动拖到管道画布上。 单击“设置”选项卡,输入以下值:

    注释

    作为安全最佳做法,在使用自动化工具、系统、脚本和应用进行身份验证时,Databricks 建议使用属于服务主体(而不是工作区用户)的个人访问令牌。 若要为服务主体创建令牌,请参阅管理服务主体的令牌

    • URLhttps://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

      替换 <get-workspace-instance>

      <pipeline-id> 替换为管道标识符。

    • 方法:从下拉菜单中选择“POST”

    • 标头:单击“+ 新建”。 在“名称”文本框中,输入 Authorization 在“值”文本框中,输入 Bearer <personal-access-token>

      使用 Azure Databricks <personal-access-token>替换

    • 正文:若要传递其他请求参数,请输入包含参数的 JSON 文档。 例如,启动更新并重新处理管道的所有数据:{"full_refresh": "true"}。 如果没有其他请求参数,请输入空括号 ({})。

若要测试 Web 活动,请单击数据工厂 UI 中管道工具栏上的“调试”。 运行的输出和状态(包括错误)显示在 Azure 数据工厂管道的“输出”选项卡中。 使用 Lakeflow 声明性管道 UI 查看管道更新的详细信息。

小窍门

常见的工作流要求是在上一任务完成后启动任务。 由于 Lakeflow 声明性管道 updates 请求是异步的,因此请求在启动更新后返回,但在更新完成之前,Azure 数据工厂管道中的任务必须等待更新完成。 等待更新完成的一个选项是在启动 Lakeflow 声明性管道更新的 Web 活动之后添加 Until 活动。 在 Until 活动中:

  1. 添加 Wait 活动,等待配置的秒数以完成更新。
  2. 在 Wait 活动之后添加一个 Web 活动,该活动使用 Lakeflow 声明性管道更新详细信息请求来获取更新的状态。 响应中的 state 字段返回更新的当前状态,包括它是否已完成。
  3. 使用 state 字段的值来设置 Until 活动的终止条件。 你也可以使用 Set Variable 活动,根据 state 值添加管道变量,并将此变量用于终止条件。