Compare Spark Connect to Spark Classic

Spark Connect is a gRPC-based protocol within Apache Spark that specifies how a client application can communicate with a remote Spark Server. It allows remote execution of Spark workloads using the DataFrame API.
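For example, a minimal sketch of connecting a PySpark client to a Spark Connect endpoint (the sc:// address is a placeholder; in Databricks notebooks and Databricks Connect, the session is configured for you):

from pyspark.sql import SparkSession

# Placeholder endpoint; replace with your Spark Connect server's address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

spark.range(5).show()  # The DataFrame operation runs on the remote Spark server.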

Spark Connect is used in the following:

  • Scala notebooks with Databricks Runtime version 13.3 and above, on standard and dedicated compute

  • Python notebooks with Databricks Runtime version 14.3 and above, on standard and dedicated compute

  • Databricks Connect

While both Spark Connect and Spark Classic use lazy execution for transformations, there are important differences to understand in order to avoid unexpected behavior and performance issues when migrating existing code from Spark Classic to Spark Connect, or when writing code that must work with both.

Lazy vs eager

The key difference between Spark Connect and Spark Classic is that Spark Connect defers analysis and name resolution to execution time, as summarized in the following table.

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Query execution | Lazy | Lazy |
| Schema analysis | Eager | Lazy |
| Schema access | Local | Triggers RPC |
| Temporary views | Plan embedded | Name lookup |
| UDF serialization | At creation | At execution |

Query execution

Both Spark Classic and Spark Connect follow the same lazy execution model for query execution.

In Spark Classic, DataFrame transformations (such as filter and limit) are lazy. They are not executed immediately, but are recorded in a logical plan. The actual computation is triggered only when an action (such as show() or collect()) is invoked.

Spark Connect follows a similar lazy evaluation model. Transformations are constructed on the client side and sent as unresolved proto plans to the server. The server then performs the necessary analysis and execution when an action is called.
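For example, in both modes the following transformations only build up a plan; nothing runs until the action is called:

df = spark.range(1000).filter("id % 2 = 0").limit(10)  # Lazy: no computation yet

# The action triggers analysis and execution. In Spark Connect, the plan is
# sent to the server at this point.
df.show()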

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Transformations: df.filter(...), df.select(...), df.limit(...) | Lazy execution | Lazy execution |
| SQL queries: spark.sql("select …") | Lazy execution | Lazy execution |
| Actions: df.collect(), df.show() | Eager execution | Eager execution |
| SQL commands: spark.sql("insert …"), spark.sql("create …") | Eager execution | Eager execution |

Schema analysis

Spark Classic performs schema analysis eagerly during the logical plan construction phase. When you define transformations, Spark immediately analyzes the DataFrame's schema to ensure all referenced columns and data types are valid. For example, executing spark.sql("select 1 as a, 2 as b").filter("c > 1") will throw an error eagerly, indicating the column c cannot be found.

Spark Connect instead constructs unresolved proto plans during transformation. When a schema is accessed or an action is executed, the client sends the unresolved plan to the server via RPC (remote procedure call), and the server performs the analysis and execution. This design defers schema analysis. For example, spark.sql("select 1 as a, 2 as b").filter("c > 1") does not throw an error immediately, because the unresolved plan exists only on the client; the error is thrown when df.columns or df.show() is called and the plan is sent to the server for analysis.
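A minimal sketch of this difference in Spark Connect:

# No error yet in Spark Connect: only an unresolved plan is built on the client.
df = spark.sql("select 1 as a, 2 as b").filter("c > 1")

try:
  # The error surfaces here, when the plan is sent to the server for analysis.
  # In Spark Classic, the error is raised on the filter call above instead.
  df.columns
except Exception as e:
  print(f"Error: {repr(e)}")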

Unlike query execution, Spark Classic and Spark Connect differ in when schema analysis occurs.

| Aspect | Spark Classic | Spark Connect |
| --- | --- | --- |
| Transformations: df.filter(...), df.select(...), df.limit(...) | Eager | Lazy |
| Schema access: df.columns, df.schema, df.isStreaming | Eager | Eager (triggers an analysis RPC request, unlike Spark Classic) |
| Actions: df.collect(), df.show() | Eager | Eager |
| Dependent session state: UDFs, temporary views, configs | Eager | Lazy (evaluated during execution) |

Best practices

The difference between lazy and eager analysis means there are some best practices to follow to avoid unexpected behavior and performance issues, specifically those caused by overwriting of temporary view names, capturing external variables in UDFs, delayed error detection, and excessive schema access on new DataFrames.

Create unique temporary view names

In Spark Connect, the DataFrame stores only a reference to the temporary view by name. As a result, if the temp view is later replaced, the data in the DataFrame will also change because it looks up the view by name at execution time.

This behavior differs from Spark Classic, where the logical plan of the temp view is embedded into the DataFrame's plan at the time of creation. Any subsequent replacement of the temp view does not affect the original DataFrame.
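For example, a sketch of this difference (the view name is illustrative):

spark.range(10).createOrReplaceTempView("my_temp_view")
df = spark.table("my_temp_view")

# Replacing the view also changes what df resolves to in Spark Connect,
# because the view is looked up by name at execution time.
spark.range(100).createOrReplaceTempView("my_temp_view")

print(df.count())  # Spark Connect: 100; Spark Classic: 10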

To mitigate the difference, always create unique temporary view names. For example, include a UUID in the view name. This avoids affecting any existing DataFrames that reference a previously registered temp view.

Python

import uuid
def create_temp_view_and_create_dataframe(x):
  temp_view_name = f"`temp_view_{uuid.uuid4()}`"  # Use a random name to avoid conflicts.
  spark.range(x).createOrReplaceTempView(temp_view_name)
  return spark.table(temp_view_name)

df10 = create_temp_view_and_create_dataframe(10)
assert len(df10.collect()) == 10

df100 = create_temp_view_and_create_dataframe(100)
assert len(df10.collect()) == 10  # It works as expected now.
assert len(df100.collect()) == 100

Scala

import java.util.UUID

def createTempViewAndDataFrame(x: Int) = {
  val tempViewName = s"`temp_view_${UUID.randomUUID()}`"
  spark.range(x).createOrReplaceTempView(tempViewName)
  spark.table(tempViewName)
}

val df10 = createTempViewAndDataFrame(10)
assert(df10.collect().length == 10)

val df100 = createTempViewAndDataFrame(100)
assert(df10.collect().length == 10) // Works as expected
assert(df100.collect().length == 100)

Wrap UDF definitions

In Spark Connect, Python UDFs are lazy. Their serialization and registration are deferred until execution time. In the following example, the UDF is only serialized and uploaded to the Spark cluster for execution when show() is called.

from pyspark.sql.functions import udf

x = 123

@udf("INT")
def foo():
  return x

df = spark.range(1).select(foo())
x = 456
df.show() # Prints 456

This behavior differs from Spark Classic, where UDFs are created eagerly: the value of x at the time of UDF creation is captured, so subsequent changes to x do not affect the already-created UDF.

If you need to modify the value of external variables that a UDF depends on, use a function factory (closure with early binding) to correctly capture variable values. Specifically, wrap the UDF creation in a helper function to capture the value of a dependent variable.

Python

from pyspark.sql.functions import udf

def make_udf(value):
  def foo():
    return value
  return udf(foo)

x = 123
foo_udf = make_udf(x)
x = 456
df = spark.range(1).select(foo_udf())
df.show() # Prints 123 as expected

Scala

def makeUDF(value: Int) = udf(() => value)

var x = 123
val fooUDF = makeUDF(x)  // Captures the current value
x = 456
val df = spark.range(1).select(fooUDF())
df.show() // Prints 123 as expected

By wrapping the UDF definition inside another function (make_udf), we create a new scope where the current value of x is passed in as an argument. This ensures each generated UDF has its own copy of the value, bound at the time the UDF is created.

Trigger eager analysis for error detection

The following error handling is useful in Spark Classic because analysis is eager, so the exception is thrown as soon as the invalid column is referenced. In Spark Connect, however, this code does not raise an exception, because it only constructs a local unresolved proto plan without triggering any analysis.

from pyspark.sql.functions import col, when

df = spark.createDataFrame([("Alice", 25), ("Bob", 30)], ["name", "age"])

try:
  df = df.select("name", "age")
  df = df.withColumn(
      "age_group",
      when(col("age") < 18, "minor").otherwise("adult"))
  df = df.filter(col("age_with_typo") > 6) # Referencing a non-existent column does not throw an analysis exception in Spark Connect
except Exception as e:
  print(f"Error: {repr(e)}")

If your code relies on the analysis exception and you want to catch it, you can trigger eager analysis, for example with df.columns, df.schema, or df.collect().

Python

try:
  df = ...
  df.columns # This will trigger eager analysis
except Exception as e:
  print(f"Error: {repr(e)}")

Scala

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(("Alice", 25), ("Bob", 30))).toDF("name", "age")

try {
  val df2 = df.select("name", "age")
    .withColumn("age_group", when(col("age") < 18, "minor").otherwise("adult"))
    .filter(col("age_with_typo") > 6)
  df2.columns // Trigger eager analysis to catch the error
} catch {
  case e: SparkThrowable => println(s"Error: ${e.getMessage}")
}

Avoid too many eager analysis requests

Avoid excessive use of calls that trigger eager analysis (such as df.columns and df.schema); each such call sends an analysis request to the server, and large numbers of these requests hurt performance.
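For example, the following loop (a sketch of the pattern to avoid) checks df.columns on every iteration, which in Spark Connect sends one analysis request per iteration:

import pyspark.sql.functions as F

df = spark.range(10)
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in df.columns:  # Each call triggers an analysis RPC
    df = df.withColumn(new_column_name, F.col("id") + i)
df.show()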

If you cannot avoid these checks and must frequently inspect the columns of new DataFrames, maintain a set of column names to avoid repeated analysis requests, as in the following examples.

Python

import pyspark.sql.functions as F

df = spark.range(10)
columns = set(df.columns) # Maintain the set of column names
for i in range(200):
  new_column_name = str(i)
  if new_column_name not in columns: # Check the set
    df = df.withColumn(new_column_name, F.col("id") + i)
    columns.add(new_column_name)
df.show()

Scala

import org.apache.spark.sql.functions._

var df = spark.range(10).toDF
val columns = scala.collection.mutable.Set(df.columns: _*)
for (i <- 0 until 200) {
  val newColumnName = i.toString
  if (!columns.contains(newColumnName)) {
    df = df.withColumn(newColumnName, col("id") + i)
    columns.add(newColumnName)
  }
}
df.show()

Another similar case is creating a large number of unnecessary intermediate DataFrames and analyzing them, as sketched below. Instead, obtain StructType field information directly from the DataFrame's schema.
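A sketch of the pattern to avoid, assuming df has struct columns with the hypothetical names shown; each select(...).columns call creates an intermediate DataFrame and triggers its own analysis request:

struct_column_names = ["address", "profile"]  # Hypothetical struct column names
struct_column_fields = {
    name: df.select(f"{name}.*").columns  # One analysis RPC per struct column
    for name in struct_column_names
}

The following examples instead read the struct fields from df.schema with a single analysis request.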

Python

from pyspark.sql.types import StructType

df = ...
struct_column_fields = {
    column_schema.name: [f.name for f in column_schema.dataType.fields]
    for column_schema in df.schema
    if isinstance(column_schema.dataType, StructType)
}
print(struct_column_fields)

Scala

import org.apache.spark.sql.types.StructType

val df = ...
val structColumnFields = df.schema.fields
  .filter(_.dataType.isInstanceOf[StructType])
  .map { field =>
    field.name -> field.dataType.asInstanceOf[StructType].fields.map(_.name)
  }
  .toMap
println(structColumnFields)