从 Azure Data Lake Storage Gen2 载入数据

本文介绍如何将数据从 Azure Data Lake Storage Gen2 载入到新的 Azure Databricks 工作区。 你将了解如何安全地访问与 Unity 目录卷(推荐)或 Unity 目录外部位置相对应的云对象存储位置中的源数据。 然后,你将了解如何使用带有增量实时表的自动加载程序将数据增量引入 Unity 目录托管表。

开始之前

如果你不是管理员,本文假定管理员已提供以下内容:

  • 启用了 Unity Catalog 的 Azure Databricks 工作区的访问权限。 有关详细信息,请参阅设置和管理 Unity Catalog

  • 与包含源数据的云存储位置相对应的 Unity Catalog 外部卷或 Unity Catalog 外部位置的 READ FILES 权限。 有关详细信息,请参阅创建外部位置以将云存储连接到 Azure Databricks

  • 源数据的路径。

    卷路径示例:/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>

    外部位置路径示例:abfss://<container>@<storage-account>.dfs.core.chinacloudapi.cn/<folder>

  • 对要将数据加载到的架构的 USE SCHEMACREATE TABLE 权限。

  • 群集创建权限或对定义增量实时表管道群集的群集策略cluster_type 字段设置为 dlt)的访问权限。

    如果源数据的路径是卷路径,群集必须运行 Databricks Runtime 13.2 或更高版本。

重要

如果对这些先决条件有疑问,请联系帐户管理员。

步骤 1:创建群集

若要创建群集,请执行以下操作:

  1. 登录到你的 Azure Databricks 工作区。
  2. 在边栏中,单击“新建”>“群集”
  3. 在群集 UI 中,指定群集的唯一名称。
  4. 如果源数据的路径是卷路径,则对于 Databricks Runtime 运行时版本,请选择 13.2 或更高版本
  5. 单击“创建群集”。

步骤 2:创建数据探索笔记本

本部分介绍如何创建数据探索笔记本,以便在创建数据管道之前了解数据。

  1. 在边栏中,单击“+ 新建”>“笔记本”

    该笔记本已自动附加到你上次使用的群集(在本例中,是在“步骤 1:创建群集”中创建的群集)

  2. 输入笔记本的名称。

  3. 单击语言按钮,然后从下拉菜单中选择 PythonSQL。 默认情况下选择 Python

  4. 若要确认对 ADLS Gen2 中源数据的数据访问,请将以下代码粘贴到笔记本单元格中,单击 运行菜单,然后单击“运行单元格”

    SQL

    LIST '<path-to-source-data>'
    

    Python

    %fs ls '<path-to-source-data>'
    

    <path-to-source-data> 替换为包含数据的目录的路径。

    此时将显示包含数据集的目录的内容。

  5. 若要查看记录示例,以便更好地了解每个记录的内容和格式,请将以下内容粘贴到笔记本单元格中,单击 运行菜单,然后单击“运行单元格”

    SQL

    SELECT * from read_files('<path-to-source-data>', format => '<file-format>') LIMIT 10
    

    Python

    spark.read.format('<file-format>').load('<path-to-source-data>').limit(10).display()
    

    请替换以下值:

    • <file-format>:支持的文件格式。 请参阅文件格式选项
    • <path to source data>:包含数据的目录中文件的路径。

    此时将显示指定文件中的前十条记录。

步骤 3:引入原始数据

若要引入原始数据,请执行以下操作:

  1. 在边栏中,单击“新建”>“笔记本”

    该笔记本已自动附加到你上次使用的群集(在本例中是本文前面创建的群集)。

  2. 输入笔记本的名称。

  3. 单击语言按钮,然后从下拉菜单中选择 PythonSQL。 默认情况下选择 Python

  4. 将以下代码粘贴到笔记本单元格中:

    SQL

    CREATE OR REFRESH STREAMING TABLE
      <table-name>
    AS SELECT
      *
    FROM
      STREAM read_files(
        '<path-to-source-data>',
        format => '<file-format>'
      )
    

    Python

    @dlt.table(table_properties={'quality': 'bronze'})
    def <table-name>():
      return (
         spark.readStream.format('cloudFiles')
         .option('cloudFiles.format', '<file-format>')
         .load(f'{<path-to-source-data>}')
     )
    

    请替换以下值:

    • <table-name>:将包含引入记录的表的名称。
    • <path-to-source-data>:源数据的路径。
    • <file-format>:支持的文件格式。 请参阅文件格式选项

注意

增量实时表不设计为在笔记本单元格中以交互方式运行。 运行笔记本中包含增量实时表语法的单元格会返回一条有关查询在语法上是否有效的消息,但不会运行查询逻辑。 以下步骤介绍如何从刚刚创建的引入笔记本创建管道。

步骤 4:创建并发布管道

若要创建管道并将其发布到 Unity Catalog,请执行以下操作:

  1. 在边栏中单击“工作流”,单击“增量实时表”选项卡,然后单击“创建管道”
  2. 为管道输入一个名称。
  3. 对于“管道模式”,选择“已触发”
  4. 对于“源代码”,选择包含管道源代码的笔记本
  5. 对于“目标”,选择“Unity Catalog”
  6. 若要确保表由 Unity Catalog 管理,并且有权访问父架构的任何用户都可以对其进行查询,请从下拉列表中选择“目录”和“目标架构”
  7. 如果没有群集创建权限,请从下拉列表中选择支持增量实时表的群集策略
  8. 对于“高级”,将“通道”设置为“预览版”
  9. 接受所有其他默认值,然后单击“创建”

步骤 5:计划管道

若要计划管道,请执行以下操作:

  1. 在边栏中,单击“增量实时表”
  2. 单击要计划的管道的名称。
  3. 单击“计划”>“添加计划”
  4. 对于“作业名称”,输入作业的名称
  5. 将“计划”设置为“已计划”。
  6. 指定时间段、开始时间和时区。
  7. 配置一个或多个电子邮件地址,以接收有关管道启动、成功或失败的警报。
  8. 单击“创建”。

后续步骤