Managing dependencies in data pipelines
Often there are complex dependencies in your data pipelines. Workflow systems allow you to describe such dependencies and schedule when pipelines run.
Azure Data Factory
Azure Data Factory is a cloud data integration service that lets you compose data storage, movement, and processing services into automated data pipelines. You can operationalize Databricks notebooks in Azure Data Factory data pipelines. See Run a Databricks notebook with the Databricks notebook activity in Azure Data Factory for instructions on how to create an Azure Data Factory pipeline that runs a Databricks notebook in an Azure Databricks cluster, followed by Transform data by running a Databricks notebook.
Apache Airflow
Apache Airflow is a solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations, where an edge represents a logical dependency between operations.
Airflow integrates tightly with Azure Databricks. The Airflow Azure Databricks integration lets you take advantage of the optimized Spark engine offered by Azure Databricks with the scheduling features of Airflow.
Install the Airflow Azure Databricks integration
The integration between Airflow and Azure Databricks is available in Airflow version 1.9.0. To install the Airflow Azure Databricks integration, run:
pip install "apache-airflow[databricks]"
To install extras (for example celery and password), run:
pip install "apache-airflow[databricks, celery, password]"
DatabricksRunNowOperator operator
The Airflow Azure Databricks integration provides DatabricksRunNowOperator as a node in your DAG of computations. This operator matches the Databricks jobs Run now API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS.
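For instance, here is a minimal sketch of a DatabricksRunNowOperator task. The task ID, job ID, and notebook parameter are hypothetical, and the sketch assumes your Airflow release includes DatabricksRunNowOperator (it was added to the contrib operators after DatabricksSubmitRunOperator) and that a dag object is already defined, as in the example later in this article:
from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator

# Trigger an existing Databricks job by its job ID.
run_existing_job = DatabricksRunNowOperator(
    task_id='run_existing_job',
    dag=dag,
    job_id=42,  # hypothetical ID of a job already defined in Databricks
    notebook_params={'run_date': '2017-07-01'})  # optional parameters passed to the notebook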
DatabricksSubmitRunOperator operator
The Airflow Azure Databricks integration provides DatabricksSubmitRunOperator as a node in your DAG of computations. This operator matches the Databricks jobs Runs submit API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS.
Configure a Databricks connection
To use DatabricksSubmitRunOperator you must provide credentials in the appropriate Airflow connection. By default, if you do not specify the databricks_conn_id parameter to DatabricksSubmitRunOperator, the operator tries to find credentials in the connection with the ID equal to databricks_default.
You can configure Airflow connections through the Airflow web UI as instructed in Managing Connections. For the Databricks connection, set the Host field to the hostname of your Databricks deployment, the Login field to token, the Password field to a Databricks-generated personal access token, and the Extra field to
{"token": "<your personal access token>"}
Example
In this example, we show how to set up a simple Airflow deployment that runs on your local machine and deploys an example DAG named example_databricks_operator that triggers runs in Databricks.
Initialize Airflow database
Initialize the SQLite database that Airflow uses to track miscellaneous metadata. In a production Airflow deployment, you would configure Airflow with a standard database. To perform the initialization, run:
airflow initdb
The SQLite database and default configuration for your Airflow deployment are initialized in ~/airflow.
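The database Airflow uses is controlled by the sql_alchemy_conn setting in ~/airflow/airflow.cfg, so switching to a standard database later is a configuration change. A minimal sketch, assuming a local PostgreSQL database named airflow (the connection string is a placeholder for your own database):
[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:<password>@localhost:5432/airflow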
DAG definition
A DAG definition is a Python file; in this example, it is named example_databricks_operator.py. The example runs two Databricks jobs with one linear dependency. The first Databricks job triggers a notebook located at /Users/airflow@example.com/PrepareData and the second runs a JAR located at dbfs:/lib/etl-0.1.jar. The example DAG definition constructs two DatabricksSubmitRunOperator tasks and then sets the dependency at the end with the set_downstream method. A skeleton version of the code looks something like this:
notebook_task = DatabricksSubmitRunOperator(
task_id='notebook_task',
dag=dag,
json=notebook_task_params)
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
dag=dag,
json=spark_jar_task_params)
notebook_task.set_downstream(spark_jar_task)
Import Airflow and required classes
The top of a DAG definition imports airflow, DAG, and DatabricksSubmitRunOperator:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
Configure global arguments
The next section sets default arguments applied to each task in the DAG.
args = {
'owner': 'airflow',
'email': ['airflow@example.com'],
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0)
}
Two arguments worth noting are depends_on_past and start_date. Setting depends_on_past to True signals that a task should not be triggered unless the previous instance of the task completed successfully. The start_date argument determines when the first task instance will be scheduled.
Instantiate the DAG
The DAG instantiation statement gives the DAG a unique ID, attaches the default arguments, and gives it a daily schedule.
dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')
The next statement specifies the Spark version, node type, and number of workers in the cluster that will run your tasks. The schema of the specification matches the new_cluster field of the Runs submit endpoint.
new_cluster = {
'spark_version': '6.0.x-scala2.11',
"node_type_id": "Standard_D3_v2",
'num_workers': 8
}
Register tasks in DAG
For notebook_task, instantiate DatabricksSubmitRunOperator.
notebook_task_params = {
'new_cluster': new_cluster,
'notebook_task': {
'notebook_path': '/Users/airflow@example.com/PrepareData',
},
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
task_id='notebook_task',
dag=dag,
json=notebook_task_params)
In this piece of code, the json parameter takes a Python dictionary that matches the Runs submit endpoint.
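Because the dictionary is passed through as the body of the Runs submit request, you can include other fields from that endpoint as well. A minimal sketch; the run_name, timeout_seconds, and base_parameters values shown here are illustrative and not part of the original example:
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
        'base_parameters': {'run_date': '2017-07-01'},  # read in the notebook via dbutils.widgets
    },
    'run_name': 'airflow-prepare-data',  # display name for the run
    'timeout_seconds': 3600,             # cancel the run if it exceeds one hour
}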
For spark_jar_task, which runs a JAR located at dbfs:/lib/etl-0.1.jar, instantiate DatabricksSubmitRunOperator.
# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
dag=dag,
new_cluster=new_cluster,
spark_jar_task={
'main_class_name': 'com.example.ProcessData'
},
libraries=[
{
'jar': 'dbfs:/lib/etl-0.1.jar'
}
]
)
To configure spark_jar_task to run downstream, use the set_downstream method on notebook_task to register the dependency.
notebook_task.set_downstream(spark_jar_task)
Notice that in notebook_task we used the json parameter to specify the full specification for the Runs submit endpoint, whereas in spark_jar_task we flattened the top-level keys of the Runs submit endpoint into parameters for DatabricksSubmitRunOperator. Although both ways of instantiating the operator are equivalent, the latter method does not allow you to use any new top-level fields such as spark_python_task or spark_submit_task. For details, see the DatabricksSubmitRunOperator API.
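For example, a task type that DatabricksSubmitRunOperator does not expose as a named parameter, such as spark_python_task, can still be supplied through the json parameter. A minimal sketch; the script path and parameters are hypothetical:
spark_python_task_params = {
    'new_cluster': new_cluster,
    'spark_python_task': {
        'python_file': 'dbfs:/scripts/process_data.py',  # hypothetical Python script uploaded to DBFS
        'parameters': ['--date', '2017-07-01'],
    },
}

spark_python_task = DatabricksSubmitRunOperator(
    task_id='spark_python_task',
    dag=dag,
    json=spark_python_task_params)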
Install and verify the DAG in Airflow
To install the DAG in Airflow, create the directory ~/airflow/dags and copy the DAG definition file into that directory.
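For example, from the directory containing your DAG definition file:
mkdir -p ~/airflow/dags
cp example_databricks_operator.py ~/airflow/dags/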
To verify that Airflow has read in the DAG, run the list_dags command:
airflow list_dags
[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...
Visualize the DAG in the Airflow UI
You can visualize the DAG in the Airflow web UI. Run airflow webserver and connect to localhost:8080. Click example_databricks_operator to see many visualizations of your DAG.
Configure the connection to Airflow
The connection credentials for Databricks aren't specified in the DAG definition. By default, DatabricksSubmitRunOperator sets the databricks_conn_id parameter to databricks_default, so add a connection with the ID databricks_default through the web UI, as described in Configure a Databricks connection.
Test each task
To test notebook_task, run airflow test example_databricks_operator notebook_task <YYYY-MM-DD>; for spark_jar_task, run airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>. To run the DAG on a schedule, you would invoke the scheduler daemon process with the command airflow scheduler.
After starting the scheduler, you should be able to see backfilled runs of your DAG in the web UI.