教程:运行第一个 Delta Live Tables 管道

本教程将引导你配置第一个增量实时表管道、编写基本 ETL 代码和运行管道更新。

本教程中的所有步骤专为已启用 Unity Catalog 的工作区而设计。 你也可以将增量实时表管道配置为与旧 Hive 元存储配合使用。 请参阅将增量实时表管道与旧 Hive 元存储配合使用

注意

本教程提供有关使用 Databricks 笔记本开发和验证新管道代码的说明。 你也可以在 Python 或 SQL 文件中使用源代码配置管道。

如果已有使用增量实时表语法编写的源代码,则可以配置管道以运行代码。 请参阅配置增量实时表管道

可以使用 Databricks SQL 中的完全声明性 SQL 语法,将具体化视图和流式处理表的刷新计划注册和设置为 Unity Catalog 托管对象。

示例:引入和处理纽约婴儿姓名数据

本文中的示例使用一个公开的数据集,其中包含纽约市婴儿姓名的记录。 此示例演示如何使用增量实时表管道执行以下操作:

  • 将卷中的原始 CSV 数据读取到表中。
  • 从引入表中读取记录,并使用增量实时表期望创建一个包含已清理数据的新表。
  • 使用清理后的记录作为创建派生数据集的 Delta Live Tables 查询的输入。

此代码演示了一个简化的奖牌体系结构示例。 请参阅什么是奖牌湖屋体系结构?

此示例的实现是针对 Python 和 SQL 提供的。 按照步骤创建新的管道和笔记本,然后复制提供的代码并粘贴。

还提供包含完整代码的示例笔记本

要求

  • 若要启动管道,必须具有群集创建权限或可以访问定义增量实时表群集的群集策略。 增量实时表运行时在运行管道之前创建群集,如果没有正确的权限,则会创建失败。

  • 默认情况下,所有用户均可使用无服务器管道触发更新。 必须在帐户级别启用无服务器,无服务器在工作区区域中可能不可用。

  • 本教程中的示例使用 Unity Catalog。 Databricks 建议创建新的架构以运行本教程,因为目标架构中创建多个数据库对象。

    • 要在目录中创建新架构,必须具有 ALL PRIVILEGESUSE CATALOGCREATE SCHEMA 特权。
    • 若无法创建新架构,请针对现有架构运行本教程。 必须具有以下特权:
      • 父目录的 USE CATALOG
      • 目标架构上的 ALL PRIVILEGESUSE SCHEMACREATE MATERIALIZED VIEW 以及 CREATE TABLE 特权。
    • 本教程使用卷以存储示例数据。 Databricks 建议为本教程创建新的卷。 若要为本教程创建新的架构,则可以在该架构中创建新的卷。
      • 要在现有架构中创建新的卷,必须具有以下特权:
        • 父目录的 USE CATALOG
        • 目标架构上的 ALL PRIVILEGESUSE SCHEMACREATE VOLUME 特权。
      • 可选择使用现有卷。 必须具有以下特权:
        • 父目录的 USE CATALOG
        • 父架构的 USE SCHEMA
        • 目标卷上的 ALL PRIVILEGESREAD VOLUMEWRITE VOLUME

    要设置这些权限,请联系 Databricks 管理员。 有关 Unity Catalog 特权的更多信息,请参阅 Unity Catalog 特权和安全对象

步骤 0:下载数据

此示例从 Unity Catalog 卷加载数据。 以下代码下载 CSV 文件并将其存储在指定卷中。 打开新笔记本并运行以下代码,将此数据下载到指定卷中:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

<catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。 如果这些对象不存在,则提供的代码将尝试创建指定的架构和卷。 必须具有相应的特权才能在 Unity Catalog 中创建和写入对象。 请参阅 要求

注意

在继续本教程之前,请确保此笔记本已成功运行。 请勿将此笔记本配置为管道的一部分。

步骤 1:创建管道

Delta Live Tables 通过使用 Delta Live Tables 语法解析笔记本或文件(称为源代码)中定义的依赖项来创建管道。 每个源代码文件仅能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。

重要

请勿在源代码字段中配置任何资产。 将该字段设为黑色,可创建和配置用于源代码创作的笔记本。

