从 Databricks 存储库或工作区文件导入 Python 模块

可以将 Python 代码存储在 Databricks 存储库或工作区文件中,然后将该 Python 代码导入到 Delta Live Tables 管道中。 有关在 Databricks 存储库或工作区文件中使用模块的详细信息,请参阅使用 Python 和 R 模块

注意

无法从存储在 Databricks 存储库或工作区文件中的笔记本导入源代码。 可以在创建或编辑管道时直接添加笔记本。 请参阅创建管道

将 Python 模块导入到增量实时表管道

以下示例通过将数据集查询作为 Python 模块从存储库中导入来改编增量实时表教程中的示例。 虽然此示例介绍了如何使用 Databricks 存储库来存储管道源代码,但你也可以将此示例与存储在工作区文件中的源代码一起使用。

若要运行此示例,请使用以下步骤:

  1. 若要为 Python 代码创建存储库,请单击边栏中的 Repos Icon“存储库”,然后单击“添加存储库”。

  2. 取消选择“通过克隆 Git 存储库创建存储库”,并在“存储库名称”中输入存储库的名称,例如 dlt-quickstart-repo

  3. 创建模块以将源数据读入表中:单击存储库名称旁边的向下箭头,选择“创建”>“文件”,然后输入文件的名称,例如 clickstream_raw_module.py。 文件编辑器随即打开。 在编辑器窗口中,输入以下内容:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  4. 创建模块以创建包含已准备数据的新表:再次选择“创建”>“文件”,然后输入文件的名称,例如 clickstream_prepared_module.py。 在新的编辑器窗口中,输入以下内容:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  5. 创建管道笔记本:转到 Azure Databricks 登陆页,选择“创建笔记本”,或单击边栏中的 New Icon“新建”,然后选择“笔记本”。 此时会显示“创建笔记本”对话框。 还可以通过单击存储库名称旁边的向下箭头并选择“创建”>“笔记本”,在存储库中创建笔记本。

  6. 在“创建笔记本”对话框中为笔记本命名,然后从“默认语言”下拉菜单中选择“Python”。 可将“群集”设置保留为默认值。

  7. 单击“创建”。

  8. 在笔记本中输入示例代码。

    注意

    如果笔记本从工作区文件路径或 Databricks 存储库路径导入与笔记本目录不同的模块或包,则必须使用 sys.path.append() 手动将路径追加到文件。

    如果要从 Databricks 存储库路径导入文件,则必须在路径前面追加 /Workspace/。 例如 sys.path.append('/Workspace/Repos/...')。 省略路径中的 /Workspace/ 会导致错误。

    如果模块或包存储在笔记本所在的同一目录中,则无需手动追加路径。 从 Databricks 存储库的根目录导入时,也不需要手动追加路径,因为根目录会自动追加到路径。

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path> 替换为包含要导入的 Python 模块的 Databricks 存储库的路径。

  9. 使用新笔记本创建管道。

  10. 若要运行管道,请在“管道详细信息”页中单击“启动”。

还可以将 Python 代码作为包导入。 来自 Delta Live Tables 笔记本的以下代码片段从该笔记本所在的存储库根中的 dlt_packages 目录导入 test_utils 包。 dlt_packages 目录包含文件 test_utils.py__init__.pytest_utils.py 定义函数 create_test_table()

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)