本教程将引导你配置第一个增量实时表管道、编写基本 ETL 代码和运行管道更新。
本教程中的所有步骤专为已启用 Unity Catalog 的工作区而设计。 你也可以将增量实时表管道配置为与旧 Hive 元存储配合使用。 请参阅将增量实时表管道与旧 Hive 元存储配合使用。
注意
本教程提供有关使用 Databricks 笔记本开发和验证新管道代码的说明。 你也可以在 Python 或 SQL 文件中使用源代码配置管道。
如果已有使用增量实时表语法编写的源代码,则可以配置管道以运行代码。 请参阅配置增量实时表管道。
可以使用 Databricks SQL 中的完全声明性 SQL 语法,将具体化视图和流式处理表的刷新计划注册和设置为 Unity Catalog 托管对象。
本文中的示例使用一个公开的数据集,其中包含纽约市婴儿姓名的记录。 此示例演示如何使用增量实时表管道执行以下操作:
- 将卷中的原始 CSV 数据读取到表中。
- 从引入表中读取记录,并使用增量实时表期望创建一个包含已清理数据的新表。
- 使用清理后的记录作为创建派生数据集的 Delta Live Tables 查询的输入。
此代码演示了一个简化的奖牌体系结构示例。 请参阅什么是奖牌湖屋体系结构?。
此示例的实现是针对 Python 和 SQL 提供的。 按照步骤创建新的管道和笔记本,然后复制提供的代码并粘贴。
还提供包含完整代码的示例笔记本。
若要启动管道,必须具有群集创建权限或可以访问定义增量实时表群集的群集策略。 增量实时表运行时在运行管道之前创建群集,如果没有正确的权限,则会创建失败。
默认情况下,所有用户均可使用无服务器管道触发更新。 必须在帐户级别启用无服务器,无服务器在工作区区域中可能不可用。
本教程中的示例使用 Unity Catalog。 Databricks 建议创建新的架构以运行本教程,因为目标架构中创建多个数据库对象。
- 要在目录中创建新架构,必须具有
ALL PRIVILEGES
、USE CATALOG
或CREATE SCHEMA
特权。 - 若无法创建新架构,请针对现有架构运行本教程。 必须具有以下特权:
- 父目录的
USE CATALOG
。 - 目标架构上的
ALL PRIVILEGES
或USE SCHEMA
、CREATE MATERIALIZED VIEW
以及CREATE TABLE
特权。
- 父目录的
- 本教程使用卷以存储示例数据。 Databricks 建议为本教程创建新的卷。 若要为本教程创建新的架构,则可以在该架构中创建新的卷。
- 要在现有架构中创建新的卷,必须具有以下特权:
- 父目录的
USE CATALOG
。 - 目标架构上的
ALL PRIVILEGES
或USE SCHEMA
、CREATE VOLUME
特权。
- 父目录的
- 可选择使用现有卷。 必须具有以下特权:
- 父目录的
USE CATALOG
。 - 父架构的
USE SCHEMA
。 - 目标卷上的
ALL PRIVILEGES
或READ VOLUME
、WRITE VOLUME
。
- 父目录的
- 要在现有架构中创建新的卷,必须具有以下特权:
要设置这些权限,请联系 Databricks 管理员。 有关 Unity Catalog 特权的更多信息,请参阅 Unity Catalog 特权和安全对象。
- 要在目录中创建新架构,必须具有
此示例从 Unity Catalog 卷加载数据。 以下代码下载 CSV 文件并将其存储在指定卷中。 打开新笔记本并运行以下代码,将此数据下载到指定卷中:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
将 <catalog-name>
、<schema-name>
和 <volume-name>
替换为 Unity 目录卷的目录、架构和卷名称。 如果这些对象不存在,则提供的代码将尝试创建指定的架构和卷。 必须具有相应的特权才能在 Unity Catalog 中创建和写入对象。 请参阅 要求。
注意
在继续本教程之前,请确保此笔记本已成功运行。 请勿将此笔记本配置为管道的一部分。
Delta Live Tables 通过使用 Delta Live Tables 语法解析笔记本或文件(称为源代码)中定义的依赖项来创建管道。 每个源代码文件仅能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。
重要
请勿在源代码字段中配置任何资产。 将该字段设为黑色,可创建和配置用于源代码创作的笔记本。
本教程中的说明使用无服务器计算和 Unity Catalog。 使用这些说明中未提及的所有配置选项的默认设置。
注意
如果工作区中未启用或不支持无服务器,则可以使用默认计算设置完成本教程。 必须在“创建管道”UI 的“目标”部分的“存储选项”下手动选择“Unity Catalog”。
要配置新的管道,请进行以下操作:
- 单击边栏中的“增量实时表”。
- 单击“创建管道”。
- 提供唯一的管道名称。
- 勾选无服务器旁边的框。
- 选择要发布数据的目录。
- 选择目录中的架构。
- 指定新的架构名称,以创建架构。
- 使用“高级”下的“添加配置”按钮定义三个管道参数,以添加三个配置。 使用以下参数名称指定下载数据的目录、架构和卷:
my_catalog
my_schema
my_volume
- 单击“创建”。
会显示新创建的管道的管道 UI。 会自动为管道创建和配置源代码笔记本。
笔记本会在用户目录的新目录中创建。 新目录和文件的名称与管道的名称相匹配。 例如,/Users/your.username@databricks.com/my_pipeline/my_pipeline
。
访问此笔记本的链接位于管道详细信息面板中的源代码字段下。 单击链接以打开笔记本,然后继续执行下一步。
可以使用 Datbricks 笔记本以交互方式开发和验证增量实时表管道的源代码。 必须将笔记本附加到管道才能使用此功能。 将新创建的笔记本附加到刚创建的管道:
- 单击右上角的“连接”,以打开计算配置菜单。
- 将鼠标悬停在步骤 1 中创建的管道的名称上。
- 单击“连接”。
UI 将有所更改,右上角将出现“验证”和“开始”按钮。 要详细了解笔记本对管道代码开发的支持,请参阅在笔记本中开发和调试增量实时表管道。
重要
- 规划期间,增量实时表管道会评估笔记本中的所有单元格。 与针对通用计算运行或计划为作业运行的笔记本不同,管道不能保证单元按指定顺序运行。
- 笔记本仅包含单个编程语言。 请勿混合管道源代码笔记本中的 Python 和 SQL 代码。
有关使用 Python 或 SQL 开发代码的详细信息,请参阅使用 Python 开发管道代码或使用 SQL 开发管道代码。
要在本教程中实现该示例,请复制以下代码并粘贴到配置为管道源代码的笔记本中的单元格中。
提供的代码执行以下操作:
- 导入必要模块(仅限 Python)。
- 引用管道配置期间所定义的参数。
- 定义名为
baby_names_raw
的流式处理表,该表从卷引入。 - 定义名为
baby_names_prepared
的具体化视图,该视图验证引入数据。 - 定义名为
top_baby_names_2021
的具体化视图,该视图具有高度优化的数据视图。
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
要开始管道更新,请单击笔记本 UI 右上角的“开始”按钮。
以下笔记本包含本文中所提供的相同代码示例。 这些笔记本的要求与本文中的步骤相同。 请参阅 要求。
要导入笔记本,请完成以下步骤:
- 打开笔记本 UI。
- 单击“+ 新建”>“笔记本”。
- 会打开空的笔记本。
- 单击“文件”>“导入”。 此时会出现“导入”对话框。
- 选择“从中导入”的“URL”选项。
- 粘贴笔记本的 URL。
- 单击“导入”。
本教程要求在配置和运行增量实时表管道之前运行数据设置笔记本。 导入以下笔记本,将笔记本附加到计算资源,填写 my_catalog
、my_schema
和 my_volume
所需的变量,然后单击“全部运行”。
以下笔记本提供 Python 或 SQL 中的示例。 导入笔记本时,该笔记本将保存到用户主目录。
导入以下笔记本之一后,完成创建管道的步骤,但使用源代码文件选取器选择下载的笔记本。 使用配置为源代码的笔记本创建管道后,单击管道 UI 中的“开始”以触发更新。