注释
本文涵盖了由第三方开发的 Jenkins。 若要联系提供商,请参阅 Jenkins 帮助。
本文介绍如何将 Jenkins 自动化服务器与 Azure Databricks 配合使用来实现 CI/CD 开发工作流。
有关使用 Azure Databricks 的 CI/CD 概述,请参阅 Azure Databricks 上的 CI/CD。 有关最佳做法,请参阅 Databricks 上的最佳做法和建议的 CI/CD 工作流。
本地开发环境设置
本文的示例使用 Jenkins 指示 Databricks CLI 和 声明性自动化捆绑包 执行以下操作:
在本地开发计算机上生成Python滚轮文件。
将生成的 Python wheel 文件以及从本地开发计算机的其他 Python 文件和 Python 笔记本部署到 Azure Databricks 工作区。
在该工作区中测试和运行上传的Python轮子文件和笔记本。
若要设置本地开发计算机以指示Azure Databricks工作区执行此示例的生成和上传阶段,请在本地开发计算机上执行以下操作:
步骤 1:安装所需的工具
在此步骤中,你将在本地开发计算机上安装 Databricks CLI、Jenkins、jq,以及Python轮生成工具。 运行此示例需要这些工具。
安装 Databricks CLI 0.205 或更高版本(如果尚未安装)。 Jenkins 使用 Databricks CLI 将此示例的测试和运行说明传递到工作区。 请参阅安装或更新 Databricks CLI。
安装并启动 Jenkins(如果尚未安装)。 请参阅 安装 Jenkins,以了解在 Linux、macOS 或 Windows 上的安装方法。
安装 jq。 此示例使用
jq来分析某些 JSON 格式的命令输出。使用
pip通过以下命令安装 Python wheel 生成工具(某些系统可能需要使用pip3而不是pip):pip install --upgrade wheel
步骤 2:创建 Jenkins 管道
在此步骤中,使用 Jenkins 为本文的示例创建 Jenkins 管道。 Jenkins 提供了几种不同的项目类型,以用于创建 CI/CD 管道。 Jenkins Pipelines 提供了一个接口,用于使用 Groovy 代码调用和配置 Jenkins 插件来定义 Jenkins 管道中的阶段。
要在 Jenkins 中创建 Jenkins 管道:
启动 Jenkins 后,从 Jenkins 仪表板单击“ 新建项”。
对于 输入项名称,键入 Jenkins 管道的名称,例如
jenkins-demo。单击 “管道 项目类型”图标。
单击 “确定” 。 Jenkins 管道的“配置”页随即显示。
在 Pipeline 区域中的定义下拉列表中,选择Pipeline script from SCM。
在 SCM 下拉列表中,选择 “Git”。
对于 存储库 URL,请键入由第三部分 Git 提供程序托管的存储库的 URL。
对于分支说明符,请键入
*/<branch-name>,其中<branch-name>是您要使用的存储库中的分支名称,例如*/main。对于 脚本路径,请键入
Jenkinsfile,如果尚未设置。 你将在本文的稍后部分创建Jenkinsfile。取消选中标题为 “轻型签出”的框(如果已选中)。
单击“ 保存”。
步骤 3:将全局环境变量添加到 Jenkins
在此步骤中,将三个全局环境变量添加到 Jenkins。 Jenkins 将这些环境变量传递到 Databricks CLI。 Databricks CLI 需要这些环境变量的值才能向Azure Databricks工作区进行身份验证。 此示例对服务主体使用 OAuth 计算机到计算机 (M2M) 身份验证(不过其他身份验证类型也可用)。 若要为 Azure Databricks 工作区设置 OAuth M2M 身份验证,请参阅 使用 OAuth 授权服务主体访问 Azure Databricks。
此示例的三个全局环境变量为:
- 将
DATABRICKS_HOST设置为以https://开头的 Azure Databricks 工作区 URL。 请参阅工作区实例名称、URL 和 ID。 - 将
DATABRICKS_CLIENT_ID设置为服务主体的客户端 ID,也称为其应用程序 ID。 -
DATABRICKS_CLIENT_SECRET,设置为服务主体的 Azure Databricks OAuth 密钥。
若要在 Jenkins 中设置全局环境变量,请从 Jenkins 仪表板设置:
在边栏中,单击“ 管理 Jenkins”。
在 “系统配置 ”部分中,单击“ 系统”。
在“ 全局属性 ”部分中,选中标题为 “环境变量”的框。
单击“添加”,然后输入环境变量的“名称”和“值”。 对每个附加环境变量重复此操作。
添加环境变量后,单击“ 保存 ”返回到 Jenkins 仪表板。
设计 Jenkins 管道
Jenkins 提供了几种不同的项目类型,以用于创建 CI/CD 管道。 此示例实现 Jenkins 管道。 Jenkins Pipelines 提供了一个接口,用于使用 Groovy 代码调用和配置 Jenkins 插件来定义 Jenkins 管道中的阶段。
你可以将 Jenkins 管道定义写在一个称为 Jenkinsfile 的文本文件中,然后将其提交到项目的源码管理存储库中。 有关详细信息,请参阅 Jenkins Pipeline。 以下是本文示例的 Jenkins 管道。 在此示例 Jenkinsfile 中,替换以下占位符:
- 将
<user-name>和<repo-name>替换为您的第三方 Git 提供商托管的用户名和存储库名称。 本文使用GitHub URL 作为示例。 - 将
<release-branch-name>替换为存储库中发布分支的名称。 例如,它可以是main。 - 将
<databricks-cli-installation-path>替换为安装了 Databricks CLI 的本地开发计算机上的路径。 例如,在 macOS 上,它可以是/usr/local/bin。 - 将
<jq-installation-path>替换为安装了jq的本地开发计算机上的路径。 例如,在 macOS 上,它可以是/usr/local/bin。 - 将
<job-prefix-name>替换为一些字符串,以便在本示例中帮助唯一标识您在工作区中创建的作业。 例如,它可以是jenkins-demo。 - 请注意,
BUNDLETARGET被设置为dev,这是本文稍后定义的声明式自动化包目标名。 在实际实现中,将此更改为你自己的捆绑目标的名称。 本文后面提供了有关捆绑包目标的更多详细信息。
这里是 Jenkinsfile,需要添加到你的代码库根目录:
// Filename: Jenkinsfile
node {
def GITREPOREMOTE = "https://github.com/<user-name>/<repo-name>.git"
def GITBRANCH = "<release-branch-name>"
def DBCLIPATH = "<databricks-cli-installation-path>"
def JQPATH = "<jq-installation-path>"
def JOBPREFIX = "<job-prefix-name>"
def BUNDLETARGET = "dev"
stage('Checkout') {
git branch: GITBRANCH, url: GITREPOREMOTE
}
stage('Validate Bundle') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle validate -t ${BUNDLETARGET}
"""
}
stage('Deploy Bundle') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle deploy -t ${BUNDLETARGET}
"""
}
stage('Run Unit Tests') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} run-unit-tests
"""
}
stage('Run Notebook') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} run-dabdemo-notebook
"""
}
stage('Evaluate Notebook Runs') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} evaluate-notebook-runs
"""
}
stage('Import Test Results') {
def DATABRICKS_BUNDLE_WORKSPACE_ROOT_PATH
def getPath = "${DBCLIPATH}/databricks bundle validate -t ${BUNDLETARGET} | ${JQPATH}/jq -r .workspace.file_path"
def output = sh(script: getPath, returnStdout: true).trim()
if (output) {
DATABRICKS_BUNDLE_WORKSPACE_ROOT_PATH = "${output}"
} else {
error "Failed to capture output or command execution failed: ${getPath}"
}
sh """#!/bin/bash
${DBCLIPATH}/databricks workspace export-dir \
${DATABRICKS_BUNDLE_WORKSPACE_ROOT_PATH}/Validation/Output/test-results \
${WORKSPACE}/Validation/Output/test-results \
-t ${BUNDLETARGET} \
--overwrite
"""
}
stage('Publish Test Results') {
junit allowEmptyResults: true, testResults: '**/test-results/*.xml', skipPublishingChecks: true
}
}
本文的其余部分介绍了此 Jenkins 管道中的每个阶段,以及如何设置 Jenkins 在该阶段运行所需的工件和命令。
从第三方存储库拉取最新工件
此 Jenkins 管道中的第一阶段(即 Checkout 阶段)的定义如下:
stage('Checkout') {
git branch: GITBRANCH, url: GITREPOREMOTE
}
此阶段可确保 Jenkins 在本地开发计算机上使用的工作目录具有来自第三方 Git 存储库的最新工件。 通常,Jenkins 将此工作目录设置为 <your-user-home-directory>/.jenkins/workspace/<pipeline-name>。 这样的话,你就可以在同一本地开发计算机上将自己的开发工件副本与 Jenkins 从你的第三方 Git 存储库使用的工件分开。
验证 Databricks 资产捆绑包
此 Jenkins 管道中的第二阶段(即 Validate Bundle 阶段)的定义如下:
stage('Validate Bundle') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle validate -t ${BUNDLETARGET}
"""
}
此阶段可确保定义用于测试和运行项目的工作流的捆绑包在语法上正确。 声明性自动化捆绑包可将完整的数据、分析和 ML 项目表示为源文件集合。 请参阅什么是声明性自动化捆绑包?
要定义本文的捆绑包,请在本地计算机上的克隆存储库的根中创建一个名为 databricks.yml 的文件。 在此示例 databricks.yml 文件中,替换以下占位符:
- 将
<bundle-name>替换为捆绑包的唯一编程名称。 例如,它可以是jenkins-demo。 - 将
<job-prefix-name>替换为一些字符串,以便在本示例中帮助唯一标识您在工作区中创建的作业。 例如,它可以是jenkins-demo。 它应与 Jenkinsfile 中的JOBPREFIX值匹配。 - 将
<spark-version-id>替换为作业群集的 Databricks Runtime 版本 ID,例如13.3.x-scala2.12。 - 将
<cluster-node-type-id>替换为作业群集的节点类型 ID,例如Standard_DS3_v2。 - 请注意,
dev映射中的targets与 Jenkinsfile 中的BUNDLETARGET相同。 打包目标指定主机以及相关的部署行为。
这里是 databricks.yml 文件,它必须添加到存储库的根,才能使此示例正常运行:
# Filename: databricks.yml
bundle:
name: <bundle-name>
variables:
job_prefix:
description: A unifying prefix for this bundle's job and task names.
default: <job-prefix-name>
spark_version:
description: The cluster's Spark version ID.
default: <spark-version-id>
node_type_id:
description: The cluster's node type ID.
default: <cluster-node-type-id>
artifacts:
dabdemo-wheel:
type: whl
path: ./Libraries/python/dabdemo
resources:
jobs:
run-unit-tests:
name: ${var.job_prefix}-run-unit-tests
tasks:
- task_key: ${var.job_prefix}-run-unit-tests-task
new_cluster:
spark_version: ${var.spark_version}
node_type_id: ${var.node_type_id}
num_workers: 1
spark_env_vars:
WORKSPACEBUNDLEPATH: ${workspace.root_path}
notebook_task:
notebook_path: ./run_unit_tests.py
source: WORKSPACE
libraries:
- pypi:
package: pytest
run-dabdemo-notebook:
name: ${var.job_prefix}-run-dabdemo-notebook
tasks:
- task_key: ${var.job_prefix}-run-dabdemo-notebook-task
new_cluster:
spark_version: ${var.spark_version}
node_type_id: ${var.node_type_id}
num_workers: 1
data_security_mode: SINGLE_USER
spark_env_vars:
WORKSPACEBUNDLEPATH: ${workspace.root_path}
notebook_task:
notebook_path: ./dabdemo_notebook.py
source: WORKSPACE
libraries:
- whl: '/Workspace${workspace.root_path}/files/Libraries/python/dabdemo/dist/dabdemo-0.0.1-py3-none-any.whl'
evaluate-notebook-runs:
name: ${var.job_prefix}-evaluate-notebook-runs
tasks:
- task_key: ${var.job_prefix}-evaluate-notebook-runs-task
new_cluster:
spark_version: ${var.spark_version}
node_type_id: ${var.node_type_id}
num_workers: 1
spark_env_vars:
WORKSPACEBUNDLEPATH: ${workspace.root_path}
spark_python_task:
python_file: ./evaluate_notebook_runs.py
source: WORKSPACE
libraries:
- pypi:
package: unittest-xml-reporting
targets:
dev:
mode: development
有关 databricks.yml 文件的详细信息,请参阅 声明性自动化捆绑包配置。
将捆绑包部署到工作区
Jenkins 管道的第三个阶段,标题为 Deploy Bundle,定义如下:
stage('Deploy Bundle') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle deploy -t ${BUNDLETARGET}
"""
}
此阶段执行两项操作:
由于
artifact文件中的databricks.yml映射设置为whl,因此指示 Databricks CLI 使用指定位置中的setup.py文件生成Python滚轮文件。在本地开发计算机上生成Python滚轮文件后,Databricks CLI 会将生成的Python wheel 文件以及指定的Python文件和笔记本部署到Azure Databricks工作区。 默认情况下,声明性自动化捆绑包将 Python wheel 文件和其他文件部署到
/Workspace/Users/<your-username>/.bundle/<bundle-name>/<target-name>。
若要构建 Python wheel 文件,使其符合 databricks.yml 文件中的指定要求,请在本地计算机中您克隆的存储库根目录下创建以下文件夹和文件。
为了定义笔记本运行的 Python wheel 文件的逻辑和单元测试,请创建两个名为 addcol.py 和 test_addcol.py 的文件,并将它们添加到存储库的 python/dabdemo/dabdemo 文件夹中的 Libraries 文件夹结构中。
├── ...
├── Libraries
│ └── python
│ └── dabdemo
│ └── dabdemo
│ ├── addcol.py
│ └── test_addcol.py
├── ...
addcol.py 文件包含一个库函数,该函数稍后内置于Python wheel 文件中,然后安装在Azure Databricks群集上。 该函数向 Apache Spark 数据帧添加一个新的列,由文字填充:
# Filename: addcol.py
import pyspark.sql.functions as F
def with_status(df):
return df.withColumn("status", F.lit("checked"))
test_addcol.py 文件包含用于将模拟数据帧对象传递给 with_status 函数的测试,在 addcol.py 中定义。 然后,结果将与一个包含预期值的数据帧对象进行比较。 如果值匹配(在此例中它们匹配),则测试会通过:
# Filename: test_addcol.py
import pytest
from pyspark.sql import SparkSession
from dabdemo.addcol import *
class TestAppendCol(object):
def test_with_status(self):
spark = SparkSession.builder.getOrCreate()
source_data = [
("paula", "white", "paula.white@example.com"),
("john", "baer", "john.baer@example.com")
]
source_df = spark.createDataFrame(
source_data,
["first_name", "last_name", "email"]
)
actual_df = with_status(source_df)
expected_data = [
("paula", "white", "paula.white@example.com", "checked"),
("john", "baer", "john.baer@example.com", "checked")
]
expected_df = spark.createDataFrame(
expected_data,
["first_name", "last_name", "email", "status"]
)
assert(expected_df.collect() == actual_df.collect())
若要使 Databricks CLI 能够将此库代码正确打包到 Python wheel 文件中,请在前面两个文件所在的文件夹中创建两个名为 __init__.py 和 __main__.py的文件。 此外,在文件夹中创建一个名为setup.pypython/dabdemo的文件:
├── ...
├── Libraries
│ └── python
│ └── dabdemo
│ ├── dabdemo
│ │ ├── __init__.py
│ │ ├── __main__.py
│ │ ├── addcol.py
│ │ └── test_addcol.py
│ └── setup.py
├── ...
__init__.py 文件包含库的版本号和作者。 将 <my-author-name> 替换为你的名字:
# Filename: __init__.py
__version__ = '0.0.1'
__author__ = '<my-author-name>'
import sys, os
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
__main__.py 文件包含库的入口点:
# Filename: __main__.py
import sys, os
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
from addcol import *
def main():
pass
if __name__ == "__main__":
main()
setup.py 文件包含将库打包为 Python wheel 文件的额外设置。 将 <my-url>、<my-author-name>@<my-organization>、<my-package-description> 替换为有意义的值:
# Filename: setup.py
from setuptools import setup, find_packages
import dabdemo
setup(
name = "dabdemo",
version = dabdemo.__version__,
author = dabdemo.__author__,
url = "https://<my-url>",
author_email = "<my-author-name>@<my-organization>",
description = "<my-package-description>",
packages = find_packages(include = ["dabdemo"]),
entry_points={"group_1": "run=dabdemo.__main__:main"},
install_requires = ["setuptools"]
)
测试Python轮的组件逻辑
Run Unit Tests 阶段是此 Jenkins 管道的第四个阶段,它使用 pytest 来测试库的逻辑,以确保其按生成方式工作。 此阶段的定义如下:
stage('Run Unit Tests') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} run-unit-tests
"""
}
此阶段使用 Databricks CLI 运行笔记本作业。 此作业运行Python笔记本,文件名为 run-unit-test.py。 此笔记本运行 pytest 以验证库的逻辑。
若要运行此示例的单元测试,请将名为 run_unit_tests.py 的Python笔记本文件添加到本地计算机上的克隆存储库的根目录中:
# Databricks notebook source
# COMMAND ----------
# MAGIC %sh
# MAGIC
# MAGIC mkdir -p "/Workspace${WORKSPACEBUNDLEPATH}/Validation/reports/junit/test-reports"
# COMMAND ----------
# Prepare to run pytest.
import sys, pytest, os
# Skip writing pyc files on a readonly filesystem.
sys.dont_write_bytecode = True
# Run pytest.
retcode = pytest.main(["--junit-xml", f"/Workspace{os.getenv('WORKSPACEBUNDLEPATH')}/Validation/reports/junit/test-reports/TEST-libout.xml",
f"/Workspace{os.getenv('WORKSPACEBUNDLEPATH')}/files/Libraries/python/dabdemo/dabdemo/"])
# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."
使用生成的Python轮
此 Jenkins Pipeline 的第五阶段(标题为 Run Notebook)运行一个 Python 笔记本,该笔记本调用生成的 Python wheel 文件中的逻辑,如下所示:
stage('Run Notebook') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} run-dabdemo-notebook
"""
}
此阶段运行 Databricks CLI,后者指示工作区运行笔记本作业。 此笔记本创建一个 DataFrame 对象,将其传递给库的 with_status 函数,输出结果并报告作业的运行结果。 通过在本地开发计算机上的克隆存储库根目录中添加名为 dabdaddemo_notebook.py 的Python笔记本文件来创建笔记本:
# Databricks notebook source
# COMMAND ----------
# Restart Python after installing the wheel.
dbutils.library.restartPython()
# COMMAND ----------
from dabdemo.addcol import with_status
df = (spark.createDataFrame(
schema = ["first_name", "last_name", "email"],
data = [
("paula", "white", "paula.white@example.com"),
("john", "baer", "john.baer@example.com")
]
))
new_df = with_status(df)
display(new_df)
# Expected output:
#
# +------------+-----------+-------------------------+---------+
# │first_name │last_name │email │status |
# +============+===========+=========================+=========+
# │paula │white │paula.white@example.com │checked |
# +------------+-----------+-------------------------+---------+
# │john │baer │john.baer@example.com │checked |
# +------------+-----------+-------------------------+---------+
评估笔记本作业运行结果
Evaluate Notebook Runs 阶段,也就是 Jenkins 管道的第六阶段,会评估前面的笔记本作业运行的结果。 此阶段的定义如下:
stage('Evaluate Notebook Runs') {
sh """#!/bin/bash
${DBCLIPATH}/databricks bundle run -t ${BUNDLETARGET} evaluate-notebook-runs
"""
}
此阶段运行 Databricks CLI,后者又指示工作区运行Python文件作业。 此Python文件确定笔记本作业运行的失败和成功条件,并报告此失败或成功结果。 在本地开发计算机的克隆存储库的根中创建包含以下内容的文件(名为 evaluate_notebook_runs.py):
import unittest
import xmlrunner
import json
import glob
import os
class TestJobOutput(unittest.TestCase):
test_output_path = f"/Workspace${os.getenv('WORKSPACEBUNDLEPATH')}/Validation/Output"
def test_performance(self):
path = self.test_output_path
statuses = []
for filename in glob.glob(os.path.join(path, '*.json')):
print('Evaluating: ' + filename)
with open(filename) as f:
data = json.load(f)
duration = data['tasks'][0]['execution_duration']
if duration > 100000:
status = 'FAILED'
else:
status = 'SUCCESS'
statuses.append(status)
f.close()
self.assertFalse('FAILED' in statuses)
def test_job_run(self):
path = self.test_output_path
statuses = []
for filename in glob.glob(os.path.join(path, '*.json')):
print('Evaluating: ' + filename)
with open(filename) as f:
data = json.load(f)
status = data['state']['result_state']
statuses.append(status)
f.close()
self.assertFalse('FAILED' in statuses)
if __name__ == '__main__':
unittest.main(
testRunner = xmlrunner.XMLTestRunner(
output = f"/Workspace${os.getenv('WORKSPACEBUNDLEPATH')}/Validation/Output/test-results",
),
failfast = False,
buffer = False,
catchbreak = False,
exit = False
)
导入和报告测试结果
此 Jenkins 管道中的第七阶段(名为 Import Test Results)使用 Databricks CLI 将工作区中的测试结果发送到本地开发计算机。 第八阶段,也就是最后阶段,名为 Publish Test Results,使用 junit Jenkins 插件将测试结果发布到 Jenkins。 这使你可以将与测试结果的状态相关的报表和仪表板可视化。 这些阶段的定义如下:
stage('Import Test Results') {
def DATABRICKS_BUNDLE_WORKSPACE_FILE_PATH
def getPath = "${DBCLIPATH}/databricks bundle validate -t ${BUNDLETARGET} | ${JQPATH}/jq -r .workspace.file_path"
def output = sh(script: getPath, returnStdout: true).trim()
if (output) {
DATABRICKS_BUNDLE_WORKSPACE_FILE_PATH = "${output}"
} else {
error "Failed to capture output or command execution failed: ${getPath}"
}
sh """#!/bin/bash
${DBCLIPATH}/databricks workspace export-dir \
${DATABRICKS_BUNDLE_WORKSPACE_FILE_PATH}/Validation/Output/test-results \
${WORKSPACE}/Validation/Output/test-results \
--overwrite
"""
}
stage('Publish Test Results') {
junit allowEmptyResults: true, testResults: '**/test-results/*.xml', skipPublishingChecks: true
}
将所有代码更改推送到第三方存储库
现在,你应将本地开发计算机上的克隆存储库的内容推送到第三方存储库。 在推送之前,应首先将以下条目添加到 .gitignore 克隆存储库中的文件,因为可能不应将内部捆绑包工作文件、验证报告、Python 生成文件和 Python 缓存推送到第三方存储库中。 通常,你需要重新生成新的验证报告和Azure Databricks工作区中最新的Python轮生成,而不是使用可能过时的验证报告和Python轮生成:
.databricks/
.vscode/
Libraries/python/dabdemo/build/
Libraries/python/dabdemo/__pycache__/
Libraries/python/dabdemo/dabdemo.egg-info/
Validation/
运行 Jenkins 管道
现在可以手动运行 Jenkins 管道了。 为此,请从 Jenkins 仪表板执行以下操作:
单击 Jenkins 管道的名称。
在边栏上,单击“ 立即生成”。
若要查看结果,请单击最新的管道运行(例如
#1),然后单击 控制台输出。
此时,CI/CD 管道已完成集成和部署周期。 通过自动执行此过程,可以确保代码已经通过有效、一致且可重复的过程进行了测试和部署。 要指示第三方 Git 提供商在每次发生特定事件(例如存储库拉取请求)时运行 Jenkins,请参阅第三方 Git 提供商的文档。