本教程中的说明使用无服务器计算和 Unity Catalog。 使用这些说明中未提及的所有配置选项的默认设置。

注意

如果工作区中未启用或不支持无服务器,则可以使用默认计算设置完成本教程。 必须在“创建管道”UI 的“目标”部分的“存储选项”下手动选择“Unity Catalog”。

要配置新的管道,请进行以下操作:

  1. 单击边栏中的“增量实时表”
  2. 单击“创建管道”
  3. 提供唯一的管道名称
  4. 勾选无服务器旁边的框。
  5. 选择要发布数据的目录
  6. 选择目录中的架构
    • 指定新的架构名称,以创建架构。
  7. 使用“高级”下的“添加配置”按钮定义三个管道参数,以添加三个配置。 使用以下参数名称指定下载数据的目录、架构和卷:
    • my_catalog
    • my_schema
    • my_volume
  8. 单击“创建”

会显示新创建的管道的管道 UI。 会自动为管道创建和配置源代码笔记本。

笔记本会在用户目录的新目录中创建。 新目录和文件的名称与管道的名称相匹配。 例如,/Users/your.username@databricks.com/my_pipeline/my_pipeline

访问此笔记本的链接位于管道详细信息面板中的源代码字段下。 单击链接以打开笔记本,然后继续执行下一步。

步骤 2:使用 Python 或 SQL 在笔记本中声明具体化视图和流式处理表

可以使用 Datbricks 笔记本以交互方式开发和验证增量实时表管道的源代码。 必须将笔记本附加到管道才能使用此功能。 将新创建的笔记本附加到刚创建的管道:

  1. 单击右上角的“连接”,以打开计算配置菜单。
  2. 将鼠标悬停在步骤 1 中创建的管道的名称上。
  3. 单击“连接”。

UI 将有所更改,右上角将出现“验证”和“开始”按钮。 要详细了解笔记本对管道代码开发的支持,请参阅在笔记本中开发和调试增量实时表管道

重要

  • 规划期间,增量实时表管道会评估笔记本中的所有单元格。 与针对通用计算运行或计划为作业运行的笔记本不同,管道不能保证单元按指定顺序运行。
  • 笔记本仅包含单个编程语言。 请勿混合管道源代码笔记本中的 Python 和 SQL 代码。

有关使用 Python 或 SQL 开发代码的详细信息,请参阅使用 Python 开发管道代码使用 SQL 开发管道代码

示例管道代码

要在本教程中实现该示例,请复制以下代码并粘贴到配置为管道源代码的笔记本中的单元格中。

提供的代码执行以下操作:

  • 导入必要模块(仅限 Python)。
  • 引用管道配置期间所定义的参数。
  • 定义名为 baby_names_raw 的流式处理表,该表从卷引入。
  • 定义名为 baby_names_prepared 的具体化视图,该视图验证引入数据。
  • 定义名为 top_baby_names_2021 的具体化视图,该视图具有高度优化的数据视图。

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

步骤 3:启动管道更新

要开始管道更新,请单击笔记本 UI 右上角的“开始”按钮

示例笔记本

以下笔记本包含本文中所提供的相同代码示例。 这些笔记本的要求与本文中的步骤相同。 请参阅 要求

要导入笔记本,请完成以下步骤:

  1. 打开笔记本 UI。
    • 单击“+ 新建”>“笔记本”。
    • 会打开空的笔记本。
  2. 单击“文件”>“导入”。 此时会出现“导入”对话框。
  3. 选择“从中导入”的“URL”选项。
  4. 粘贴笔记本的 URL。
  5. 单击“导入”

本教程要求在配置和运行增量实时表管道之前运行数据设置笔记本。 导入以下笔记本,将笔记本附加到计算资源,填写 my_catalogmy_schemamy_volume 所需的变量,然后单击“全部运行”

管道教程的数据下载

获取笔记本

以下笔记本提供 Python 或 SQL 中的示例。 导入笔记本时,该笔记本将保存到用户主目录。

导入以下笔记本之一后,完成创建管道的步骤,但使用源代码文件选取器选择下载的笔记本。 使用配置为源代码的笔记本创建管道后,单击管道 UI 中的“开始”以触发更新

增量实时表 Python 笔记本入门

获取笔记本

增量实时表 SQL 笔记本入门

获取笔记本