How to update nested columns

Spark doesn't support adding new columns or dropping existing columns in nested structures. In particular, the withColumn and drop methods of the Dataset class only accept top-level column names. They don't resolve a dotted path such as items.books.fees to a field inside a struct, so you can't use them to add, replace, or remove a nested field. (Spark 3.1 and later provide Column.withField and Column.dropFields for nested fields; the reconstruction approach shown here works on any version.) For example, suppose you have a dataset with the following schema:

import org.apache.spark.sql.types._

// Two top-level structs; items contains further nested structs
val schema = (new StructType)
  .add("metadata", (new StructType)
    .add("eventid", "string", true)
    .add("hostname", "string", true)
    .add("timestamp", "string", true),
    true)
  .add("items", (new StructType)
    .add("books", (new StructType).add("fees", "double", true), true)
    .add("paper", (new StructType).add("pages", "int", true), true),
    true)
schema.printTreeString()

The schema looks like this:

root
 |-- metadata: struct (nullable = true)
 |    |-- eventid: string (nullable = true)
 |    |-- hostname: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- books: struct (nullable = true)
 |    |    |-- fees: double (nullable = true)
 |    |-- paper: struct (nullable = true)
 |    |    |-- pages: integer (nullable = true)

Suppose you create the following DataFrame with this schema:

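import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// sc (SparkContext) and spark (SparkSession) are predefined in notebooks and spark-shell
val rdd: RDD[Row] = sc.parallelize(Seq(Row(
  Row("eventid1", "hostname1", "timestamp1"),   // metadata
  Row(Row(100.0), Row(10)))))                   // items: books, paper
val df = spark.createDataFrame(rdd, schema)
display(df) // Databricks notebooks only; use df.show(false) elsewhere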

Suppose you want to increase the fees column, which is nested under books, by 1%. Because withColumn can't target the nested field directly, you can instead reconstruct the dataset from the existing columns and the updated column, as follows:

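import spark.implicits._ // enables the $"..." column syntax (predefined in notebooks)

// Rebuild the whole row as one struct, recomputing only items.books.fees.
// Every sibling field must be listed again, or it is dropped from the result.
val updated = df.selectExpr("""
    named_struct(
        'metadata', metadata,
        'items', named_struct(
          'books', named_struct('fees', items.books.fees * 1.01),
          'paper', items.paper
        )
    ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")
updated.show(false)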

The result is:

+---------------------------------+---------------+
|metadata                         |items          |
+---------------------------------+---------------+
|[eventid1, hostname1, timestamp1]|[[101.0], [10]]|
+---------------------------------+---------------+
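If the schema is fixed and known at compile time, the same reconstruct-from-parts idea can be expressed with typed Datasets: mirror the schema with case classes and rebuild each enclosing level with copy. The sketch below is plain Scala so it runs without Spark. The case class names (Metadata, Books, Paper, Items, Event) and the increaseFees helper are hypothetical, and it assumes no field is null (nullable fields would normally be modeled with Option).

```scala
// Hypothetical case classes mirroring the schema (illustrative names).
// Assumes no nulls; nullable fields would normally be Option[...] instead.
final case class Metadata(eventid: String, hostname: String, timestamp: String)
final case class Books(fees: Double)
final case class Paper(pages: Int)
final case class Items(books: Books, paper: Paper)
final case class Event(metadata: Metadata, items: Items)

object NestedUpdate {
  // Rebuild every level on the path to items.books.fees with copy();
  // metadata and items.paper are carried over unchanged.
  def increaseFees(e: Event, factor: Double = 1.01): Event =
    e.copy(items = e.items.copy(
      books = e.items.books.copy(fees = e.items.books.fees * factor)))

  def main(args: Array[String]): Unit = {
    val before = Event(
      Metadata("eventid1", "hostname1", "timestamp1"),
      Items(Books(100.0), Paper(10)))
    // prints Event(Metadata(eventid1,hostname1,timestamp1),Items(Books(101.0),Paper(10)))
    println(increaseFees(before))
  }
}
```

With Spark, this could be applied as df.as[Event].map(e => NestedUpdate.increaseFees(e)) after import spark.implicits._, which supplies encoders for case classes. A typed map deserializes every row into JVM objects and is opaque to the optimizer, so the selectExpr version is likely to be cheaper.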
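Writing the named_struct expression by hand gets tedious for wider structs, because every untouched sibling field has to be listed again. A small helper could generate it from a description of the struct. This is a minimal sketch in plain Scala: the Field, Leaf and Struct types and the rebuild function are hypothetical, the struct description is supplied by hand (deriving it from a StructType is not shown), and field names are assumed not to need backtick quoting.

```scala
// Hypothetical description of a struct: a leaf column or a struct of fields.
sealed trait Field { def name: String }
final case class Leaf(name: String) extends Field
final case class Struct(name: String, children: List[Field]) extends Field

object NamedStructBuilder {
  // Returns a Spark SQL expression that rebuilds `field` (found under `prefix`),
  // substituting `replacement` for the field whose dotted path equals `target`.
  def rebuild(field: Field, prefix: String, target: String, replacement: String): String = {
    val path = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
    field match {
      case _ if path == target => replacement
      case Struct(_, children) if target.startsWith(path + ".") =>
        // Only structs on the path to the target are rebuilt field by field.
        children
          .map(c => s"'${c.name}', ${rebuild(c, path, target, replacement)}")
          .mkString("named_struct(", ", ", ")")
      case _ => path // subtree not on the path: reuse the existing column as-is
    }
  }

  def main(args: Array[String]): Unit = {
    val items = Struct("items", List(
      Struct("books", List(Leaf("fees"))),
      Struct("paper", List(Leaf("pages")))))
    // prints named_struct('books', named_struct('fees', items.books.fees * 1.01), 'paper', items.paper)
    println(rebuild(items, "", "items.books.fees", "items.books.fees * 1.01"))
  }
}
```

The generated string could then be passed to selectExpr, for example df.selectExpr("metadata", s"$expr as items"), which keeps metadata and replaces items with the rebuilt struct.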