本教程介绍如何开发和部署您的第一个 ETL(提取、转换和加载)管道,以使用 Apache Spark 进行数据编排。 尽管本教程使用 Databricks 全用途计算,但如果为工作区启用了无服务器计算,也可以使用无服务器计算。
还可以使用 DLT 生成 ETL 管道。 Databricks DLT 可降低生成、部署和维护生产 ETL 管道的复杂性。 请参阅 教程:使用 DLT 生成 ETL 管道。
本文结束时,你将了解如何:
本教程使用交互式笔记本在 Python 或 Scala 中完成常见 ETL 任务。
还可使用 Databricks Terraform 提供程序创建本文的资源。 请参阅使用 Terraform 创建群集、笔记本和作业。
- 你已登录到 Azure Databricks 工作区。
- 拥有创建群集的权限。
备注
如果没有群集控制特权,只要拥有群集访问权限,就仍然可以完成以下大部分步骤。
若要进行探索性数据分析和数据工程,请创建一个群集来提供执行命令所需的计算资源。
单击边栏中的
“计算”。
请在“计算”页单击“创建群集”。 这将打开“新建群集”页。
指定群集的唯一名称,将其余值保留为默认状态,然后单击“创建群集”。
要了解有关 Databricks 群集的详细信息,请参阅计算。
若要在工作区中创建笔记本,请单击边栏中的“ 新建”,然后单击“笔记本”。 将在工作区中打开一个空白笔记本。
若要了解有关创建和管理笔记本的详细信息,请参阅管理笔记本。
Databricks 建议使用自动加载程序来引入增量数据。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。
Databricks 建议使用 Delta Lake 存储数据。 Delta Lake 是开放源代码存储层,提供 ACID 事务并启用数据湖屋。 Delta Lake 是在 Databricks 中创建的表的默认格式。
要配置自动加载程序以将数据引入到 Delta Lake 表,请将以下代码复制并粘贴到笔记本的空单元格中:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
备注
此代码中定义的变量应让你能够安全执行该代码,而不会存在与现有工作区资产或其他用户发生冲突的风险。 在执行此代码时,受限的网络或存储权限将引发错误;请联系工作区管理员来排查这些限制。
若要了解有关自动加载程序的详细信息,请参阅什么是自动加载程序?。
笔记本逐个单元格执行逻辑。 在单元格中执行逻辑:
若要运行在上一步中完成的单元格,请选择该单元格并按 SHIFT+ENTER。
若要查询刚刚创建的表,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。
Python语言
df = spark.read.table(table_name)
Scala(编程语言)
val df = spark.read.table(table_name)
若要预览 DataFrame 中的数据,请将以下代码复制并粘贴到某个空单元格中,然后按 SHIFT+ENTER 运行单元格。
Python语言
display(df)
Scala(编程语言)
display(df)
若要了解有关可视化数据的交互式选项的详细信息,请参阅 Databricks 笔记本和 SQL 编辑器中的可视化效果。
可以将 Databricks 笔记本作为生产脚本运行,方法是在 Databricks 作业中将其添加为任务。 在此步骤中,你将创建可手动触发的新作业。
若要将笔记本计划为任务,请执行以下操作:
单击标题栏右侧的“计划”。
为“作业名”输入唯一的名称。
单击“手动”。
在“群集”下拉列表中,选择在步骤 1 中创建的群集。
单击“创建”。
在出现的窗口中单击“立即运行”。
若要查看作业运行结果,请单击“上次运行”时间戳旁边的
图标。
有关作业的详细信息,请参阅 什么是作业?。
详细了解使用 Azure Databricks 进行数据工程的集成和工具: