Apache Spark jobs hang due to a non-deterministic custom UDF

Learn what to do when your Apache Spark job hangs due to a non-deterministic custom UDF.

Written by Adam Pavlacka

Last published at: May 10th, 2022

Problem

Sometimes Apache Spark jobs hang indefinitely due to the non-deterministic behavior of a Spark User-Defined Function (UDF). Here is an example of such a function:

%scala

val convertorUDF = (commentCol: String) => {
  // UDF definition
}
val translateColumn = udf(convertorUDF)

If you call this UDF using the withColumn() API and then apply a filter transformation to the resulting DataFrame, the UDF can execute multiple times for each record, degrading application performance.

%scala

val translatedDF = df.withColumn("translatedColumn", translateColumn(df("columnToTranslate")))
val filteredDF = translatedDF.filter(
  !translatedDF("translatedColumn").contains("Invalid URL Provided") &&
  !translatedDF("translatedColumn").contains("Unable to connect to Microsoft API")
)

Cause

Spark assumes UDFs are deterministic, so the optimizer is free to duplicate or eliminate their invocations. As a result, a UDF written as deterministic code can still run more than once per record. You often see this behavior when you use a UDF on a DataFrame to add an additional column using the withColumn() API and then apply a transformation (filter) to the resulting DataFrame: the filter predicate and the projected column can each evaluate the UDF separately.
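You can make the duplication visible by counting invocations with an accumulator. The following is a minimal sketch, not part of the original example; the column name comment and the toUpperCase body are placeholders for the real translation logic:

%scala

import org.apache.spark.sql.functions.{col, udf}

// Counter that the UDF increments on every invocation.
val invocations = spark.sparkContext.longAccumulator("udfInvocations")

val countingUDF = udf((comment: String) => {
  invocations.add(1)    // side effect purely for demonstration
  comment.toUpperCase   // placeholder for the real work
})

val probed = df
  .withColumn("upperComment", countingUDF(df("comment")))
  .filter(col("upperComment").contains("INVALID"))

probed.count()

// The invocation count can exceed the row count, because the filter
// predicate and the projected column each evaluate the UDF. (Task retries
// can also inflate the accumulator, so treat this as a rough probe.)
println(s"rows: ${df.count()}, UDF invocations: ${invocations.value}")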

Solution

UDFs must be deterministic. Due to optimization, duplicate invocations may be eliminated, or the function may be invoked more times than it appears in the query.
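If you are on Spark 2.3 or later, one way to stop the optimizer from duplicating or eliminating invocations is to mark the UDF as non-deterministic. This is a sketch reusing the convertorUDF defined above:

%scala

// asNondeterministic() (Spark 2.3+) tells the optimizer it must not
// duplicate or eliminate invocations of this UDF.
val translateColumn = udf(convertorUDF).asNondeterministic()

Note that marking a UDF non-deterministic can also prevent other optimizations, such as pushing filters past it, so it trades query performance for predictable invocation counts.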

The better option is to cache the DataFrame where you are using the UDF. Caching materializes the UDF output once, so subsequent transformations read the cached column instead of invoking the UDF again. If the DataFrame contains a large amount of data that may not fit in the cache, writing it out as a Parquet file is the better choice.

You can use the following code to cache the result:

%scala

// cache() materializes translatedColumn once; subsequent transformations
// such as filter() read the cached result instead of re-running the UDF.
val translatedDF = df.withColumn("translatedColumn", translateColumn(df("columnToTranslate"))).cache()
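If the DataFrame is too large to cache, a minimal sketch of the Parquet approach looks like the following; the path /tmp/translated_comments is a placeholder, not a path from the original example:

%scala

// Materialize the UDF output once. The output path is a placeholder.
df.withColumn("translatedColumn", translateColumn(df("columnToTranslate")))
  .write
  .mode("overwrite")
  .parquet("/tmp/translated_comments")

// Reading the result back gives a DataFrame whose columns are plain data,
// so filtering it never re-invokes the UDF.
val translatedDF = spark.read.parquet("/tmp/translated_comments")
val filteredDF = translatedDF.filter(
  !translatedDF("translatedColumn").contains("Invalid URL Provided") &&
  !translatedDF("translatedColumn").contains("Unable to connect to Microsoft API")
)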

