在工作流中运行 Lakeflow 声明性管道

可以使用 Lakeflow 作业、Apache Airflow 或 Azure 数据工厂将 Lakeflow 声明性管道作为数据处理工作流的一部分运行。

Jobs

可以协调 Databricks 作业中的多个任务来实现数据处理工作流。 若要在作业中包含管道,请在创建作业时使用 管道 任务。 请参阅作业的管道任务

Apache Airflow

Apache Airflow 是一种开源解决方案,用于管理和计划数据工作流。 Airflow 将工作流表示为操作的有向无环图(DAG)。 在 Python 文件中定义工作流,Airflow 管理计划和执行。 有关通过 Azure Databricks 安装和使用 Airflow 的信息,请参阅 使用 Apache Airflow 协调 Lakeflow 作业

若要在 Airflow 工作流中运行管道,请使用 DatabricksSubmitRunOperator

要求

要使用 Airflow 对 Lakeflow 声明性管道的支持,以下条件是必需的:

Example

以下示例创建一个 Airflow DAG,该 DAG 会使用标识符 8279d543-063c-4d63-9926-dae38e35ce8b来触发管道的更新:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('ldp',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID替换为你的工作区的Airflow 连接标识符。

airflow/dags 此示例保存在目录中,并使用 Airflow UI 查看和触发 DAG。 使用 Lakeflow 声明性管道 UI 查看管道更新的详细信息。

Azure 数据工厂

注释

Lakeflow 声明性管道和 Azure 数据工厂都包含用于配置发生故障时重试次数的选项。 在您的管道 以及 调用该管道的 Azure 数据工厂活动上配置了重试值时,重试次数是 Azure 数据工厂重试值与 Lakeflow 声明性管道重试值的乘积。

例如,如果管道更新失败,则 Lakeflow 声明性管道默认重试更新最多五次。 如果 Azure 数据工厂重试设置为 3 次,并且管道使用默认的 5 次重试,则失败的管道最多可以重试 15 次。 为避免管道更新失败时重试次数过多,Databricks 建议在配置管道或调用管道的 Azure 数据工厂活动时限制重试次数。

若要更改管道的重试配置,请在配置管道时使用 pipelines.numUpdateRetryAttempts 设置。

Azure 数据工厂是一项基于云的 ETL 服务,可用于协调数据集成和转换工作流。 Azure 数据工厂直接支持在工作流中运行 Azure Databricks 任务,包括 笔记本、JAR 任务和 Python 脚本。 还可以通过从 Azure 数据工厂 Web 活动调用 Lakeflow 声明性管道 API,将管道包含在工作流中。 例如,若要从 Azure 数据工厂触发管道更新,请执行以下作:

  1. 创建数据工厂 或打开现有数据工厂。

  2. 创建完成后,打开数据工厂的页面,然后单击“ 打开 Azure 数据工厂工作室 ”磁贴。 此时会显示 Azure 数据工厂用户界面。

  3. 通过从 Azure 数据工厂工作室用户界面的“新建”下拉菜单中选择“管道”创建新的 Azure 数据工厂管道。

  4. 活动工具箱中,展开General,然后将Web活动拖动到管道画布上。 单击 “设置” 选项卡并输入以下值:

    注释

    作为安全最佳做法,使用自动化工具、系统、脚本和应用进行身份验证时,Databricks 建议使用属于 服务主体 的个人访问令牌,而不是工作区用户。 若要为服务主体创建令牌,请参阅 管理服务主体的令牌

    • URL:

      替换 <get-workspace-instance>

      <pipeline-id> 替换为管道标识符。

    • 方法:从下拉菜单中选择 POST

    • 标头:单击 “+ 新建”。 在“名称”文本框中,输入 Authorization“值 ”文本框中,输入 Bearer <personal-access-token>

      <personal-access-token> 替换为 Azure Databricks 个人访问令牌

    • 正文:若要传递其他请求参数,请输入包含参数的 JSON 文档。 例如,若要启动更新并重新处理管道的所有数据: {"full_refresh": "true"}. 如果没有其他请求参数,请输入空大括号({})。

若要测试 Web 活动,请单击数据工厂 UI 中的管道工具栏上的 “调试 ”。 运行(包括错误)的输出和状态显示在 Azure 数据工厂管道的“ 输出 ”选项卡中。 使用 Lakeflow 声明性管道 UI 查看管道更新的详细信息。

小窍门

常见的工作流要求是在完成上一个任务后启动任务。 由于 Lakeflow 声明性管道 updates 请求是异步的,因此请求在启动更新后返回,但在更新完成之前,Azure 数据工厂管道中的任务必须等待更新完成。 等待更新完成的其中一个选项是,在触发 Lakeflow 声明性管道更新的 Web 活动之后,添加Until 活动。 在“直到”步骤中

  1. 添加 等待活动 ,等待配置的秒数以完成更新。
  2. 在 Wait 活动之后添加一个 Web 活动,该活动使用 Lakeflow 声明性管道更新详细信息请求来获取更新的状态。 state响应中的字段返回更新的当前状态,包括是否已完成。
  3. 使用 state 字段的值设置 Until 活动的终止条件。 还可以使用“设置变量”活动根据state值添加管道变量,并将此变量作为终止条件。