转换复杂数据类型

处理嵌套数据类型时,Azure Databricks 将优化某些现成的转换。 以下代码示例演示了在 Azure Databricks 中使用复杂和嵌套数据类型的模式。

用于访问嵌套数据的点表示法

可以使用点表示法 (.) 访问嵌套字段。

Python

df.select("column_name.nested_field")

SQL

SELECT column_name.nested_field FROM table_name

选择所有嵌套字段

使用星号运算符 (*) 选择给定字段中的所有字段。

注意

这只会在指定的深度解压缩嵌套字段。

Python

df.select("column_name.*")

SQL

SELECT column_name.* FROM table_name

创建新的嵌套字段

使用 struct() 函数创建新的嵌套字段。

Python

from pyspark.sql.functions import struct, col

df.select(struct(col("field_to_nest").alias("nested_field")).alias("column_name"))

SQL

SELECT struct(field_to_nest AS nested_field) AS column_name FROM table_name

将所有字段嵌套到列中

使用星号运算符 (*) 将数据源中的所有字段嵌套为单个列。

Python

from pyspark.sql.functions import struct

df.select(struct("*").alias("column_name"))

SQL

SELECT struct(*) AS column_name FROM table_name

从嵌套列中选择命名字段

使用方括号 [] 从列中选择嵌套字段。

Python

from pyspark.sql.functions import col

df.select(col("column_name")["field_name"])

SQL

SELECT column_name["field_name"] FROM table_name

从映射或数组中分解嵌套元素

使用 explode() 函数从 ARRAYMAP 类型列解包值。

ARRAY 列将值存储为列表。 用 explode() 解包时,每个值都会成为输出中的一行。

Python

from pyspark.sql.functions import explode

df.select(explode("array_name").alias("column_name"))

SQL

SELECT explode(array_name) AS column_name FROM table_name

MAP 列将值存储为有序键值对。 用 explode() 解包时,每个键变为列,值变为行。

Python

from pyspark.sql.functions import explode

df.select(explode("map_name").alias("column1_name", "column2_name"))

SQL

SELECT explode(map_name) AS (column1_name, column2_name) FROM table_name

从列表或集创建数组

使用函数 collect_list()collect_set() 将列的值转换为数组。 collect_list() 收集列中的所有值,同时 collect_set() 仅收集唯一的值。

注意

Spark 不保证由任一操作生成的数组中的项的顺序。

Python

from pyspark.sql.functions import collect_list, collect_set

df.select(collect_list("column_name").alias("array_name"))
df.select(collect_set("column_name").alias("set_name"))

SQL

SELECT collect_list(column_name) AS array_name FROM table_name;
SELECT collect_set(column_name) AS set_name FROM table_name;

从数组中的映射中选择列

还可以使用点表示法 (.) 访问数组中包含的映射中的字段。 这会返回包含指定字段的所有值的数组。

考虑以下数据结构:

{
  "column_name": [
    {"field1": 1, "field2":"a"},
    {"field1": 2, "field2":"b"}
  ]
}

可以使用以下查询以数组的形式从 field1 返回值:

Python

df.select("column_name.field1")

SQL

SELECT column_name.field1 FROM table_name

将嵌套数据转换为 JSON

使用 to_json 函数将复杂数据类型转换为 JSON。

Python

from pyspark.sql.functions import to_json

df.select(to_json("column_name").alias("json_name"))

SQL

SELECT to_json(column_name) AS json_name FROM table_name

若要对查询或 DataFrame 的所有内容进行编码,请将其与 struct(*) 结合。

Python

from pyspark.sql.functions import to_json, struct

df.select(to_json(struct("*")).alias("json_name"))

SQL

SELECT to_json(struct(*)) AS json_name FROM table_name

注意

Azure Databricks 还支持用 to_avroto_protobuf 转换复杂数据类型,以便实现与集成系统的互操作性。

将 JSON 数据转换为复杂数据

使用 from_json 函数将 JSON 数据转换为本机复杂数据类型。

注意

必须为 JSON 数据指定架构。

Python

from pyspark.sql.functions import from_json

schema = "column1 STRING, column2 DOUBLE"

df.select(from_json("json_name", schema).alias("column_name"))

SQL

SELECT from_json(json_name, "column1 STRING, column2 DOUBLE") AS column_name FROM table_name

笔记本:转换复杂数据类型

以下笔记本提供了使用 Python、Scala 和 SQL 的复杂数据类型的示例。

转换复杂数据类型 - Python 笔记本

获取笔记本

转换复杂数据类型 - Scala 笔记本

获取笔记本

转换复杂数据类型 - SQL 笔记本

获取笔记本