Spark Connect 是 Apache Spark 中基于 gRPC 的协议,用于指定客户端应用程序如何与远程 Spark Server 通信。 它允许使用数据帧 API 远程执行 Spark 工作负载。
Spark Connect 用于以下各项:
- 在标准计算环境中运行 Databricks Runtime 13.3 及以上版本的 Scala 笔记本。
- 标准计算中具有 Databricks Runtime 版本 14.3 及更高版本的 Python 笔记本
- 无服务器计算
- Databricks Connect
尽管 Spark Connect 和 Spark 经典都利用延迟执行转换,但在将现有代码从 Spark 经典迁移到 Spark Connect 或编写必须同时处理两者的代码时,需要了解的一些重要区别,以避免出现意外的行为和性能问题。
懒惰与急切
Spark Connect 和 Spark 经典版之间的主要区别在于 Spark Connect 将分析和名称解析推迟到执行时间,如下表所述。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
| 查询执行 | 懒惰 | 懒惰 |
| 架构分析 | 渴望 | 懒惰 |
| 架构访问 | Local | 触发 RPC 并在首次访问时缓存架构 |
| 临时视图 | 嵌入的计划 | 名称查找 |
| UDF 序列化 | 创建时 | 执行时 |
查询执行
Spark 经典版和 Spark Connect 都遵循相同的延迟执行模型来执行查询。
在 Spark 经典版中,数据帧转换(如 filter 和 limit)是懒惰的。 这意味着它们不会立即执行,而是在逻辑计划中编码。 实际计算仅在某个操作(如 show(),collect())触发时进行。
Spark Connect 遵循类似的延迟评估模型。 转换在客户端构建,并作为未解析计划发送到服务器。 然后,当调用操作时,服务器将进行必要的分析和执行任务。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
转换: df.filter(...)、、 df.select(...)、 df.limit(...) |
延迟执行 | 延迟执行 |
SQL 查询: spark.sql("select …") |
延迟执行 | 延迟执行 |
动作: df.collect(), df.show() |
立即执行 | 立即执行 |
SQL 命令: spark.sql("insert …")spark.sql("create …") |
立即执行 | 立即执行 |
架构分析
Spark Classic在逻辑计划构造期间主动进行分析。 此分析阶段将未解析的计划转换为完全解析的逻辑计划,并验证该作是否可以由 Spark 执行。 急切地执行这项工作的主要好处之一是用户在出错时立即收到反馈。 例如,执行 spark.sql("select 1 as a, 2 as b").filter("c > 1") 将立即引发错误,表示找不到列 c 。
Spark Connect 不同于经典版,因为客户端在转换期间构造未解析的计划并延迟其分析。 任何需要已解析计划的操作(例如访问架构、解释计划的过程、持久保存 DataFrame 或执行相关操作)都会导致客户端通过 RPC 将未解析的计划发送到服务器。 然后,服务器将进行全面分析,以获取解析的逻辑方案,并执行操作。 例如,spark.sql("select 1 as a, 2 as b").filter("c > 1") 不会引发任何错误,因为未解析的计划仅在客户端运行,但是在 df.columns 或 df.show() 上会出现错误,因为未解析的计划将发送到服务器进行分析。
与查询执行不同,Spark 经典和 Spark Connect 在架构分析发生时有所不同。
| 方面 | Spark 经典 | Spark Connect |
|---|---|---|
转换: df.filter(...)、、 df.select(...)、 df.limit(...) |
渴望 | 懒惰 |
架构访问:df.columns、、 df.schemadf.isStreaming |
渴望 | 渴望 触发分析 RPC 请求,与 Spark 经典版不同 |
动作: df.collect(), df.show() |
渴望 | 渴望 |
| 数据帧的依赖会话状态:UDF、临时视图、配置项 | 渴望 | 懒惰 在执行数据帧计划的过程中进行评估 |
| 临时视图的依赖会话状态:UDF、其他临时视图、配置 | 渴望 | 渴望 创建临时视图时,会急切地触发分析 |
最佳做法
延迟分析与急切分析之间的区别意味着,有一些最佳实践需要遵循,以避免意外行为和性能问题,特别是由于覆盖临时视图名称、在 UDF 中捕获外部变量、错误检测延迟以及新 DataFrame 上的过度模式访问所引起的。
创建唯一的临时视图名称
在 Spark Connect 中,DataFrame 仅按名称存储对临时视图的引用。 因此,如果临时视图稍后被替换,数据帧中的数据也会更改,因为它在执行时按名称查找视图。
此行为不同于 Spark 经典版,其中临时视图的逻辑计划在创建时嵌入到数据帧的计划中。 临时视图的任何后续替换都不会影响原始数据帧。
若要缓解差异,请始终创建唯一的临时视图名称。 例如,在视图名称中包含 UUID。 这可避免影响引用以前注册的临时视图的任何现有数据帧。
Python
import uuid
def create_temp_view_and_create_dataframe(x):
temp_view_name = f"`temp_view_{uuid.uuid4()}`" # Use a random name to avoid conflicts.
spark.range(x).createOrReplaceTempView(temp_view_name)
return spark.table(temp_view_name)
df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10
df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10 # It works as expected now.
assert len(df100.collect()) == 100
Scala
import java.util.UUID
def createTempViewAndDataFrame(x: Int) = {
val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
spark.range(x).createOrReplaceTempView(tempViewName)
spark.table(tempViewName)
}
val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)
val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)
封装 UDF 定义
UDFs 依赖于可变的外部变量通常被认为是不好的做法,因为这会引入隐式依赖,导致非确定性行为,并降低组合性。 但是,如果确实有这样的模式,请注意以下注意事项:
在 Spark Connect 中,Python UDF 延迟。 其序列化和注册将延迟到执行时间。 在以下示例中,UDF 仅在调用show()时才序列化并上传到 Spark 群集上执行。
from pyspark.sql.functions import udf
x = 123
@udf("INT")
def foo():
return x
df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456
此行为不同于 Spark 经典版,其中 UDF 已预先创建。 在 Spark 经典版中,创建 UDF 时会捕获 x 的值,因此后续对 x 的更改不会影响已创建的 UDF。
如果需要修改 UDF 所依赖的外部变量的值,请使用函数工厂(使用及早绑定的闭包)来正确捕获变量值。 具体而言,用辅助函数包裹 UDF 的创建,以捕捉依赖变量的值。
Python
from pyspark.sql.functions import udf
def make_udf(value):
def foo():
return value
return udf(foo)
x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected
Scala
def makeUDF(value: Int) = udf(() => value)
var x = 123
val fooUDF = makeUDF(x) // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected
通过将 UDF 定义包装在另一个函数(make_udf)内,我们将创建一个新范围,其中当前值 x 作为参数传入。 这可确保每个生成的 UDF 都有自己的字段副本,在创建 UDF 时绑定。
触发错误检测的立即分析
以下错误处理在 Spark 经典版中很有用,因为它执行了预先分析,这允许及时引发异常。 但是,在 Spark Connect 中,此代码不会导致任何问题,因为它只构造本地未解析的计划,而不会触发任何分析。
df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])
try:
df = df.select("name", "age")
df = df.withColumn(
"age_group",
when(col("age") < 18, "minor").otherwise("adult"))
df = df.filter(col("age_with_typo") > 6) # The use of non-existing column name will not throw analysis exception in Spark Connect
except Exception as e:
print(f"Error: {repr(e)}")
如果您的代码依赖分析异常并想要捕获它,可以主动触发分析,例如使用df.columns、df.schema或df.collect()。
Python
try:
df = ...
df.columns # This will trigger eager analysis
except Exception as e:
print(f"Error: {repr(e)}")
Scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")
try {
val df2 = df.select("name", "age")
.withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
.filter(col("age_with_typo") > 6)
df2.columns // Trigger eager analysis to catch the error
} catch {
case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}
避免过多的热切分析请求
如果避免对大量数据帧执行分析请求,则可以提高性能。
逐步创建新的 DataFrame,并在每次迭代中访问其架构
创建大量新的 DataFrame 时,请避免过度使用调用来触发对它们进行预先分析(例如 df.columns, df.schema)。 可以多次访问同一数据帧的架构,但对许多新创建的 DataFrame 触发分析会影响性能。
例如,在循环内以迭代方式将列添加到 DataFrame,并检查每个列是否已存在,然后再添加该列时,调用 df.columns 每个新建的数据帧都会在每个迭代时触发分析请求。 为了避免这种情况,请保留一组用于跟踪列名称,而不是重复访问 DataFrame 的架构。
Python
df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
new_column_name = str(i)
# if new_column_name not in df.columns: # Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if new_column_name not in columns: # Check the set without triggering analysis
df = df.withColumn(new_column_name, F.col("id") + i)
columns.add(new_column_name)
df.show()
Scala
import org.apache.spark.sql.functions._
var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
val newColumnName = i.toString
// if (!df.columns.contains(newColumnName)) { // Bad practice. The `df.columns` call causes an analysis request on the newly created DataFrame in every iteration.
if (!columns.contains(newColumnName)) { // Check the set without triggering analysis
df = df.withColumn(newColumnName, col("id") + i)
columns.add(newColumnName)
}
}
df.show()
避免访问大量中间数据帧的架构
另一个类似的情况是创建大量不必要的中间数据帧并对其进行分析。 在以下情况下,若要从结构类型的每一列中提取字段名称,请直接从 DataFrame 的架构中获取 StructType 字段信息,而不是创建中间 DataFrame。
Python
from pyspark.sql.types import StructType
df = ...
struct_column_fields = {
# column_schema.name: df.select(column_schema.name + ".*").columns # Bad practice. This creates an intermediate DataFrame and triggers an analysis request for each StructType column.
column_schema.name: [f.name for f in column_schema.dataType.fields] # Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
for column_schema in df.schema
if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)
Scala
import org.apache.spark.sql.types.StructType
df = ...
val structColumnFields = df.schema.fields
.filter(_.dataType.isInstanceOf[StructType])
.map { field =>
// field.name -> df.select(field.name + ".*").columns // Bad practice. This creates an intermediate DataFrame and triggers analysis for each StructType column.
field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name) // Access StructType fields directly from the schema, avoiding analysis on intermediate DataFrames.
}
.toMap
println(structColumnFields)