将 Spark Connect 与 Spark 经典版进行比较

Spark Connect 是 Apache Spark 中基于 gRPC 的协议,用于指定客户端应用程序如何与远程 Spark Server 通信。 它允许使用数据帧 API 远程执行 Spark 工作负载。

Spark Connect 用于以下各项:

  • 标准计算和专用计算上使用 Databricks Runtime 版本 13.3 及更高版本的 Scala 笔记本

  • 在标准和专用计算上使用 Databricks Runtime 版本 14.3 及更高版本的 Python 笔记本

  • Databricks Connect

尽管 Spark Connect 和 Spark 经典都利用延迟执行转换,但在将现有代码从 Spark 经典迁移到 Spark Connect 或编写必须同时处理两者的代码时,需要了解的一些重要区别,以避免出现意外的行为和性能问题。

懒惰与急切

Spark Connect 和 Spark 经典版之间的主要区别在于 Spark Connect 将分析和名称解析推迟到执行时间,如下表所述。

方面 Spark 经典 Spark Connect
查询执行 懒惰 懒惰
架构分析 渴望 懒惰
架构访问 Local 触发器 RPC
临时视图 嵌入的计划 名称查找
UDF 序列化 创建时 执行时

查询执行

Spark 经典版和 Spark Connect 都遵循相同的延迟执行模型来执行查询。

在 Spark 经典版中,数据帧转换(如 filterlimit)是懒惰的。 这意味着它们不会立即执行,而是记录在逻辑计划中。 仅在调用一个动作(例如 show()collect())时,才会触发实际计算。

Spark Connect 遵循类似的延迟评估模型。 转换在客户端构造,并作为未解析的 proto 计划(原型计划)发送到服务器。 然后,当调用操作时,服务器将进行必要的分析和执行任务。

方面 Spark 经典 Spark Connect
转换: df.filter(...)、、 df.select(...)df.limit(...) 延迟执行 延迟执行
SQL 查询: spark.sql("select …") 延迟执行 延迟执行
动作: df.collect(), df.show() 立即执行 立即执行
SQL 命令: spark.sql("insert …")spark.sql("create …") 立即执行 立即执行

架构分析

Spark Classic 版本在逻辑计划构建阶段会急切地执行模式分析。 定义转换时,Spark 会立即分析 DataFrame 的架构,以确保所有引用的列和数据类型都有效。 例如,执行 spark.sql("select 1 as a, 2 as b").filter("c > 1") 将立即引发错误,表示找不到列 c

Spark Connect 在转换期间构建未解析的原型计划。 访问架构或执行作时,客户端会通过 RPC(远程过程调用)将未解析的计划发送到服务器。 然后,服务器进行分析并执行操作。 此设计推迟模式分析。 例如,spark.sql("select 1 as a, 2 as b").filter("c > 1") 不会引发任何错误,因为未解析的计划仅在客户端运行,但是在 df.columnsdf.show() 上会出现错误,因为未解析的计划将发送到服务器进行分析。

与查询执行不同,Spark 经典和 Spark Connect 在架构分析发生时有所不同。

方面 Spark 经典 Spark Connect
转换: df.filter(...)、、 df.select(...)df.limit(...) 渴望 懒惰
架构访问:df.columns、、 df.schemadf.isStreaming 渴望 渴望
触发分析 RPC 请求,与 Spark 经典版不同
动作: df.collect(), df.show() 渴望 渴望
依赖会话状态:用户定义函数(UDF)、临时视图、配置项 渴望 懒惰
在执行期间评估

最佳做法

延迟分析与急切分析之间的区别意味着,有一些最佳实践需要遵循,以避免意外行为和性能问题,特别是由于覆盖临时视图名称、在 UDF 中捕获外部变量、错误检测延迟以及新 DataFrame 上的过度模式访问所引起的。

创建唯一的临时视图名称

在 Spark Connect 中,DataFrame 仅按名称存储对临时视图的引用。 因此,如果临时视图稍后被替换,数据帧中的数据也会更改,因为它在执行时按名称查找视图。

此行为不同于 Spark 经典版,其中临时视图的逻辑计划在创建时嵌入到数据帧的计划中。 临时视图的任何后续替换都不会影响原始数据帧。

若要缓解差异,请始终创建唯一的临时视图名称。 例如,在视图名称中包含 UUID。 这可避免影响引用以前注册的临时视图的任何现有数据帧。

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

封装 UDF 定义

在 Spark Connect 中,Python UDF 延迟。 其序列化和注册将延迟到执行时间。 在以下示例中,UDF 仅在调用show()时才序列化并上传到 Spark 群集上执行。

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x

df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

此行为不同于 Spark 经典版,其中 UDF 已预先创建。 在 Spark 经典版中,创建 UDF 时会捕获 x 的值,因此后续对 x 的更改不会影响已创建的 UDF。

如果需要修改 UDF 所依赖的外部变量的值,请使用函数工厂(使用及早绑定的闭包)来正确捕获变量值。 具体而言,用辅助函数包裹 UDF 的创建,以捕捉依赖变量的值。

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)

x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

通过将 UDF 定义包装在另一个函数(make_udf)内,我们将创建一个新范围,其中当前值 x 作为参数传入。 这可确保每个生成的 UDF 都有自己的字段副本,在创建 UDF 时绑定。

触发错误检测的立即分析

以下错误处理在 Spark 经典版中很有用,因为它执行了预先分析,这允许及时引发异常。 但是,在 Spark Connect 中,此代码不会导致任何问题,因为它只构造本地未解析的 proto 计划,而不会触发任何分析。

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

如果您的代码依赖分析异常并想要捕获它,可以主动触发分析,例如使用df.columnsdf.schemadf.collect()

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

避免过多的热切分析请求

如果您通过避免过度使用触发急切分析(例如 df.columnsdf.schema)的调用来减少大量分析请求,则可以提高性能。

如果无法避免这种情况,并且必须经常检查新数据帧的列,请保留一组用于跟踪列名称以避免分析请求。

Python

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

另一个类似的情况是创建大量不必要的中间数据帧并对其进行分析。 而是直接从 DataFrame 的架构获取 StructType 字段信息,而不是创建中间 DataFrame。

Python

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)