本教程将引导你完成配置第一个 DLT 管道、编写基本 ETL 代码和运行管道更新的步骤。
本教程中的所有步骤都针对启用了 Unity 目录的工作区而设计。 还可以将 DLT 管道配置为与旧版 Hive 元存储配合使用。 请参阅将 DLT 管道与旧版 Hive 元存储配合使用。
注释
本教程提供有关使用 Databricks 笔记本开发和验证新管道代码的说明。 还可以在 Python 或 SQL 文件中使用源代码配置管道。
如果你已经有使用 DLT 语法编写的源代码,则可以配置管道来运行你的代码。 请参阅 配置 DLT 管道。
可以在 Databricks SQL 中使用完全声明性 SQL 语法,将具体化视图和流式表的刷新计划注册并设置为由 Unity 目录管理的对象。
示例:引入和处理纽约婴儿姓名数据
本文中的示例使用一个公开的数据集,其中包含纽约市婴儿姓名的记录。 此示例演示如何使用 DLT 管道:
- 将卷中的原始 CSV 数据读取到表中。
- 从引入表读取记录,并使用 DLT 期望 创建包含清理数据的新表。
- 将清理的记录用作创建派生数据集的 DLT 查询的输入。
此代码演示了一个简化的奖牌体系结构示例。 请参阅什么是奖牌湖屋体系结构?。
此示例的实现针对 Python 和 SQL 提供。 按照步骤创建新的管道和笔记本,然后复制粘贴提供的代码。
还提供了包含完整代码的示例 笔记本 。
要求
若要启动管道,必须具有 群集创建权限 或对定义 DLT 群集的群集策略的访问权限。 DLT 运行时在运行管道之前创建群集,如果没有正确的权限,则会失败。
默认情况下,所有用户都可以使用无服务器管道触发更新。 无服务器必须在帐户级别启用,可能在工作区区域中不可用。
本教程中的示例使用 Unity 目录。 Databricks 建议创建新的架构来运行本教程,因为目标架构中创建多个数据库对象。
- 若要在目录中创建新架构,必须具有
ALL PRIVILEGES
或USE CATALOG
和CREATE SCHEMA
特权。 - 如果无法创建新架构,请针对现有架构运行本教程。 必须具有以下特权:
- 对父目录的
USE CATALOG
。 - 对目标架构的
ALL PRIVILEGES
或USE SCHEMA
、CREATE MATERIALIZED VIEW
和CREATE TABLE
权限。
- 对父目录的
- 本教程使用卷来存储示例数据。 Databricks 建议为本教程创建新卷。 如果为本教程创建新的架构,则可以在该架构中创建新卷。
- 若要在现有架构中创建新卷,必须具有以下权限:
- 对父目录的
USE CATALOG
。 - 对目标架构的
ALL PRIVILEGES
或USE SCHEMA
和CREATE VOLUME
权限。
- 对父目录的
- 可以选择使用现有卷。 必须具有以下特权:
- 对父目录的
USE CATALOG
。 - 对父架构的
USE SCHEMA
。 - 针对目标卷的
ALL PRIVILEGES
或READ VOLUME
和WRITE VOLUME
权限。
- 对父目录的
- 若要在现有架构中创建新卷,必须具有以下权限:
若要设置这些权限,请联系 Databricks 管理员。 有关 Unity 目录权限的详细信息,请参阅 Unity 目录特权和安全对象。
- 若要在目录中创建新架构,必须具有
步骤 0:下载数据
此示例从 Unity 目录卷加载数据。 以下代码下载 CSV 文件并将其存储在指定的卷中。 打开新笔记本并运行以下代码,将此数据下载到指定的卷:
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
将 <catalog-name>
、<schema-name>
和 <volume-name>
替换为 Unity Catalog 卷中的目录、架构和卷名称。 如果这些对象不存在,则提供的代码将尝试创建指定的架构和卷。 必须具有相应的权限才能在 Unity 目录中创建和写入对象。 请参阅 要求。
注释
在继续学习本教程之前,请确保此笔记本已成功运行。 不要将此笔记本配置为管道的一部分。
步骤 1:创建管道
DLT 使用 DLT 语法解析笔记本或文件中定义的依赖项(称为 源代码)来创建管道。 每个源代码文件只能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。
重要
请勿在 源代码 字段中配置任何资产。 将此字段留黑会创建并配置用于源代码创作的笔记本。
本教程中的说明使用无服务器计算和 Unity 目录。 对这些说明中未指定的所有配置选项使用默认设置。
注释
如果工作区中未启用或支持无服务器,则可以使用默认计算设置完成本教程。 必须在“创建管道”UI 的“目标”部分中的“存储选项”下,手动选择“Unity Catalog”。
若要配置新管道,请执行以下操作:
在边栏中,单击“ 管道”。
单击 创建管道。
在 管道名称中,键入唯一的管道名称。
在 目标中,若要配置发布表的 Unity 目录位置,请选择 目录 和 架构。
在 “高级”中,单击“ 添加配置 ”,然后使用以下参数名称为下载数据的目录、架构和卷定义管道参数:
my_catalog
my_schema
my_volume
单击 “创建” 。
新管道将显示管道 UI。 会自动为管道创建和配置源代码笔记本。
笔记本在用户目录下的新文件夹中创建。 新目录和文件的名称与管道的名称匹配。 例如,/Users/your.username@databricks.com/my_pipeline/my_pipeline
。
访问此笔记本的链接位于“管道详细信息”面板中的“源代码”字段下。 单击链接以打开笔记本,然后继续执行下一步。
步骤 2:使用 Python 或 SQL 在笔记本中声明具体化视图和流式处理表
可以使用 Databricks 笔记本以交互方式开发和验证 DLT 管道的源代码。 必须将笔记本附加到管道才能使用此功能。 将您新创建的笔记本连接至您刚刚创建的管道:
单击右上角的“连接”以打开计算配置菜单。
将鼠标悬停在步骤 1 中创建的管道的名称上。
单击连接。
UI 右上角更改为包含“验证”和“开始”按钮。 若要详细了解笔记本对管道代码开发的支持,请参阅 在笔记本中开发和调试 DLT 管道。
重要
- DLT 管道在规划期间会评估笔记本中的所有单元格。 与针对通用计算运行或安排为作业的笔记本不同,管道无法保证单元格按照指定的顺序运行。
- 笔记本只能包含单个编程语言。 不要在管道源代码笔记本中混合 Python 和 SQL 代码。
有关使用 Python 或 SQL 开发代码的详细信息,请参阅 使用 Python 开发管道代码或使用 SQL 开发管道代码。
示例管道代码
若要在本教程中实现该示例,请将以下代码复制并粘贴到配置为管道源代码的笔记本中的单元格中。
提供的代码执行以下操作:
- 导入必要的模块(仅限 Python)。
- 引用管道配置期间定义的参数。
- 定义一个名为
baby_names_raw
的流式处理表,用于从卷中引入数据。 - 定义一个名为
baby_names_prepared
的具体化视图,用于验证引入的数据。 - 定义一个名为
top_baby_names_2021
的具体化视图,其中包含细化的数据视图。
Python语言
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
步骤 3:启动管道更新
若要启动管道更新,请单击 笔记本 UI 右上角的“开始 ”按钮。
示例笔记本
以下笔记本包含本文中提供的相同代码示例。 这些笔记本的要求与本文中的步骤相同。 请参阅 要求。
若要导入笔记本,请完成以下步骤:
打开笔记本 UI。
- 单击“+ 新建>笔记本”。
- 此时会打开一个空笔记本。
单击文件>导入…。 此时会出现“导入”对话框。
选择“从导入”的URL选项。
粘贴笔记本的 URL。
单击导入。
本教程要求在配置和运行 DLT 管道之前运行数据设置笔记本。 导入以下笔记本,将笔记本附加到计算资源,填写所需的变量my_catalog
、my_schema
和my_volume
,然后点击“全部运行”。
管道数据下载教程
以下笔记本在 Python 或 SQL 中提供了示例。 导入笔记本时,该笔记本将保存到用户主目录。
导入以下笔记本之一后,完成创建管道的步骤,但使用 源代码 文件选取器选择下载的笔记本。 使用配置为源代码的笔记本创建管道后,单击管道 UI 中的“开始”以触发更新。