使用 Apache Airflow 协调 Azure Databricks 作业

本文介绍了 Apache Airflow 对使用 Azure Databricks 协调数据管道的支持,提供了在本地安装和配置 Airflow 的说明,并给出使用 Airflow 部署和运行 Azure Databricks 工作流的示例。

数据管道中的作业业务流程

数据处理管道的开发和部署通常需要管理任务间复杂的依赖关系。 例如,管道可能会从源读取数据、清理数据、转换已清理的数据,以及将转换后的数据写入目标。 将管道操作化时,还需要为测试、计划和排查错误提供支持。

工作流系统可让你定义任务间的依赖关系、安排管道运行时间,以及监视工作流,从而解决这些难题。 Apache Airflow 是一种用于管理和计划数据管道的开源解决方案。 Airflow 将数据管道表示为操作的有向无环图 (DAG)。 在 Python 文件中定义工作流,Airflow 将管理计划和执行。 可通过 Airflow Azure Databricks 连接将 Azure Databricks 提供的已优化的 Spark 引擎与 Airflow 的计划功能配合使用。

要求

  • Airflow 与 Azure Databricks 之间的集成需要 Airflow 2.5.0 及更高版本。 本文中的示例已使用 Airflow 版本 2.6.1 进行测试。
  • Airflow 需要 Python 3.8、3.9、3.10 或 3.11。 本文中的示例使用 Python 3.8 进行测试。
  • 本文中有关安装和运行 Airflow 的说明需要使用 pipenv 来创建 Python 虚拟环境

适用于 Databricks 的 Airflow 运算符

Airflow DAG 由各种任务组成,其中每个任务运行一个 Airflow 运算符。 支持与 Databricks 集成的 Airflow 运算符在 Databricks 提供程序中实现。

Databricks 提供程序包含针对 Azure Databricks 工作区运行多个任务的运算符,这些任务包括将数据导入表中运行 SQL 查询,以及处理 Databricks Git 文件夹

Databricks 提供程序实现两个运算符来触发作业:

若要创建新的 Azure Databricks 作业或重置现有作业,Databricks 提供程序会实现 DatabricksCreateJobsOperatorDatabricksCreateJobsOperator 使用 POST /api/2.1/jobs/createPOST /api/2.1/jobs/reset API 请求。 可以将 DatabricksCreateJobsOperatorDatabricksRunNowOperator 配合使用来创建和运行作业。

注意

使用 Databricks 运算符触发作业需要在 Databricks 连接配置中提供凭据。 请参阅创建适用于 Airflow 的 Azure Databricks 个人访问令牌

Databricks Airflow 运算符每隔 polling_period_seconds(默认值为 30 秒)将作业运行页面 URL 写入 Airflow 日志。 有关详细信息,请参阅 Airflow 网站上 apache-airflow-providers-databricks 包页面。

在本地安装 Airflow Azure Databricks 集成

若要在本地安装 Airflow 和 Databricks 提供程序进行测试和开发,请使用以下步骤。 有关其他 Airflow 安装选项,包括创建生产安装,请参阅 Airflow 文档中的安装

打开终端并运行以下命令:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<firstname><lastname><email> 替换为你的用户名和电子邮件地址。 系统将提示你输入管理员用户的密码。 请务必保存此密码,因为在登录到 Airflow UI 时需要用到。

此脚本执行以下步骤:

  1. 创建名为 airflow 的目录,并切换到该目录。
  2. 使用 pipenv 创建并生成 Python 虚拟环境。 Databricks 建议使用 Python 虚拟环境将包版本和代码依赖项隔离到该环境。 此隔离操作有助于减少意外的包版本不匹配和代码依赖项冲突的情况。
  3. 初始化名为 AIRFLOW_HOME 的、设置为 airflow 目录路径的环境变量。
  4. 安装 Airflow 和 Airflow Databricks 提供程序包。
  5. 创建 airflow/dags 目录。 Airflow 使用 dags 目录来存储 DAG 定义。
  6. 初始化一个由 Airflow 用来跟踪元数据的 SQLite 数据库。 在生产型 Airflow 部署中,可以为 Airflow 配置一个标准数据库。 在 airflow 目录中初始化用于 Airflow 部署的 SQLite 数据库和默认配置。
  7. 为 Airflow 创建管理员用户。

