在 Azure Databricks 上运行第一个 ETL 工作负载
了解如何使用 Azure Databricks 提供的生产就绪工具来开发和部署用于数据业务流程的第一个提取、转换和加载 (ETL) 管道。
本文结束时,你会感到自在:
- 启动 Databricks 通用计算群集。
- 创建 Databricks 笔记本。
- 使用自动加载程序将增量数据引入配置到 Delta Lake。
- 执行笔记本单元格来处理、查询和预览数据。
- 将笔记本计划为 Databricks 作业。
本教程使用交互式笔记本在 Python 或 Scala 中完成常见 ETL 任务。
还可使用增量实时表构建 ETL 管道。 Databricks 创建了增量实时表,以降低构建、部署和维护生产 ETL 管道的复杂性。 请参阅教程:运行第一个增量实时表管道。
还可使用 Databricks Terraform 提供程序创建本文的资源。 请参阅使用 Terraform 创建群集、笔记本和作业。
要求
- 你已登录到 Azure Databricks 工作区。
- 拥有创建群集的权限。
注意
如果没有群集控制特权,只要拥有群集访问权限,就仍然可以完成以下大部分步骤。
步骤 1:创建群集
若要进行探索性数据分析和数据工程,请创建一个群集来提供执行命令所需的计算资源。
- 单击边栏中的 “计算”。
- 请在“计算”页单击“创建群集”。 这将打开“新建群集”页。
- 指定群集的唯一名称,将其余值保留为默认状态,然后单击“创建群集”。
要了解有关 Databricks 群集的详细信息,请参阅计算。
步骤 2:创建 Databricks 笔记本
若要在工作区中创建笔记本,请单击边栏中的“ 新建”,然后单击“笔记本”。 将在工作区中打开一个空白笔记本。
若要了解有关创建和管理笔记本的详细信息,请参阅管理笔记本。
步骤 3:配置自动加载程序以将数据引入 Delta Lake
Databricks 建议使用自动加载程序来引入增量数据。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。
Databricks 建议使用 Delta Lake 存储数据。 Delta Lake 是开放源代码存储层,提供 ACID 事务并启用数据湖屋。 Delta Lake 是在 Databricks 中创建的表的默认格式。
要配置自动加载程序以将数据引入到 Delta Lake 表,请将以下代码复制并粘贴到笔记本的空单元格中:
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
Scala
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注意
此代码中定义的变量应让你能够安全执行该代码,而不会存在与现有工作区资产或其他用户发生冲突的风险。 在执行此代码时,受限的网络或存储权限将引发错误;请联系工作区管理员来排查这些限制。
若要了解有关自动加载程序的详细信息,请参阅什么是自动加载程序?。
步骤 4:处理数据并与之交互
笔记本逐个单元格执行逻辑。 在单元格中执行逻辑:
若要运行在上一步中完成的单元格,请选择该单元格并按 SHIFT+ENTER。
若要查询刚刚创建的表,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。
Python
df = spark.read.table(table_name)
Scala
val df = spark.read.table(table_name)
若要预览 DataFrame 中的数据,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。
Python
display(df)
Scala
display(df)
若要详细了解用于可视化数据的交互式选项,请参阅 Databricks 笔记本中的可视化效果。
步骤 5:安排作业
可以将 Databricks 笔记本作为生产脚本运行,方法是在 Databricks 作业中将其添加为任务。 在此步骤中,你将创建可手动触发的新作业。
若要将笔记本计划为任务,请执行以下操作:
- 单击标题栏右侧的“计划”。
- 为“作业名”输入唯一的名称。
- 单击“手动”。
- 在“群集”下拉列表中,选择在步骤 1 中创建的群集。
- 单击“创建”。
- 在出现的窗口中单击“立即运行”。
- 若要查看作业运行结果,请单击“上次运行”时间戳旁边的图标。
有关作业的详细信息,请参阅什么是 Databricks 作业?。
其他集成
详细了解使用 Azure Databricks 进行数据工程的集成和工具: