Spark Connect 是 Apache Spark 中基于 gRPC 的协议,用于指定客户端应用程序如何与远程 Spark Server 通信。 它允许使用数据帧 API 远程执行 Spark 工作负载。
Spark Connect 用于以下各项:
标准计算和专用计算上使用 Databricks Runtime 版本 13.3 及更高版本的 Scala 笔记本
在标准和专用计算上使用 Databricks Runtime 版本 14.3 及更高版本的 Python 笔记本
Databricks Connect
尽管 Spark Connect 和 Spark 经典都利用延迟执行转换,但在将现有代码从 Spark 经典迁移到 Spark Connect 或编写必须同时处理两者的代码时,需要了解的一些重要区别,以避免出现意外的行为和性能问题。
懒惰与急切
Spark Connect 和 Spark 经典版之间的主要区别在于 Spark Connect 将分析和名称解析推迟到执行时间,如下表所述。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
| 查询执行 | 懒惰 | 懒惰 |
| 架构分析 | 渴望 | 懒惰 |
| 架构访问 | Local | 触发器 RPC |
| 临时视图 | 嵌入的计划 | 名称查找 |
| UDF 序列化 | 创建时 | 执行时 |
查询执行
Spark 经典版和 Spark Connect 都遵循相同的延迟执行模型来执行查询。
在 Spark 经典版中,数据帧转换(如 filter 和 limit)是懒惰的。 这意味着它们不会立即执行,而是记录在逻辑计划中。 仅在调用一个动作(例如 show(),collect())时,才会触发实际计算。
Spark Connect 遵循类似的延迟评估模型。 转换在客户端构造,并作为未解析的 proto 计划(原型计划)发送到服务器。 然后,当调用操作时,服务器将进行必要的分析和执行任务。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
转换: df.filter(...)、、 df.select(...)、 df.limit(...) |
延迟执行 | 延迟执行 |
SQL 查询: spark.sql("select …") |
延迟执行 | 延迟执行 |
动作: df.collect(), df.show() |
立即执行 | 立即执行 |
SQL 命令: spark.sql("insert …")spark.sql("create …") |
立即执行 | 立即执行 |
架构分析
Spark Classic 版本在逻辑计划构建阶段会急切地执行模式分析。 定义转换时,Spark 会立即分析 DataFrame 的架构,以确保所有引用的列和数据类型都有效。 例如,执行 spark.sql("select 1 as a, 2 as b").filter("c > 1") 将立即引发错误,表示找不到列 c 。
Spark Connect 在转换期间构建未解析的原型计划。 访问架构或执行作时,客户端会通过 RPC(远程过程调用)将未解析的计划发送到服务器。 然后,服务器进行分析并执行操作。 此设计推迟模式分析。 例如,spark.sql("select 1 as a, 2 as b").filter("c > 1") 不会引发任何错误,因为未解析的计划仅在客户端运行,但是在 df.columns 或 df.show() 上会出现错误,因为未解析的计划将发送到服务器进行分析。
与查询执行不同,Spark 经典和 Spark Connect 在架构分析发生时有所不同。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
转换: df.filter(...)、、 df.select(...)、 df.limit(...) |
渴望 | 懒惰 |
架构访问:df.columns、、 df.schemadf.isStreaming |
渴望 | 渴望 触发分析 RPC 请求,与 Spark 经典版不同 |
动作: df.collect(), df.show() |
渴望 | 渴望 |
| 依赖会话状态:用户定义函数(UDF)、临时视图、配置项 | 渴望 | 懒惰 在执行期间评估 |
最佳做法
延迟分析与急切分析之间的区别意味着,有一些最佳实践需要遵循,以避免意外行为和性能问题,特别是由于覆盖临时视图名称、在 UDF 中捕获外部变量、错误检测延迟以及新 DataFrame 上的过度模式访问所引起的。
创建唯一的临时视图名称
在 Spark Connect 中,DataFrame 仅按名称存储对临时视图的引用。 因此,如果临时视图稍后被替换,数据帧中的数据也会更改,因为它在执行时按名称查找视图。
此行为不同于 Spark 经典版,其中临时视图的逻辑计划在创建时嵌入到数据帧的计划中。 临时视图的任何后续替换都不会影响原始数据帧。
若要缓解差异,请始终创建唯一的临时视图名称。 例如,在视图名称中包含 UUID。 这可避免影响引用以前注册的临时视图的任何现有数据帧。
Python
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
Scala
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
封装 UDF 定义
在 Spark Connect 中,Python UDF 延迟。 其序列化和注册将延迟到执行时间。 在以下示例中,UDF 仅在调用show()时才序列化并上传到 Spark 群集上执行。
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
此行为不同于 Spark 经典版,其中 UDF 已预先创建。 在 Spark 经典版中,创建 UDF 时会捕获 x 的值,因此后续对 x 的更改不会影响已创建的 UDF。
如果需要修改 UDF 所依赖的外部变量的值,请使用函数工厂(使用及早绑定的闭包)来正确捕获变量值。 具体而言,用辅助函数包裹 UDF 的创建,以捕捉依赖变量的值。
Python
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
Scala
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
通过将 UDF 定义包装在另一个函数(make_udf)内,我们将创建一个新范围,其中当前值 x 作为参数传入。 这可确保每个生成的 UDF 都有自己的字段副本,在创建 UDF 时绑定。
触发错误检测的立即分析
以下错误处理在 Spark 经典版中很有用,因为它执行了预先分析,这允许及时引发异常。 但是,在 Spark Connect 中,此代码不会导致任何问题,因为它只构造本地未解析的 proto 计划,而不会触发任何分析。
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
如果您的代码依赖分析异常并想要捕获它,可以主动触发分析,例如使用df.columns、df.schema或df.collect()。
Python
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
Scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
避免过多的热切分析请求
如果您通过避免过度使用触发急切分析(例如 df.columns,df.schema)的调用来减少大量分析请求,则可以提高性能。
如果无法避免这种情况,并且必须经常检查新数据帧的列,请保留一组用于跟踪列名称以避免分析请求。
Python
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
if new_column_name not in columns: # Check the set
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
if (!columns.contains(newColumnName)) {
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
另一个类似的情况是创建大量不必要的中间数据帧并对其进行分析。 而是直接从 DataFrame 的架构获取 StructType 字段信息,而不是创建中间 DataFrame。
Python
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
column_schema.name: [f.name for f in column_schema.dataType.fields]
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
}
.toMap
println(structColumnFields)