备注
本文介绍适用于 Databricks Runtime 13.3 LTS 及更高版本的 Databricks Connect。
本文介绍如何使用 pytest
在 Databricks Connect for Databricks Runtime 13.3 LTS 及更高版本运行测试。 有关 Databricks Connect 的详细信息,请参阅 适用于 Python 的 Databricks Connect。
此信息假定你已安装适用于 Python 的 Databricks Connect。 请参阅安装适用于 Python 的 Databricks Connect。
可以针对本地代码运行 pytest,而无需连接到远程 Azure Databricks 工作区中的群集。 例如,可以使用 pytest
来测试接受 PySpark DataFrame
对象并将其返回到本地内存的函数。 若要开始使用并在本地运行 pytest
,请参阅 pytest
文档中的入门。
例如,假定一个名为 nyctaxi_functions.py
的文件,该文件包含一个返回 SparkSession
实例的 get_spark
函数和一个 get_nyctaxi_trips
函数,该函数返回一个表示 samples
目录 nyctaxi
架构中 trips
表的 DataFrame
:
nyctaxi_functions.py
:
from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
def get_spark() -> SparkSession:
spark = DatabricksSession.builder.getOrCreate()
return spark
def get_nyctaxi_trips() -> DataFrame:
spark = get_spark()
df = spark.read.table("samples.nyctaxi.trips")
return df
另外,假设以下名为 main.py
的文件调用这些 get_spark
和get_nyctaxi_trips
函数:
main.py
:
from nyctaxi_functions import *
df = get_nyctaxi_trips()
df.show(5)
以下名为 test_nyctaxi_functions.py
的文件会测试 get_spark
函数是否返回 SparkSession
实例,以及 get_nyctaxi_trips
函数是否返回至少包含一行数据的 DataFrame
:
test_nyctaxi_functions.py
:
import pyspark.sql.connect.session
from nyctaxi_functions import *
def test_get_spark():
spark = get_spark()
assert isinstance(spark, pyspark.sql.connect.session.SparkSession)
def test_get_nyctaxi_trips():
df = get_nyctaxi_trips()
assert df.count() > 0
若要运行这些测试,请从代码项目的根目录运行 pytest
命令,该命令应生成如下所示的测试结果:
$ pytest
=================== test session starts ====================
platform darwin -- Python 3.11.7, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 2 items
test_nyctaxi_functions.py .. [100%]
======================== 2 passed ==========================