教程:使用 DLT 生成 ETL 管道

了解如何使用 DLT 和 Auto Loader 创建和部署 ETL(提取、转换和加载)数据编排管道。 ETL 管道实现从源系统读取数据、根据要求转换数据(如数据质量检查和记录重复数据)以及将数据写入目标系统(如数据仓库或数据湖)的步骤。

在本教程中,你将使用 DLT 和自动加载程序来:

  • 将原始源数据引入目标表。
  • 转换原始源数据,并将转换后的数据写入两个目标具体化视图。
  • 查询转换后的数据。
  • 使用 Databricks 作业自动执行 ETL 管道。

有关 DLT 和自动加载程序的详细信息,请参阅什么是 DLT?什么是自动加载程序?

要求

若要完成此教程,必须满足以下要求:

关于数据集

此示例中使用的数据集是 Million Song Dataset 的子集,该数据集是当代音乐曲目的特征和元数据集合。 此数据集在 Azure Databricks 工作区中包含的示例数据集中可用。

步骤 1:创建管道

首先,将在 DLT 中创建 ETL 管道。 DLT 使用 DLT 语法解析笔记本或文件中定义的依赖项(称为 源代码)来创建管道。 每个源代码文件只能包含一种语言,但可以在管道中添加多种语言的笔记本或文件。 若要了解详细信息,请参阅 什么是 DLT?

重要

源代码 字段留空,以便自动创建和配置用于源代码创作的笔记本。

本教程使用 SQL 计算和 Unity 目录。 对于未指定的所有配置选项,可以使用默认计算设置完成本教程。 如果使用默认计算设置,则必须在创建管道 UI 的目标部分,手动在存储选项下选择Unity Catalog

若要在 DLT 中创建新的 ETL 管道,请执行以下步骤:

  1. 在边栏中,单击“ 管道”。

  2. 单击“创建管道”“ETL 管道”

  3. 管道名称中,键入唯一的管道名称。

  4. 目标中,若要配置在其中发布表的 Unity 目录位置,请选择现有 目录 并在 架构 中编写一个新名称,以在目录中创建新架构。

  5. 单击“创建”。

  6. 单击“管道详细信息”面板中“源代码”字段下的源代码笔记本链接。

此时会显示新管道的管道 UI。

步骤 2:开发 DLT 管道

重要

笔记本只能包含单个编程语言。 不要在管道源代码笔记本中混合 Python 和 SQL 代码。

在此步骤中,将使用 Databricks 笔记本以交互方式开发和验证 DLT 管道的源代码。

该代码使用自动加载程序进行增量数据引入。 自动加载程序会在新文件到达云对象存储时自动对其进行检测和处理。 若要了解详细信息,请参阅 什么是自动加载程序?

将为管道自动创建和配置空白源代码笔记本。 笔记本在用户目录下的新文件夹中创建。 新目录和文件的名称与管道的名称匹配。 例如,/Users/someone@example.com/my_pipeline/my_pipeline

开发 DLT 管道时,可以选择 Python 或 SQL。 这两种语言都包含示例。 根据语言选择,确保选择默认笔记本语言。 若要了解有关对 DLT 管道代码开发的笔记本支持的详细信息,请参阅 在笔记本中开发和调试 DLT 管道

  1. 访问此笔记本的链接位于“管道详细信息”面板中的“源代码”字段下。 单击链接以打开笔记本,然后继续执行下一步。

  2. 单击右上角的“连接以打开计算配置菜单。

  3. 将鼠标悬停在步骤 1 中创建的管道的名称上。

  4. 单击连接

  5. 在顶部笔记本的标题旁边,选择笔记本的默认语言(Python 或 SQL)。

  6. 将以下代码复制并粘贴到笔记本中的单元格中。

    Python语言

    # Import modules
    import dlt
    from pyspark.sql.functions import *
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define the path to source data
    file_path = f"/databricks-datasets/songs/data-001/"
    
    # Define a streaming table to ingest data from a volume
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    @dlt.table(
      comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    )
    def songs_raw():
      return (spark.readStream
        .format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "csv")
        .option("sep","\t")
        .option("inferSchema", True)
        .load(file_path))
    
    # Define a materialized view that validates data and renames a column
    @dlt.table(
      comment="Million Song Dataset with data cleaned and prepared for analysis."
    )
    @dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
    @dlt.expect("valid_title", "song_title IS NOT NULL")
    @dlt.expect("valid_duration", "duration > 0")
    def songs_prepared():
      return (
        spark.read.table("songs_raw")
          .withColumnRenamed("title", "song_title")
          .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
      )
    
    # Define a materialized view that has a filtered, aggregated, and sorted view of the data
    @dlt.table(
      comment="A table summarizing counts of songs released by the artists each year who released most songs."
    )
    def top_artists_by_year():
      return (
        spark.read.table("songs_prepared")
          .filter(expr("year > 0"))
          .groupBy("artist_name", "year")
          .count().withColumnRenamed("count", "total_number_of_songs")
          .sort(desc("total_number_of_songs"), desc("year"))
      )
    

    SQL

    -- Define a streaming table to ingest data from a volume
    CREATE OR REFRESH STREAMING TABLE songs_raw
    (
     artist_id STRING,
     artist_lat DOUBLE,
     artist_long DOUBLE,
     artist_location STRING,
     artist_name STRING,
     duration DOUBLE,
     end_of_fade_in DOUBLE,
     key INT,
     key_confidence DOUBLE,
     loudness DOUBLE,
     release STRING,
     song_hotnes DOUBLE,
     song_id STRING,
     start_of_fade_out DOUBLE,
     tempo DOUBLE,
     time_signature INT,
     time_signature_confidence DOUBLE,
     title STRING,
     year INT,
     partial_sequence STRING,
     value STRING
    )
    COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
    AS SELECT *
    FROM STREAM read_files(
    '/databricks-datasets/songs/data-001/');
    
    -- Define a materialized view that validates data and renames a column
    CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
    CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
    CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
    CONSTRAINT valid_duration EXPECT (duration > 0)
    )
    COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
    AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
    FROM songs_raw;
    
    -- Define a materialized view that has a filtered, aggregated, and sorted view of the data
    CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
    COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
    AS SELECT
     artist_name,
     year,
     COUNT(*) AS total_number_of_songs
    FROM songs_prepared
    WHERE year > 0
    GROUP BY artist_name, year
    ORDER BY total_number_of_songs DESC, year DESC
    

