Managing dependencies in data pipelines

Often there are complex dependencies in your data pipelines. Workflow systems allow you to describe such dependencies and schedule when pipelines run.

Azure Data Factory

Azure Data Factory is a cloud data integration service that lets you compose data storage, movement, and processing services into automated data pipelines. You can operationalize Databricks notebooks in Azure Data Factory data pipelines. For instructions on creating an Azure Data Factory pipeline that runs a Databricks notebook in an Azure Databricks cluster, see Run a Databricks notebook with the Databricks notebook activity in Azure Data Factory, followed by Transform data by running a Databricks notebook.

Apache Airflow

Apache Airflow is a solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations, where an edge represents a logical dependency between operations.

There is tight integration between Azure Databricks and Airflow. The Airflow Azure Databricks integration lets you take advantage of the optimized Spark engine offered by Azure Databricks together with the scheduling features of Airflow.

Install the Airflow Azure Databricks integration

The integration between Airflow and Azure Databricks is available in Airflow version 1.9.0. To install the Airflow Azure Databricks integration, run:

pip install "apache-airflow[databricks]"

To install extras (for example celery and password), run:

pip install "apache-airflow[databricks, celery, password]"

DatabricksRunNowOperator operator

The Airflow Azure Databricks integration provides DatabricksRunNowOperator as a node in your DAG of computations. This operator matches the Databricks jobs Run now API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS.
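
For illustration, a minimal sketch of a DAG that uses DatabricksRunNowOperator might look like the following, assuming the operator is available in your installed Airflow version. The DAG ID, job_id, and notebook_params values are hypothetical and would need to match an existing job in your Databricks workspace.

import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator

# Minimal sketch (hypothetical values): trigger an existing Databricks job by its job ID.
dag = DAG(
    dag_id='example_databricks_run_now',
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval='@daily')

run_now_task = DatabricksRunNowOperator(
    task_id='run_now_task',
    dag=dag,
    job_id=42,                              # ID of an existing Databricks job (hypothetical)
    notebook_params={'source': 'airflow'})  # notebook parameter overrides (hypothetical)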

DatabricksSubmitRunOperator operator

The Airflow Azure Databricks integration provides DatabricksSubmitRunOperator as a node in your DAG of computations. This operator matches the Databricks jobs Runs submit API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS.

Configure a Databricks connection

To use DatabricksSubmitRunOperator, you must provide credentials in the appropriate Airflow connection. By default, if you do not specify the databricks_conn_id parameter to DatabricksSubmitRunOperator, the operator tries to find credentials in the connection with the ID databricks_default.

You can configure Airflow connections through the Airflow web UI as instructed in Managing Connections. For the Databricks connection, set the Host field to the hostname of your Databricks deployment, the Login field to token, the Password field to a Databricks-generated personal access token, and the Extra field to

{"token": "<your personal access token>"}

Example

In this example, we show how to set up a simple Airflow deployment that runs on your local machine and deploy an example DAG named example_databricks_operator that triggers runs in Databricks.

Initialize Airflow database

Initialize the SQLite database that Airflow uses to track miscellaneous metadata. In a production Airflow deployment, you would configure Airflow with a standard database. To perform the initialization, run:

airflow initdb

The SQLite database and default configuration for your Airflow deployment are initialized in ~/airflow.

DAG definition

A DAG definition is a Python file; in this example it is named example_databricks_operator.py. The example runs two Databricks jobs with one linear dependency. The first Databricks job triggers a notebook located at /Users/airflow@example.com/PrepareData and the second runs a JAR located at dbfs:/lib/etl-0.1.jar. The example DAG definition constructs two DatabricksSubmitRunOperator tasks and then sets the dependency at the end with the set_downstream method. A skeleton version of the code looks something like this:

notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    dag=dag,
    json=spark_jar_task_params)

notebook_task.set_downstream(spark_jar_task)

Import Airflow and required classes

The top of a DAG definition imports airflow, DAG, and DatabricksSubmitRunOperator:

import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

Configure global arguments

The next section sets the default arguments applied to each task in the DAG.

args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0)
}

The two interesting arguments are depends_on_past and start_date. Setting depends_on_past to true signals that a task should not be triggered unless the previous instance of the task completed successfully. The start_date argument determines when the first task instance is scheduled.

Instantiate the DAG

The DAG instantiation statement gives the DAG a unique ID, attaches the default arguments, and gives it a daily schedule.

dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')

The next statement specifies the Spark version, node type, and number of workers in the cluster that will run your tasks. The schema of the specification matches the new_cluster field of the jobs Runs submit endpoint.

new_cluster = {
    'spark_version': '6.0.x-scala2.11',
    "node_type_id": "Standard_D3_v2",
    'num_workers': 8
}

Register tasks in the DAG

For notebook_task, instantiate DatabricksSubmitRunOperator.

notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
    },
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
  task_id='notebook_task',
  dag=dag,
  json=notebook_task_params)

In this piece of code, the json parameter takes a Python dictionary that matches the Runs submit endpoint.
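
Because this dictionary is passed through as the body of the Runs submit request, other fields supported by that endpoint can be included in the same structure. The following variant is an illustrative sketch only; the variable name, notebook parameters, run name, and timeout values are hypothetical.

extended_notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
        'base_parameters': {'source': 'airflow'},  # widget parameters passed to the notebook (hypothetical)
    },
    'run_name': 'airflow-prepare-data',  # display name for the run (hypothetical)
    'timeout_seconds': 3600,             # cancel the run if it takes longer than one hour
}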

For spark_jar_task, which runs a JAR located at dbfs:/lib/etl-0.1.jar, instantiate DatabricksSubmitRunOperator.

# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
  task_id='spark_jar_task',
  dag=dag,
  new_cluster=new_cluster,
  spark_jar_task={
    'main_class_name': 'com.example.ProcessData'
  },
  libraries=[
    {
      'jar': 'dbfs:/lib/etl-0.1.jar'
    }
  ]
)

To configure spark_jar_task to run downstream, use the set_downstream method on notebook_task to register the dependency.

notebook_task.set_downstream(spark_jar_task)

Notice that in notebook_task we used the json parameter to specify the full specification for the Runs submit endpoint, while in spark_jar_task we flattened the top-level keys of the Runs submit request into parameters for DatabricksSubmitRunOperator. Although the two ways of instantiating the operator are equivalent, the latter method does not allow you to use any new top-level fields such as spark_python_task or spark_submit_task. For details, see the DatabricksSubmitRunOperator API.
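
For example, a spark_python_task can be expressed through the json parameter even when no matching named parameter is available in your operator version. The sketch below assumes a hypothetical Python file at dbfs:/lib/etl.py:

# Sketch: run a Python file on DBFS via the json parameter. The DBFS path
# and task ID are hypothetical.
spark_python_task_params = {
    'new_cluster': new_cluster,
    'spark_python_task': {
        'python_file': 'dbfs:/lib/etl.py',
    },
}

spark_python_task = DatabricksSubmitRunOperator(
    task_id='spark_python_task',
    dag=dag,
    json=spark_python_task_params)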

Install and verify the DAG in Airflow

To install the DAG in Airflow, create the directory ~/airflow/dags and copy the DAG definition file into that directory.

To verify that Airflow has read in the DAG, run the list_dags command:

airflow list_dags

[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...

Visualize the DAG in the Airflow UI

You can visualize the DAG in the Airflow web UI. Run airflow webserver and connect to localhost:8080. Click example_databricks_operator to see the visualizations of your DAG. Here is an example:

Airflow DAG

Configure the connection to Airflow

The connection credentials for Databricks aren't specified in the DAG definition. By default, DatabricksSubmitRunOperator sets the databricks_conn_id parameter to databricks_default, so add a connection with the ID databricks_default through the web UI, as described in Configure a Databricks connection.

Test each task

To test notebook_task, run airflow test example_databricks_operator notebook_task <YYYY-MM-DD>; for spark_jar_task, run airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>. To run the DAG on a schedule, invoke the scheduler daemon process with the command airflow scheduler.

After starting the scheduler, you should be able to see backfilled runs of your DAG in the web UI.