从 Git 文件夹或工作区文件导入 Python 模块

可以将 Python 代码存储在 Databricks Git 文件夹工作区文件中 ,然后将该 Python 代码导入 Lakeflow 声明性管道。 有关在 Git 文件夹或工作区文件中使用模块的详细信息,请参阅使用 Python 和 R 模块

若要导入 Python 文件,有多个选项:

  • 将 Python 模块作为实用工具文件包含在管道中。 如果模块特定于管道,则这最有效。
  • 在需要使用它的任何管道中,将共享模块添加到管道环境中。
  • 使用 import 语句将工作区中的模块直接导入 Python 源。

在管道中包含 Python 模块

可以在管道中创建 Python 模块。 管道根文件夹会自动追加到该文件夹。sys.path 这样就可以直接在管道 Python 源代码中引用模块。

以下示例演示如何在管道根文件夹下创建 Python 模块,并从管道源中的 Python 源文件引用它:

  1. 管道编辑器中打开管道。

  2. 在左侧的管线资产浏览器中,单击「添加」图标。添加,然后从菜单中选择「工具」。

  3. 输入my_utils.py

  4. 保留默认路径,然后单击“ 创建”。

    这会在my_utils.py管道的文件夹中创建utilities文件,并创建utilities该文件夹(如果不存在)。 默认情况下,此文件夹中的文件不会自动添加到您的管道源中,但可以从源代码中包含的文件中调用.py

    默认情况下,实用工具文件具有一个调用 distance_km() 的示例函数,该函数以英里为单位获取距离并转换它。

  5. 在转换文件夹中的 Python 源文件中(可以通过选择 Plus 图标创建一个。添加,然后从菜单中选择 “转换 ”,添加以下代码:

    from utilities import my_utils
    

现在可以从该 Python 文件中调用函数 my_utils 。 必须从任何需要调用模块中的函数的 Python 文件中添加 import 该语句。

将 Python 模块添加到管道环境

如果要跨多个管道共享 Python 模块,可以在工作区文件中的任意位置保存该模块,并从需要使用它的任何管道的环境中引用它。 可以引用以下 Python 模块:

  • 单个 Python (.py) 文件。
  • 打包为 Python 滚轮 (.whl) 文件的 Python 项目。
  • 使用 pyproject.toml 文件解压缩 Python 项目(用于定义项目名称和版本)。

以下示例演示如何向管道添加依赖项。

  1. 管道编辑器中打开管道。

  2. 单击齿轮图标,在顶部栏中选择设置

  3. “管道设置” 中,在 “管道环境”下,单击 “铅笔图标”编辑环境

  4. 添加依赖项。 例如,若要在工作区中添加文件,可以添加 /Volumes/libraries/path/to/python_files/file.py。 对于存储在 Git 文件夹中的 Python 滚轮,路径可能如下所示 /Workspace/libraries/path/to/wheel_files/file.whl

    如果文件位于管道的根文件夹中,您可以添加没有路径的文件,或者使用相对路径。

注释

还可以使用依赖项将路径添加到共享文件夹,以允许 import 代码中的语句查找要导入的模块。 例如,-e /Workspace/Users/<user_name>/path/to/add/

使用 import 语句导入 Python 模块

还可以直接引用 Python 源代码中的工作区文件。

  • 如果文件位于 utilities 管道的文件夹中,则可以在没有路径的情况下引用该文件:

    from utilities import my_module
    
  • 如果文件位于其他任何位置,则可以先将模块的路径追加到 :sys.path

    import sys, os
    sys.path.append(os.path.abspath('<module-path>'))
    
    from my_module import *
    
  • 您还可以通过将路径添加到管道环境来将内容追加到sys.path所指定的所有管道源文件,如上一部分所述。

将查询导入为 Python 模块的示例

以下示例演示如何从工作区文件将数据集查询作为 Python 模块导入。 尽管此示例说明了如何使用工作区文件来存储管道源代码,但你可以将其与存储在 Git 文件夹中的源代码配合使用。

若要运行此示例,请使用以下步骤:

  1. 单击“工作区”图标。Azure Databricks 工作区边栏中的工作区以打开工作区浏览器。

  2. 使用工作区浏览器选择 Python 模块的目录。

  3. 单击“ Kebab”菜单图标。 在所选目录最右侧的列中,单击“ 创建 > 文件”。

  4. 输入文件的名称,例如 clickstream_raw_module.py。 文件编辑器随即打开。 若要创建一个用于将源数据读入表中的模块,请在编辑器窗口中输入以下内容:

    from pyspark import pipelines as dp
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @dp.table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 若要创建一个模块来创建包含已准备好的数据的新表,请在同一目录中创建一个新文件,输入该文件的名称(例如 clickstream_prepared_module.py),然后在新编辑器窗口中输入以下内容:

    from clickstream_raw_module import *
    from pyspark import pipelines as dp
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @dp.table
      @dp.expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @dp.expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          spark.read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 接下来,在管道源中创建 Python 文件。 在管道编辑器中,选择“加号”图标。添加,然后转换。

  7. 为文件命名并确认 Python 是默认语言。

  8. 单击 “创建”

  9. 在笔记本中输入以下示例代码。

    注释

    如果笔记本从工作区文件路径或 Git 文件夹路径导入与笔记本目录不同的模块或包,则必须使用 sys.path.append() 手动将路径追加到文件。

    如果从 Git 文件夹导入文件,则必须在路径前面添加 /Workspace/。 例如,sys.path.append('/Workspace/...')。 省略路径中的 /Workspace/ 会导致错误。

    如果模块或包存储在笔记本所在的同一目录中,则无需手动追加路径。 从 Git 文件夹的根目录导入时,也不需要手动追加路径,因为根目录会自动追加到路径。

    import sys, os
    sys.path.append(os.path.abspath('<module-path>'))
    
    from pyspark import pipelines as dp
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dp.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("catalog_name.schema_name.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    <module-path> 替换为包含要导入的 Python 模块的目录的路径。

  10. 若要运行管道,请单击“ 运行管道”。