测试适用于 Python 的 Databricks Connect

备注

本文介绍适用于 Databricks Runtime 13.3 LTS 及更高版本的 Databricks Connect。

本文介绍如何使用 pytest 在 Databricks Connect for Databricks Runtime 13.3 LTS 及更高版本运行测试。 有关 Databricks Connect 的详细信息,请参阅 适用于 Python 的 Databricks Connect

此信息假定你已安装适用于 Python 的 Databricks Connect。 请参阅安装适用于 Python 的 Databricks Connect

可以针对本地代码运行 pytest,而无需连接到远程 Azure Databricks 工作区中的群集。 例如,可以使用 pytest 来测试接受 PySpark DataFrame 对象并将其返回到本地内存的函数。 若要开始使用并在本地运行 pytest,请参阅 pytest 文档中的入门

例如,假定一个名为 nyctaxi_functions.py 的文件,该文件包含一个返回 SparkSession 实例的 get_spark 函数和一个 get_nyctaxi_trips 函数,该函数返回一个表示 samples 目录 nyctaxi 架构中 trips 表的 DataFrame

nyctaxi_functions.py

from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession

def get_spark() -> SparkSession:
  spark = DatabricksSession.builder.getOrCreate()
  return spark

def get_nyctaxi_trips() -> DataFrame:
  spark = get_spark()
  df = spark.read.table("samples.nyctaxi.trips")
  return df

另外,假设以下名为 main.py 的文件调用这些 get_sparkget_nyctaxi_trips 函数:

main.py

from nyctaxi_functions import *

df = get_nyctaxi_trips()
df.show(5)

以下名为 test_nyctaxi_functions.py 的文件会测试 get_spark 函数是否返回 SparkSession 实例,以及 get_nyctaxi_trips 函数是否返回至少包含一行数据的 DataFrame

test_nyctaxi_functions.py

import pyspark.sql.connect.session
from nyctaxi_functions import *

def test_get_spark():
  spark = get_spark()
  assert isinstance(spark, pyspark.sql.connect.session.SparkSession)

def test_get_nyctaxi_trips():
  df = get_nyctaxi_trips()
  assert df.count() > 0

若要运行这些测试,请从代码项目的根目录运行 pytest 命令,该命令应生成如下所示的测试结果:

$ pytest
=================== test session starts ====================
platform darwin -- Python 3.11.7, pytest-8.1.1, pluggy-1.4.0
rootdir: <project-rootdir>
collected 2 items

test_nyctaxi_functions.py .. [100%]
======================== 2 passed ==========================