将 Spark Connect 与 Spark 经典版进行比较

Spark Connect 是 Apache Spark 中基于 gRPC 的协议,用于指定客户端应用程序如何与远程 Spark Server 通信。 它允许使用数据帧 API 远程执行 Spark 工作负载。

Spark Connect 用于以下各项:

  • 在标准计算环境中运行 Databricks Runtime 13.3 及以上版本的 Scala 笔记本。
  • 标准计算中具有 Databricks Runtime 版本 14.3 及更高版本的 Python 笔记本
  • 无服务器计算
  • Databricks Connect

尽管 Spark Connect 和 Spark 经典都利用延迟执行转换,但在将现有代码从 Spark 经典迁移到 Spark Connect 或编写必须同时处理两者的代码时,需要了解的一些重要区别,以避免出现意外的行为和性能问题。

懒惰与急切

Spark Connect 和 Spark 经典版之间的主要区别在于 Spark Connect 将分析和名称解析推迟到执行时间,如下表所述。

方面 Spark 经典 Spark Connect
查询执行 懒惰 懒惰
架构分析 渴望 懒惰
架构访问 Local 触发 RPC 并在首次访问时缓存架构
临时视图 嵌入的计划 名称查找
UDF 序列化 创建时 执行时

查询执行

Spark 经典版和 Spark Connect 都遵循相同的延迟执行模型来执行查询。

在 Spark 经典版中,数据帧转换(如 filterlimit)是懒惰的。 这意味着它们不会立即执行,而是在逻辑计划中编码。 实际计算仅在某个操作(如 show()collect())触发时进行。

Spark Connect 遵循类似的延迟评估模型。 转换在客户端构建,并作为未解析计划发送到服务器。 然后,当调用操作时,服务器将进行必要的分析和执行任务。

方面 Spark 经典 Spark Connect
转换: df.filter(...)、、 df.select(...)df.limit(...) 延迟执行 延迟执行
SQL 查询: spark.sql("select …") 延迟执行 延迟执行
动作: df.collect(), df.show() 立即执行 立即执行
SQL 命令: spark.sql("insert …")spark.sql("create …") 立即执行 立即执行

架构分析

Spark Classic在逻辑计划构造期间主动进行分析。 此分析阶段将未解析的计划转换为完全解析的逻辑计划,并验证该作是否可以由 Spark 执行。 急切地执行这项工作的主要好处之一是用户在出错时立即收到反馈。 例如,执行 spark.sql("select 1 as a, 2 as b").filter("c > 1") 将立即引发错误,表示找不到列 c

Spark Connect 不同于经典版,因为客户端在转换期间构造未解析的计划并延迟其分析。 任何需要已解析计划的操作(例如访问架构、解释计划的过程、持久保存 DataFrame 或执行相关操作)都会导致客户端通过 RPC 将未解析的计划发送到服务器。 然后,服务器将进行全面分析,以获取解析的逻辑方案,并执行操作。 例如,spark.sql("select 1 as a, 2 as b").filter("c > 1") 不会引发任何错误,因为未解析的计划仅在客户端运行,但是在 df.columnsdf.show() 上会出现错误,因为未解析的计划将发送到服务器进行分析。

与查询执行不同,Spark 经典和 Spark Connect 在架构分析发生时有所不同。

方面 Spark 经典 Spark Connect
转换: df.filter(...)、、 df.select(...)df.limit(...) 渴望 懒惰
架构访问:df.columns、、 df.schemadf.isStreaming 渴望 渴望
触发分析 RPC 请求,与 Spark 经典版不同
动作: df.collect(), df.show() 渴望 渴望
数据帧的依赖会话状态:UDF、临时视图、配置项 渴望 懒惰
在执行数据帧计划的过程中进行评估
临时视图的依赖会话状态:UDF、其他临时视图、配置 渴望 渴望
创建临时视图时,会急切地触发分析

最佳做法

