Incompatible schema in some files

Problem

The Spark job fails with an exception like the following while reading Parquet files:

Error in SQL statement: SparkException: Job aborted due to stage failure:
Task 20 in stage 11227.0 failed 4 times, most recent failure: Lost task 20.3 in stage 11227.0
(TID 868031, 10.111.245.219, executor 31):
java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:52)

Cause

The java.lang.UnsupportedOperationException in this instance is caused by one or more Parquet files written to a Parquet folder with an incompatible schema.

Solution

Find the Parquet files with the incompatible schema and rewrite them with the correct schema. To locate them, try reading the Parquet dataset with schema merging enabled:

spark.read.option("mergeSchema", "true").parquet(path)

or

spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.read.parquet(path)

If you do have Parquet files with incompatible schemas, the snippets above will output an error naming the file that has the wrong schema.
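Once the offending files are known, you can inspect them individually and rewrite them with the expected types. The sketch below is a minimal example, assuming a hypothetical dataset folder /mnt/data/events in which column b should be a long; the path, file name, and column types are placeholders, not part of the original dataset.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.col

// Hypothetical Parquet folder; replace with your own path.
val dir = "/mnt/data/events"

// List the individual Parquet files and print each file's schema so the
// ones that do not match the expected schema stand out.
val fs = new Path(dir).getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(new Path(dir))
  .map(_.getPath.toString)
  .filter(_.endsWith(".parquet"))

files.foreach { f =>
  println(f)
  spark.read.parquet(f).printSchema()
}

// Rewrite an offending file with the expected type (here: b as long) to a
// new location; overwriting files inside the original folder is not safe.
spark.read.parquet(s"$dir/part-00020.parquet")   // hypothetical offending file
  .withColumn("b", col("b").cast("long"))
  .write.mode("overwrite").parquet("/mnt/data/events_fixed")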

You can also check whether two schemas are compatible by using the merge method. For example, say you have these two schemas:

import org.apache.spark.sql.types._

val struct1 = (new StructType)
  .add("a", "int", true)
  .add("b", "long", false)

val struct2 = (new StructType)
  .add("a", "int", true)
  .add("b", "long", false)
  .add("c", "timestamp", true)

Then you can test whether they are compatible:

struct1.merge(struct2).treeString

This gives you:

res0: String =
"root
|-- a: integer (nullable = true)
|-- b: long (nullable = false)
|-- c: timestamp (nullable = true)
"

However, if struct2 has the following incompatible schema:

val struct2 = (new StructType)
  .add("a", "int", true)
  .add("b", "string", false)

Then the test will give you the following SparkException:

org.apache.spark.SparkException: Failed to merge fields 'b' and 'b'. Failed to merge incompatible data types LongType and StringType
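If you want to run this check programmatically without the exception stopping a job, you can wrap the merge in a Try. This is a small sketch based on the merge behavior shown above; schemasCompatible is a hypothetical helper name, not part of the Spark API.

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.types.StructType

// Returns true when the two schemas merge cleanly, false otherwise,
// printing the merge error so the incompatible field is visible.
def schemasCompatible(s1: StructType, s2: StructType): Boolean =
  Try(s1.merge(s2)) match {
    case Success(_) => true
    case Failure(e) =>
      println(e.getMessage)
      false
  }

schemasCompatible(struct1, struct2)   // false for the incompatible pair above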