本文演示如何创建和部署端到端数据处理管道,包括如何引入原始数据、转换数据,以及对处理过的数据运行分析。
备注
尽管本文演示如何使用 Databricks 笔记本 和 Azure Databricks 作业 创建完整的数据管道来协调工作流,但 Databricks 建议使用 DLT(用于生成可靠、可维护且可测试的数据处理管道的声明性接口)来协调工作流。
数据管道实现从源系统移动数据、根据要求转换数据以及将数据存储到目标系统所要执行的步骤。 数据管道包括将原始数据转换为用户可以使用的准备好的数据所需的所有过程。 例如,数据管道可以准备数据,以便数据分析师和数据科学家可以通过分析和报告从数据中获得价值。
提取、转换和加载 (ETL) 工作流是数据管道的常见示例。 在 ETL 处理中,数据从源系统引入并写入暂存区域,根据要求(确保数据质量、删除重复数据记录等)进行转换,然后写入目标系统,如数据仓库或数据湖。
为了帮助你开始在 Azure Databricks 上构建数据管道,本文中包含的示例演示了如何创建数据处理工作流:
- 使用 Azure Databricks 功能浏览原始数据集。
- 创建 Databricks 笔记本以引入原始源数据并将原始数据写入目标表。
- 创建 Databricks 笔记本以转换原始源数据并将转换后的数据写入目标表。
- 创建 Databricks 笔记本以查询转换后的数据。
- 使用 Azure Databricks 作业实现数据管道自动化。
- 已登录到 Azure Databricks,并且已进入数据科学与工程工作区。
- 你有权 创建计算资源 或 访问计算资源。
- (可选)必须在 Unity Catalog 中创建目录和架构才能将表格发布到 Unity Catalog。
此示例中使用的数据集是 Million Song Dataset 的子集,该数据集是当代音乐曲目的特征和元数据集合。 此数据集在 Azure Databricks 工作区中包含的示例数据集中可用。
若要在此示例中执行数据处理和分析,请创建一个计算资源来运行命令。
备注
由于此示例使用存储在 DBFS 中的示例数据集,并建议将表保存到 Unity 目录,因此创建配置了专用访问模式的计算资源。 专用访问模式提供对 DBFS 的完全访问权限,同时启用对 Unity 目录的访问。 请参阅 DBFS 和 Unity Catalog 的最佳做法。
单击侧栏中的计算。
在“计算”页上,单击“ 创建计算”。
在新计算页上,输入计算资源的唯一名称。
在 “高级”下,将访问模式设置切换到 “手动 ”,然后选择“ 专用”。
在 “单用户或组”中,选择您的用户名。
将剩余值保留为其默认状态,然后单击“ 创建”。
若要了解有关 Databricks 计算资源的详细信息,请参阅 “计算”。
若要了解如何使用 Azure Databricks 接口浏览原始源数据,请参阅浏览数据管道的源数据。 如果要直接转到引入和准备数据,请继续执行步骤 3:引入原始数据。
在此步骤中,将原始数据加载到表中,以待接受进一步处理。 若要管理 Databricks 平台上的数据资产(例如表),Databricks 建议使用 Unity Catalog。 不过,即使没有权限创建将表格发布到 Unity Catalog 所需的目录和架构,依然可以通过将表格发布到 Hive 元存储来完成以下步骤。
Databricks 建议使用自动加载程序来引入数据。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。
可以将自动加载程序配置为自动检测已加载数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。 使用自动加载程序时,Databricks 建议进行架构推理。 但是,如数据浏览步骤中所示,歌曲数据不包含标头信息。 由于标头未随数据一起存储,因此需要显式定义架构,如下一个示例所示。
在边栏中,单击
“新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。
输入笔记本的名称,例如
Ingest songs data
。 默认情况下:- “Python”是选择的语言。
- 笔记本连接到你最后使用的计算资源。 在本例中,是你在步骤 1:创建计算资源中创建的资源。
在笔记本的第一个单元格中输入以下内容:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
如果使用的是 Unity Catalog,请将
<table-name>
替换为目录、架构和表格以包含引入的记录(例如data_pipelines.songs_data.raw_song_data
)。 否则,请将<table-name>
替换为表的名称以包含引入的记录,例如raw_song_data
。将
<checkpoint-path>
替换为 DBFS 中用于维护检查点文件的目录路径,例如/tmp/pipeline_get_started/_checkpoint/song_data
。单击
,然后选择“运行单元格”。 此示例使用
README
中的信息定义数据架构,从file_path
中包含的所有文件中引入歌曲数据,并将数据写入table_name
指定的表。
若要准备好用于分析的原始数据,请执行以下步骤,通过筛选掉不需要的列并添加一个包含新记录创建时间戳的新字段来转换原始歌曲数据。
在边栏中,单击
“新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。
输入笔记本的名称。 例如,
Prepare songs data
。 将默认语言更改为 SQL。在笔记本的第一个单元格中输入以下内容:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
如果使用的是 Unity Catalog,请将
<table-name>
替换为目录、架构和表格以包含经过筛选和转换的记录(例如data_pipelines.songs_data.prepared_song_data
)。 否则,将<table-name>
替换为表的名称以包含筛选和转换的记录(例如prepared_song_data
)。将
<raw-songs-table-name>
替换为表的名称,其中包含在上一步中引入的原始歌曲记录。单击
,然后选择“运行单元格”。
在此步骤中,你将通过添加查询来分析歌曲数据,从而扩展处理管道。 这些查询使用在上一步中创建的准备好的记录。
在边栏中,单击
“新建”,然后从菜单中选择“笔记本”。 此时会显示“创建笔记本”对话框。
输入笔记本的名称。 例如,
Analyze songs data
。 将默认语言更改为 SQL。在笔记本的第一个单元格中输入以下内容:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
将
<prepared-songs-table-name>
替换为包含准备好的数据的表的名称。 例如,data_pipelines.songs_data.prepared_song_data
。在单元格操作菜单中单击
,选择“在下方添加单元格”,然后在新单元格中输入以下内容:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
将
<prepared-songs-table-name>
替换为在上一步中创建的已准备好的表的名称。 例如,data_pipelines.songs_data.prepared_song_data
。若要运行查询并查看输出,请单击“全部运行”。
可以使用 Azure Databricks 作业创建工作流来自动运行数据引入、处理和分析步骤。
在数据科学与工程工作区中,执行下列操作之一:
- 在边栏中,单击
“工作流”,然后单击
。
- 在边栏中,单击
“新建”,然后选择“作业”。
- 在边栏中,单击
在“任务”选项卡上的任务对话框中,将“为作业添加名称...”替换为你的作业名称。 例如,“歌曲工作流”。
在“任务名称”中输入首个任务的名称,例如 。
在“类型”中,选择“笔记本”任务类型。
在“源”中,选择“工作区”。
在 “路径” 字段中,使用文件浏览器查找数据引入笔记本,然后单击“ 确认”。
在 “计算”中,选择在步骤中创建的
Create a compute resource
计算资源。单击“创建”。
单击刚刚创建的任务下方的
并选择“笔记本”。
在“任务名称”中输入任务的名称,例如 。
在“类型”中,选择“笔记本”任务类型。
在源中,选择工作区。
使用文件浏览器找到数据准备笔记本,单击笔记本名称,然后单击“确认”。
在 “计算”中,选择在步骤中创建的
Create a compute resource
计算资源。单击“创建”。
单击刚刚创建的任务下方的
并选择“笔记本”。
在“任务名称”中输入任务的名称,例如 。
在“类型”中,选择“笔记本”任务类型。
在源中,选择工作区。
使用文件资源管理器找到数据分析笔记本,单击笔记本名称,然后单击“确认”。
在 “计算”中,选择在步骤中创建的
Create a compute resource
计算资源。单击“创建”。
若要运行工作流,请单击
。 若要查看运行详细信息,请在作业运行视图中单击该运行的“开始时间”列中的链接。 单击每个任务以查看任务运行详细信息。
若要查看工作流完成时的结果,请单击最终的数据分析任务。 此时会出现“输出”页,其中显示了查询结果。
备注
为了演示如何使用 Azure Databricks 作业安排计划的工作流,此入门示例将引入、准备和分析步骤拆分到不同的笔记本,然后再使用每个笔记本在作业中创建任务。 如果所有的处理都包含在一个笔记本中,您可以直接从 Azure Databricks 笔记本 UI 轻松安排笔记本的任务。 请参阅创建和管理计划的笔记本作业。
一个常见要求是按计划运行数据管道。 若要为运行管道的作业定义计划,请执行以下操作:
单击边栏中的
“工作流”。
在“名称”列中单击作业名称。 边侧面板显示[任务详细信息]。
在“作业详细信息”面板中,单击添加触发器,然后在触发器类型中选择计划任务。
指定时间段、开始时间和时区。 (可选)选中“显示 Cron 语法”复选框以使用 Quartz Cron 语法显示和编辑计划。
单击“ 保存”。
- 若要详细了解 Databricks 笔记本,请参阅 Databricks 笔记本简介。
- 若要详细了解 Azure Databricks 作业,请参阅 什么是作业?。
- 若要详细了解 Delta Lake,请参阅什么是 Delta Lake?。
- 若要了解有关使用 DLT 的数据处理管道的详细信息,请参阅 什么是 DLT?。