步骤 3:查询转换后的数据

在此步骤中,你将查询 ETL 管道中处理的数据以分析歌曲数据。 这些查询使用在上一步中创建的准备好的记录。

首先,运行一个查询,查找从1990年开始每年发行最多的歌曲的艺术家。

  1. 在边栏中,单击 “SQL 编辑器”图标SQL 编辑器

  2. 单击 “添加”或“加号”图标 新建选项卡图标,然后从菜单中选择“ 创建新查询 ”。

  3. 输入以下内容:

    -- Which artists released the most songs each year in 1990 or later?
    SELECT artist_name, total_number_of_songs, year
    FROM <catalog>.<schema>.top_artists_by_year
    WHERE year >= 1990
    ORDER BY total_number_of_songs DESC, year DESC
    

    <catalog><schema> 替换为表所在的目录和架构的名称。 例如,data_pipelines.songs_data.top_artists_by_year

  4. 单击运行已选项目

现在,运行另一个查询,找到具备4/4节拍和适合跳舞的节奏的歌曲。

  1. 单击 “添加”或“加号”图标 新建点击图标,然后从菜单中选择“ 创建新查询 ”。

  2. 输入以下代码:

     -- Find songs with a 4/4 beat and danceable tempo
     SELECT artist_name, song_title, tempo
     FROM <catalog>.<schema>.songs_prepared
     WHERE time_signature = 4 AND tempo between 100 and 140;
    

    <catalog><schema> 替换为表所在的目录和架构的名称。 例如,data_pipelines.songs_data.songs_prepared

  3. 单击运行已选项目

步骤 4:创建作业以运行 DLT 管道

接下来,创建一个工作流,以使用 Databricks 作业自动运行数据引入、处理和分析步骤。

  1. 在工作区中,单击边栏中的 “工作流”图标工作流 ,然后单击“ 创建作业”。

  2. 在任务标题框中,将 “新建作业 <”日期和时间> 替换为作业名称。 例如,Songs workflow

  3. 在“任务名称”中输入首个任务的名称,例如

  4. “类型”中,选择 “管道”。

  5. 管道中,选择在步骤 1 中创建的 DLT 管道。

  6. 单击“创建”。

  7. 若要运行工作流,请单击“ 立即运行”。 若要查看运行的详细信息,请单击“ 运行 ”选项卡。单击该任务可查看任务运行的详细信息。

  8. 若要查看工作流完成后的结果,请单击“ 转到最新成功运行 ”或作业运行的 开始时间 。 此时会出现“输出”页,其中显示了查询结果。

有关作业运行的详细信息,请参阅 Databricks 作业的监视和可观测性

步骤 5:计划 DLT 管道作业

若要按计划运行 ETL 管道,请执行以下步骤:

  1. 单击边栏中的 工作流图标工作流”。

  2. 在“名称”列中单击作业名称。 边侧面板显示[任务详细信息]。

  3. 在“计划和触发器”面板中单击“添加触发器”,然后在触发器类型中选择“计划”。

  4. 指定时间段、开始时间和时区。

  5. 单击“ 保存”。

了解更多