提示

若要确认 Databricks 提供程序的安装,请在 Airflow 安装目录中运行以下命令:

airflow providers list

启动 Airflow Web 服务器和计划程序

需要 Airflow Web 服务器才能查看 Airflow UI。 若要启动 Web 服务器,请在 Airflow 安装目录中打开终端并运行以下命令:

注意

如果 Airflow Web 服务器由于端口冲突而无法启动,你可以更改 Airflow 配置中的默认端口。

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

计划程序是安排 DAG 的 Airflow 组件。 若要启动计划程序,请在 Airflow 安装目录中打开新终端并运行以下命令:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

测试 Airflow 的安装

若要验证 Airflow 的安装,可运行 Airflow 中包含的示例 DAG 之一:

  1. 在浏览器窗口中,打开 http://localhost:8080/home。 使用安装 Airflow 时创建的用户名和密码登录到 Airflow UI。 此时会显示 Airflow“DAG”页。
  2. 单击“暂停/恢复 DAG”切换,取消暂停其中一个示例 DAG,例如 example_python_operator
  3. 单击“触发 DAG”按钮以触发示例 DAG。
  4. 单击 DAG 名称可查看详细信息,其中包括 DAG 的运行状态。

创建适用于 Airflow 的 Azure Databricks 个人访问令牌

Airflow 使用 Azure Databricks 个人访问令牌 (PAT) 连接到 Databricks。 创建 PAT:

  1. 在 Azure Databricks 工作区中,单击顶部栏中 Azure Databricks 用户名,然后从下拉列表中选择“设置”
  2. 单击“开发人员”。
  3. 在“访问令牌”旁边,单击“管理”。
  4. 单击“生成新令牌”。
  5. (可选)输入有助于将来识别此令牌的注释,并将令牌的默认生存期更改为 90 天。 若要创建没有生存期的令牌(不建议),请将“生存期(天)”框留空(保留空白)。
  6. 单击“生成” 。
  7. 将显示的令牌复制到安全位置,然后单击“完成”。

注意

请务必将复制的令牌保存到安全的位置。 请勿与他人共享复制的令牌。 如果丢失了复制的令牌,你将无法重新生成完全相同的令牌, 而必须重复此过程来创建新令牌。 如果丢失了复制的令牌,或者认为令牌已泄露,Databricks 强烈建议通过单击“访问令牌”页上令牌旁边的垃圾桶(撤销)图标立即从工作区中删除该令牌。

如果你无法在工作区中创建或使用令牌,可能是因为工作区管理员已禁用令牌或未授予你创建或使用令牌的权限。 请与工作区管理员联系,或参阅以下内容:

注意

作为安全最佳做法,在使用自动化工具、系统、脚本和应用进行身份验证时,Databricks 建议使用属于服务主体(而不是工作区用户)的个人访问令牌。 若要为服务主体创建令牌,请参阅管理服务主体的令牌

还可以使用 Microsoft Entra ID 令牌向 Azure Databricks 进行身份验证。 请参阅 Airflow 文档中的 Databricks 连接

配置 Azure Databricks 连接

Airflow 安装包含了 Azure Databricks 的默认连接。 若要使用前面创建的个人访问令牌更新连接,以连接到工作区,请执行以下操作:

  1. 在浏览器窗口中,打开 http://localhost:8080/connection/list/。 如果系统提示你登录,请输入管理员用户名和密码。
  2. 在“Conn ID”下,找到“databricks_default”,然后单击“编辑记录”按钮
  3. 将“主机”字段中的值替换为 Azure Databricks 部署的工作区实例名称,例如 https://adb-123456789.cloud.databricks.com
  4. 在“密码”字段中,输入你的 Azure Databricks 个人访问令牌。
  5. 单击“ 保存”。

如果你正在使用 Microsoft Entra ID 令牌,请参阅 Airflow 文档中的 Databricks 连接,以了解有关配置身份验证的信息。

示例:创建 Airflow DAG 以运行 Azure Databricks 作业

以下示例演示如何创建在本地计算机上运行的简单 Airflow 部署,以及在 Azure Databricks 中部署示例 DAG 来触发运行。 在此示例中,你将:

  1. 创建新笔记本并添加代码,以根据配置的参数输出问候语。
  2. 创建 Azure Databricks 作业,其中包含一个运行笔记本的任务。
  3. 配置与 Azure Databricks 工作区的 Airflow 连接。
  4. 创建 Airflow DAG 以触发笔记本作业。 使用 DatabricksRunNowOperator 在 Python 脚本中定义 DAG。
  5. 使用 Airflow UI 触发 DAG 并查看运行状态。

创建笔记本

此示例使用包含两个单元格的笔记本:

  • 第一个单元格包含 Databricks 实用工具文本小组件,并将一个名为 greeting 集的变量定义为默认值 world
  • 第二个单元格显示以 hello 为前缀的 greeting 变量的值。

若要创建笔记本,则:

  1. 转到 Azure Databricks 登陆页,并在边栏中单击新建图标“新建”,然后选择“笔记本”。

  2. 为笔记本命名,例如 Hello Airflow,并确保默认语言设置为 Python

  3. 复制以下 Python 代码并将其粘贴到笔记本的第一个单元格中。

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 将新单元格添加到第一个单元格下方,将以下 Python 代码复制并粘贴到新单元格中:

    print("hello {}".format(greeting))
    

创建作业

  1. 在边栏中,单击 工作流图标“工作流”

  2. 单击 “创建作业”按钮

    “任务”选项卡与“创建任务”对话框一起显示。

    创建第一个任务对话框

  3. 将“为作业添加名称…”替换为你的作业名称。

  4. 在“任务名称”字段中,为任务输入名称,例如“greeting-task” 。

  5. 在“类型”下拉菜单中选择“笔记本”

  6. 在“”下拉菜单中,选择“工作区”。

  7. 单击“路径”文本框,使用文件浏览器查找你创建的笔记本,单击笔记本名称,然后单击“确认”。

  8. 在“参数”下单击“添加” 。 在“键”字段中,选择 greeting。 在“值”字段中,输入 Airflow user

  9. 单击“创建任务”。

在“作业详细信息”面板中,复制“作业 ID”值。 需要此值才能从 Airflow 触发作业。

运行作业

若要在 Azure Databricks 作业 UI 中测试你的新作业,请点击右上角的 立即运行”按钮。 运行完成后,可以通过查看作业运行详细信息来验证输出。

创建新的 Airflow DAG

定义 Python 文件中的 Airflow DAG。 若要创建 DAG 来触发示例笔记本作业,请执行以下操作:

  1. 在文本编辑器或 IDE 中,使用以下内容创建名为 databricks_dag.py 的新文件:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID 替换为之前保存的作业 ID 的值。

  2. 将文件保存到 airflow/dags 目录中。 Airflow 可自动读取和安装存储在 airflow/dags/ 中的 DAG 文件。

在 Airflow 中安装 DAG 并进行验证

若要触发并验证 Airflow UI 中的 DAG,请执行以下操作:

  1. 在浏览器窗口中,打开 http://localhost:8080/home。 此时会显示 Airflow DAG 的屏幕。
  2. 找到 databricks_dag,然后单击“暂停/恢复 DAG”切换以恢复 DAG。
  3. 单击“触发 DAG”按钮以触发 DAG。
  4. 单击“运行”列中的“运行”,查看运行状态和详细信息。