教程:使用 Lakeflow 管道编辑器创建第一个管道

了解如何使用 Lakeflow Spark 声明性管道(SDP)为数据业务流程和自动加载程序创建新管道。 本教程通过清理数据和创建查询,以扩展示例管道,从而找出前 100 个用户。

本教程介绍如何使用 Lakeflow 管道编辑器:

  • 使用默认文件夹结构创建新管道,并从一组示例文件开始。
  • 使用预期定义数据质量约束。
  • 使用编辑器功能通过新的转换来扩展管道,以对数据执行分析。

要求

在开始本教程之前,必须:

  • 登录到Azure Databricks工作区。
  • 为工作区启用 Unity Catalog。
  • 有权创建计算资源或访问计算资源。
  • 有权在目录中创建新架构。 所需的权限为 ALL PRIVILEGESUSE CATALOGCREATE SCHEMA

步骤 1:创建管道

在此步骤中,将使用默认文件夹结构和代码示例创建管道。 代码示例引用 users 示例数据源中的 wanderbricks 表。

  1. 在Azure Databricks工作区中,单击Plus icon.New,然后Pipeline icon.ETL pipeline。 这将打开包含默认管道名称的管道编辑器,如下所示 New Pipeline <date> <time>

  2. (可选)选择该名称并输入管道的描述性名称。

  3. (可选)在名称右侧,单击目录和架构以设置不同的默认值。

  4. (可选)在为你创建的 my_transformation 源文件中,从语言下拉列表中选择 PythonSQL 以设置文件的语言。

  5. 单击 “代码”图标。使用示例代码

    所选语言中的示例代码显示在 my_transformation 文件夹中的 transformations 源文件中。 尚未创建输出数据集,屏幕右侧的 管道图 为空。

  6. 若要运行管道代码(文件夹中的代码 transformations ),请单击屏幕右上角的 “运行管道 ”。

    运行完成后,工作区底部会显示新创建的两个表:sample_users_<date_time>sample_aggregation_<date_time>。 工作区右侧的 管道图 现在会显示这两个表,其中 sample_userssample_aggregation 的源表。 记下完整 sample_users_<date_time> 表名 , 在下一步中引用它。

步骤 2:应用数据质量检查

在此步骤中,向sample_users表添加数据质量检查。 使用 管道预期 来约束数据。 在这种情况下,将删除没有有效电子邮件地址的任何用户记录,并将清理的表输出为 users_cleaned

  1. 在左侧的管道资产浏览器中,单击加号图标。,然后选择转换

  2. 在“ 创建新转换文件 ”对话框中,进行以下选择:

    • 请选择 PythonSQL 作为 Language。 这不必与上一个选择匹配。
    • 为文件命名。 在这种情况下,请选择 users_cleaned
    • 对于 目标路径,保留默认值。
    • 对于 数据集类型,请将其保留为 未选择 或选择 具体化视图。 如果选择 具体化视图,它将为你生成示例代码。
  3. 单击“ 创建 ”以创建转换代码文件。

  4. 在新代码文件中,编辑代码以匹配以下内容(根据上一屏幕上的选择使用 SQL 或Python)。 将 sample_users_<date_time> 替换为上一部分中 sample_users 表的全名。

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<date_time>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.materialized_view
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<date_time>")
        )
    
  5. 单击“ 运行管道 ”以更新管道。 它现在应该有三个表格。

步骤 3:分析排名靠前的用户

接下来,按已创建的预订数获取前 100 名用户。 将 wanderbricks.bookings 表联接到 users_cleaned 物化视图。

  1. 在左侧的管道资产浏览器中,单击加号图标,然后选择转换

  2. 在“ 创建新转换文件 ”对话框中,进行以下选择:

    • 请选择 PythonSQL 作为 Language。 这不必与以前的选择匹配。
    • 为文件命名。 在这种情况下,请选择 users_and_bookings
    • 对于 目标路径,保留默认值。
    • 对于数据集类型,请保持未选择
  3. 单击“ 创建 ”以创建转换代码文件。

  4. 在新代码文件中,编辑代码以匹配以下内容(根据上一屏幕上的选择使用 SQL 或Python)。

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.materialized_view
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  5. 单击“ 运行管道 ”更新数据集。 运行完成后,可以在 Pipeline Graph 中看到有四个表,包括新 users_and_bookings 表。

    流水线图显示流水线中的四个表

后续步骤

现在,你已了解如何使用 Lakeflow 管道编辑器的一些功能并创建了管道,下面提供了一些其他功能来了解有关以下内容的详细信息: