使用 Databricks 资产捆绑包开发增量实时表管道
Databricks 资产捆绑包简称为捆绑包,使你能够以编程方式验证、部署和运行增量实时表管道等 Azure Databricks 资源。 你还可以使用捆绑包以编程方式管理 Azure Databricks 作业和使用 MLOps 堆栈。 请参阅什么是 Databricks 资产捆绑包?。
本文介绍可以在本地开发计算机上通过哪些步骤使用捆绑包以编程方式管理增量实时表管道。
要求
- Databricks CLI 版本 0.218.0 或更高版本。 若要检查安装的 Databricks CLI 版本,请运行命令
databricks -v
。 要安装 Databricks CLI,请参阅《安装或更新 Databricks CLI》。 - 远程工作区必须启用工作区文件。 请参阅什么是工作区文件?。
(可选)安装 Python 模块以支持本地管道开发
Databricks 提供了一个 Python 模块,用于在 IDE 中编写代码时提供语法检查、自动完成和数据类型检查,从而帮助进行 Delta Live Tables 管道代码的本地开发。
可在 PyPi 上找到用于本地开发的 Python 模块。 若要安装模块,请参阅 Delta Live Tables 的 Python 存根。
决策:使用模板或手动创建捆绑包
决定是要使用模板创建捆绑包,还是手动创建:
使用模板创建捆绑包
在这些步骤中,将使用适用于 Python 的 Azure Databricks 默认捆绑包模板创建捆绑包。 这些步骤将指导你创建一个捆绑包,其中包含一个笔记本,用于定义 Delta Live Tables 管道,该管道筛选原始数据集中的数据。 在 Azure Databricks 工作区中验证、部署和运行已部署的管道。
步骤 1:设置身份验证
若要详细了解如何设置身份验证,请参阅 Databricks 身份验证。
步骤 2:创建捆绑包
捆绑包中有要部署的工件以及要运行的工作流的设置。
使用终端或命令提示符切换到本地开发计算机上的目录,该目录中包含模板生成的捆绑。
使用 Databricks CLI 运行
bundle init
命令:databricks bundle init
对于
Template to use
,请按Enter
保留default-python
的默认值。对于
Unique name for this project
,请保留my_project
的默认值,或键入其他值,然后按Enter
。 这将确定此捆绑包的根目录的名称。 此根目录是在当前工作目录中创建的。对于“
Include a stub (sample) notebook
”,选择“no
”并按“Enter
”。 这会指示 Databricks CLI 此时不添加示例笔记本,因为与此选项关联的示例笔记本中没有增量实时表代码。对于
Include a stub (sample) DLT pipeline
,请按Enter
保留yes
的默认值。 这会指示 Databricks CLI 添加包含增量实时表代码的示例笔记本。对于“
Include a stub (sample) Python package
”,选择“no
”并按“Enter
”。 这会指示 Databricks CLI 不要向捆绑包添加示例 Python wheel 包文件或相关的生成说明。
步骤 3:浏览捆绑包
若要查看模板生成的文件,请切换到新创建的捆绑包的根目录,并使用首选 IDE(例如 Visual Studio Code)打开此目录。 特别感兴趣的文件包括:
databricks.yml
:此文件指定程序包的编程名称,包括对管道定义的引用,并指定有关目标工作区的设置。resources/<project-name>_job.yml
和resources/<project-name>_pipeline.yml
:此文件指定管道的设置。src/dlt_pipeline.ipynb
:此文件是运行时执行管道的笔记本。
对于自定义管道,管道声明中的映射对应于 REST API 参考中的 POST /api/2.0/pipelines 中定义的创建管道操作的请求负载(以 YAML 格式表示)。
步骤 4:验证项目的程序包配置文件
在此步骤中,检查捆绑包配置是否有效。
在根目录中,使用 Databricks CLI 运行
bundle validate
命令,如下所示:databricks bundle validate
如果返回了捆绑包配置的摘要,则表示验证成功。 如果返回了任何错误,请修复错误,然后重复此步骤。
如果在此步骤之后对捆绑包进行任何更改,则应重复此步骤以检查捆绑包配置是否仍然有效。
步骤 5:将本地项目部署到远程工作区
此步骤将本地笔记本部署到远程 Azure Databricks 工作区,并在工作区中创建增量实时表管道。
使用 Databricks CLI 运行
bundle validate
命令,如下所示:databricks bundle deploy -t dev
检查本地笔记本是否已部署:在 Azure Databricks 工作区的边栏中,单击“工作区”。
单击进入以下文件夹:Users >
<your-username>
> .bundle ><project-name>
> dev > files > src。 笔记本应该位于此文件夹中。检查是否已创建管道:在 Azure Databricks 工作区的边栏中,单击“增量实时表”。
在“增量实时表”选项卡上,单击 [dev
<your-username>
]<project-name>
_pipeline。
如果在此步骤之后对捆绑包进行了任何更改,则应重复步骤 4-5 以检查捆绑包配置是否仍然有效,然后重新部署项目。
步骤 6:运行部署的项目
在此步骤中,你将在工作区中运行增量实时表管道。
在根目录中,使用 Databricks CLI 运行
bundle run
命令,如下所示,将<project-name>
替换为步骤 2 中项目的名称:databricks bundle run -t dev <project-name>_pipeline
复制终端中显示的
Update URL
值,并将该值粘贴到 Web 浏览器中以打开 Azure Databricks 工作区。在 Azure Databricks 工作区中,管道成功完成后,单击 taxi_raw 视图和 filtered_taxis 具体化视图以查看详细信息。
如果在此步骤之后对捆绑包进行了任何更改,则应重复步骤 4-6 以检查捆绑包配置是否仍然有效,重新部署项目,然后运行重新部署的项目。
步骤 7:清理
在此步骤中,将从工作区中删除已部署的笔记本和管道。
在根目录中,使用 Databricks CLI 运行
bundle destroy
命令,如下所示:databricks bundle destroy -t dev
确认管道删除请求:当系统提示是否永久销毁资源时,请键入
y
并按Enter
。确认笔记本删除请求:当系统提示是否永久销毁先前部署的文件夹及其所有文件时,请键入
y
并按Enter
。如果还想从开发计算机中删除捆绑包,现在可以从步骤 2 中删除本地目录。
你已达到使用模板创建捆绑包的步骤的末尾。
手动创建捆绑包
在这些步骤中,从头开始创建捆绑包。 这些步骤引导你创建一个捆绑包,其中包含一个嵌入了增量实时表指令的笔记本,以及用于运行该笔记本的增量实时表管道的定义。 然后,从 Azure Databricks 工作区中的管道验证、部署和运行已部署的笔记本。
步骤 1:创建捆绑包
捆绑包中有要部署的工件以及要运行的工作流的设置。
- 在开发计算机上创建或标识一个空目录。
- 切换到终端中的空目录,或在 IDE 中打开空目录。
提示
你的空目录可能与 Git 提供商管理的克隆存储库相关联。 这样,你就可以通过外部版本控制管理捆绑包,并更轻松地与其他开发人员和 IT 专业人员就项目展开协作。 但是,为了帮助简化本演示,此处未使用克隆的存储库。
如果你选择为本演示克隆存储库,Databricks 建议克隆空存储库或仅包含基本文件(例如 README
和 .gitignore
)的存储库。 否则,该存储库中的任何现有文件可能会不必要地同步到 Azure Databricks 工作区。
步骤 2:将笔记本添加到项目
此步骤将一个笔记本添加到项目。 此笔记本执行以下操作:
- 将 Databricks 数据集中的原始 JSON 点击流数据读取到 Azure Databricks 工作区的 DBFS 根文件夹内
pipelines
文件夹中的原始 Delta 表中。 - 从原始 Delta 表读取记录,并使用增量实时表查询和预期来创建新的 Delta 表,其中的数据已清理并已准备好。
- 使用增量实时表查询对新 Delta 表中已准备好的数据执行分析。
在目录的根目录中,创建名为
dlt-wikipedia-python.py
的文件。将以下代码添加到
dlt-wikipedia-python.py
文件:# Databricks notebook source import dlt from pyspark.sql.functions import * # COMMAND ---------- json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" # COMMAND ---------- @dlt.table( comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets." ) def clickstream_raw(): return (spark.read.format("json").load(json_path)) # COMMAND ---------- @dlt.table( comment="Wikipedia clickstream data cleaned and prepared for analysis." ) @dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL") @dlt.expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( dlt.read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") ) # COMMAND ---------- @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( dlt.read("clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
步骤 3:将捆绑包配置架构文件添加到项目
如果使用支持 YAML 文件和 JSON 架构文件的 IDE(如 Visual Studio Code、PyCharm Professional 或 IntelliJ IDEA Ultimate),则使用 IDE 不仅可以创建捆绑包配置架构文件,还可以检查项目的捆绑包配置文件语法和格式设置,并提供代码补全提示,如下所示。 请注意,稍后在步骤 5 中创建的捆绑包配置文件基于 YAML,而此步骤中的捆绑包配置架构文件基于 JSON。
Visual Studio Code
将 YAML 语言服务器支持添加到 Visual Studio Code,例如,通过从 Visual Studio Code Marketplace 安装 YAML 扩展。
使用 Databricks CLI 运行
bundle schema
命令并将输出重定向到 JSON 文件,生成 Databricks 资产捆绑包配置 JSON 架构文件。 例如,在当前目录中生成名为bundle_config_schema.json
的文件,如下所示:databricks bundle schema > bundle_config_schema.json
请注意,稍后在步骤 5 中,你将向捆绑包配置文件的开头添加以下注释,该文件将捆绑包配置文件与指定的 JSON 架构文件相关联:
# yaml-language-server: $schema=bundle_config_schema.json
注意
在前面的注释中,如果 Databricks 资产捆绑包配置 JSON 架构文件位于不同的路径中,请将
bundle_config_schema.json
替换为架构文件的完整路径。
PyCharm 专业版
使用 Databricks CLI 运行
bundle schema
命令并将输出重定向到 JSON 文件,生成 Databricks 资产捆绑包配置 JSON 架构文件。 例如,在当前目录中生成名为bundle_config_schema.json
的文件,如下所示:databricks bundle schema > bundle_config_schema.json
配置 PyCharm 以识别捆绑包配置 JSON 架构文件,然后按照“配置自定义 JSON 架构”中的说明完成 JSON 架构映射。
请注意,稍后在步骤 5 中,你将使用 PyCharm 创建或打开捆绑包配置文件。 按照约定,此文件命名为
databricks.yml
。
IntelliJ IDEA Ultimate
使用 Databricks CLI 运行
bundle schema
命令并将输出重定向到 JSON 文件,生成 Databricks 资产捆绑包配置 JSON 架构文件。 例如,在当前目录中生成名为bundle_config_schema.json
的文件,如下所示:databricks bundle schema > bundle_config_schema.json
配置 IntelliJ IDEA 以识别捆绑包配置 JSON 架构文件,然后按照“配置自定义 JSON 架构”中的说明完成 JSON 架构映射。
请注意,稍后在步骤 5 中,你将使用 IntelliJ IDEA 创建或打开捆绑包配置文件。 按照约定,此文件命名为
databricks.yml
。
步骤 4:设置身份验证
若要详细了解如何设置身份验证,请参阅 Databricks 身份验证。
步骤 5:将捆绑包配置文件添加到项目
此步骤定义如何部署和运行此笔记本。 对于本演示,你希望使用增量实时表管道运行笔记本。 你将在项目的捆绑包配置文件中对此目标进行建模。
- 在目录的根目录中,使用你喜欢的文本编辑器或 IDE 创建捆绑包配置文件。 按照约定,此文件命名为
databricks.yml
。 - 将以下代码添加到
databricks.yml
文件中,并将<workspace-url>
替换为每工作区 URL,例如https://adb-1234567890123456.7.databricks.azure.cn
。 此 URL 必须与.databrickscfg
文件中的 URL 匹配:
提示
只有在 IDE 支持的情况下,才需要以 # yaml-language-server
开头的第一行。 有关详细信息,请参阅前面的步骤 3。
# yaml-language-server: $schema=bundle_config_schema.json
bundle:
name: dlt-wikipedia
resources:
pipelines:
dlt-wikipedia-pipeline:
name: dlt-wikipedia-pipeline
development: true
continuous: false
channel: "CURRENT"
photon: false
libraries:
- notebook:
path: ./dlt-wikipedia-python.py
edition: "ADVANCED"
clusters:
- label: "default"
num_workers: 1
targets:
development:
workspace:
host: <workspace-url>
对于自定义管道,管道声明中的映射对应于 REST API 参考中的 POST /api/2.0/pipelines 中定义的创建管道操作的请求负载(以 YAML 格式表示)。
步骤 6:验证项目的程序包配置文件
在此步骤中,检查捆绑包配置是否有效。
使用 Databricks CLI 运行
bundle validate
命令,如下所示:databricks bundle validate
如果返回了捆绑包配置的摘要,则表示验证成功。 如果返回了任何错误,请修复错误,然后重复此步骤。
如果在此步骤之后对捆绑包进行任何更改,则应重复此步骤以检查捆绑包配置是否仍然有效。
步骤 7:将本地项目部署到远程工作区
此步骤将本地笔记本部署到远程 Azure Databricks 工作区,并在工作区中创建增量实时表管道。
使用 Databricks CLI 运行
bundle validate
命令,如下所示:databricks bundle deploy -t development
检查本地笔记本是否已部署:在 Azure Databricks 工作区的边栏中,单击“工作区”。
单击进入 Users >
<your-username>
> .bundle > dlt-wikipedia > development > files 文件夹。 笔记本应该位于此文件夹中。检查是否已创建增量实时表管道:在 Azure Databricks 工作区的边栏中,单击“工作流”。
在“增量实时表”选项卡上,单击“dlt-wikipedia-pipeline”。
如果在此步骤之后对捆绑包进行了任何更改,则应重复步骤 6-7 以检查捆绑包配置是否仍然有效,然后重新部署项目。
步骤 8:运行部署的项目
此步骤在工作区中运行 Azure Databricks 作业。
使用 Databricks CLI 运行
bundle run
命令,如下所示:databricks bundle run -t development dlt-wikipedia-pipeline
复制终端中显示的
Update URL
值,并将该值粘贴到 Web 浏览器中以打开 Azure Databricks 工作区。在 Azure Databricks 工作区中,当增量实时表管道成功完成并在各个具体化视图中显示绿色标题栏后,请单击“clickstream_raw”、“clickstream_prepared”或“top_spark_referrers”具体化视图以查看更多详细信息。
在开始执行下一步进行清理之前,请记下 DBFS 中创建的 Delta 表的位置,如下所示。 如果你稍后想要手动清理这些 Delta 表,则需要此信息:
- 在增量实时表管道仍处于打开状态的情况下,单击“设置”按钮(位于“权限”和“计划”按钮旁边)。
- 在“目标”区域中,记下“存储位置”字段的值。 这是在 DBFS 中创建 Delta 表的位置。
如果在此步骤之后对捆绑包进行了任何更改,则应重复步骤 6-8 以检查捆绑包配置是否仍然有效,重新部署项目,然后运行重新部署的项目。
步骤 9:清理
此步骤从工作区中删除已部署的笔记本和增量实时表管道。
使用 Databricks CLI 运行
bundle destroy
命令,如下所示:databricks bundle destroy
确认增量实时表管道删除请求:当系统提示是否永久销毁资源时,请键入
y
并按Enter
。确认笔记本删除请求:当系统提示是否永久销毁先前部署的文件夹及其所有文件时,请键入
y
并按Enter
。
运行 bundle destroy
命令只会删除已部署的增量实时表管道以及包含已部署的笔记本的文件夹。 此命令不会删除任何副产物,例如笔记本在 DBFS 中创建的 Delta 表。 如果你需要删除这些 Delta 表,必须手动删除。
将现有管道定义添加到捆绑包
可以基于现有的增量实时表管道定义在捆绑包配置文件中定义新管道。 为此,请完成以下步骤:
注意
以下步骤创建一个与现有管道设置相同的新管道。 但是,新管道的管道 ID 不同于现有管道。 无法自动将现有管道 ID 导入捆绑包中。
步骤 1:获取 JSON 格式的现有管道定义
在此步骤中,使用 Azure Databricks 工作区用户界面获取现有作业定义的 JSON 表示形式。
- 在 Azure Databricks 工作区的边栏中,单击“工作流”。
- 在“增量实时表”选项卡上,单击管道的“名称”链接。
- 在“权限”和“计划”按钮之间,单击“设置”按钮。
- 单击 JSON 按钮。
- 复制管道定义的 JSON。
步骤 2:将管道定义从 JSON 转换为 YAML 格式
你从上一步复制的管道定义采用 JSON 格式。 捆绑包配置采用 YAML 格式。 必须将管道定义从 JSON 转换为 YAML 格式。 Databricks 建议使用以下资源将 JSON 转换为 YAML:
- 将 JSON 在线转换为 XML。
- 对于 Visual Studio Code 为 json2yaml 扩展。
步骤 3:将管道定义 YAML 添加到捆绑包配置文件
在捆绑包配置文件中,将从上一步复制的 YAML 添加到捆绑包配置文件中标记为 <pipeline-yaml-can-go-here>
的以下位置之一,如下所示:
resources:
pipelines:
<some-unique-programmatic-identifier-for-this-pipeline>:
<pipeline-yaml-can-go-here>
targets:
<some-unique-programmatic-identifier-for-this-target>:
resources:
pipelines:
<some-unique-programmatic-identifier-for-this-pipeline>:
<pipeline-yaml-can-go-here>
步骤 4:将笔记本、Python 文件和其他生成工件添加到捆绑包
应将现有管道中引用的所有 Python 文件和笔记本移动到捆绑包的源。
为了更好地与捆绑包兼容,笔记本应使用 IPython 笔记本格式 (.ipynb
)。 如果在本地开发程序包,可以通过单击 Azure Databricks 笔记本用户界面中的“文件”>“导出”>“IPython Notebook”,将现有笔记本从 Azure Databricks 工作区导出为 .ipynb
格式。 按照惯例,之后应将下载的笔记本放入捆绑包的 src/
目录中。
将笔记本、Python 文件和其他生成工件添加到捆绑包后,请确保管道定义引用它们。 例如,对于 src/
目录中文件名为 hello.ipynb
的笔记本,如果 src/
目录与引用 src/
目录的捆绑包配置文件位于同一文件夹中,则管道定义可能会用以下方式表示:
resources:
pipelines:
hello-pipeline:
name: hello-pipeline
libraries:
-
notebook:
path: ./src/hello.ipynb
步骤 5:验证、部署和运行新管道
通过运行以下命令,验证捆绑包的配置文件在语法上是否正确:
databricks bundle validate
通过运行以下命令导入捆绑包。 在此命令中,将
<target-identifier>
替换为捆绑包配置中目标的唯一编程标识符:databricks bundle deploy -t <target-identifier>
通过运行以下命令来运行管道。 在此命令中,替换以下值:
- 将
<target-identifier>
替换为捆绑包配置中目标的唯一编程标识符。 - 将
<pipeline-identifier>
替换为捆绑包配置中管道的唯一编程标识符。
databricks bundle run -t <target-identifier> <pipeline-identifier>
- 将