延迟分析与急切分析之间的区别意味着,有一些最佳实践需要遵循,以避免意外行为和性能问题,特别是由于覆盖临时视图名称、在 UDF 中捕获外部变量、错误检测延迟以及新 DataFrame 上的过度模式访问所引起的。

创建唯一的临时视图名称

在 Spark Connect 中,DataFrame 仅按名称存储对临时视图的引用。 因此,如果临时视图稍后被替换,数据帧中的数据也会更改,因为它在执行时按名称查找视图。

此行为不同于 Spark 经典版,其中临时视图的逻辑计划在创建时嵌入到数据帧的计划中。 临时视图的任何后续替换都不会影响原始数据帧。

若要缓解差异,请始终创建唯一的临时视图名称。 例如,在视图名称中包含 UUID。 这可避免影响引用以前注册的临时视图的任何现有数据帧。

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

封装 UDF 定义

UDFs 依赖于可变的外部变量通常被认为是不好的做法,因为这会引入隐式依赖,导致非确定性行为,并降低组合性。 但是,如果确实有这样的模式,请注意以下注意事项:

在 Spark Connect 中,Python UDF 延迟。 其序列化和注册将延迟到执行时间。 在以下示例中,UDF 仅在调用show()时才序列化并上传到 Spark 群集上执行。

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x

df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

此行为不同于 Spark 经典版,其中 UDF 已预先创建。 在 Spark 经典版中,创建 UDF 时会捕获 x 的值,因此后续对 x 的更改不会影响已创建的 UDF。

如果需要修改 UDF 所依赖的外部变量的值,请使用函数工厂(使用及早绑定的闭包)来正确捕获变量值。 具体而言,用辅助函数包裹 UDF 的创建,以捕捉依赖变量的值。

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)

x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

通过将 UDF 定义包装在另一个函数(make_udf)内,我们将创建一个新范围,其中当前值 x 作为参数传入。 这可确保每个生成的 UDF 都有自己的字段副本,在创建 UDF 时绑定。

触发错误检测的立即分析

以下错误处理在 Spark 经典版中很有用,因为它执行了预先分析,这允许及时引发异常。 但是,在 Spark Connect 中,此代码不会导致任何问题,因为它只构造本地未解析的计划,而不会触发任何分析。

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

如果您的代码依赖分析异常并想要捕获它,可以主动触发分析,例如使用df.columnsdf.schemadf.collect()

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

避免过多的热切分析请求

如果避免对大量数据帧执行分析请求,则可以提高性能。

逐步创建新的 DataFrame,并在每次迭代中访问其架构

创建大量新的 DataFrame 时,请避免过度使用调用来触发对它们进行预先分析(例如 df.columnsdf.schema)。 可以多次访问同一数据帧的架构,但对许多新创建的 DataFrame 触发分析会影响性能。

例如,在循环内以迭代方式将列添加到 DataFrame,并检查每个列是否已存在,然后再添加该列时,调用 df.columns 每个新建的数据帧都会在每个迭代时触发分析请求。 为了避免这种情况,请保留一组用于跟踪列名称,而不是重复访问 DataFrame 的架构。

Python
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  # if new_column_name not in df.columns:  # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
  if new_column_name not in columns:  # Check the set without triggering analysis
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  // if (!df.columns.contains(newColumnName)) {  // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
  if (!columns.contains(newColumnName)) {  // Check the set without triggering analysis
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

避免访问大量中间数据帧的架构

另一个类似的情况是创建大量不必要的中间数据帧并对其进行分析。 在以下情况下,若要从结构类型的每一列中提取字段名称,请直接从 DataFrame 的架构中获取 StructType 字段信息,而不是创建中间 DataFrame。

Python
from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    # column_schema.name: df.select(column_schema.name + ".*").columns  # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
    column_schema.name: [f.name for f in column_schema.dataType.fields]  # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    // field.name -> df.select(field.name + ".*").columns  // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)  // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
  }
  .toMap
println(structColumnFields)