转换复杂数据类型
处理嵌套数据类型时,Azure Databricks 将优化某些现成的转换。 以下代码示例演示了在 Azure Databricks 中使用复杂和嵌套数据类型的模式。
用于访问嵌套数据的点表示法
可以使用点表示法 (.
) 访问嵌套字段。
Python
df.select("column_name.nested_field")
SQL
SELECT column_name.nested_field FROM table_name
选择所有嵌套字段
使用星号运算符 (*
) 选择给定字段中的所有字段。
注意
这只会在指定的深度解压缩嵌套字段。
Python
df.select("column_name.*")
SQL
SELECT column_name.* FROM table_name
创建新的嵌套字段
使用 struct()
函数创建新的嵌套字段。
Python
from pyspark.sql.functions import struct, col
df.select(struct(col("field_to_nest").alias("nested_field")).alias("column_name"))
SQL
SELECT struct(field_to_nest AS nested_field) AS column_name FROM table_name
将所有字段嵌套到列中
使用星号运算符 (*
) 将数据源中的所有字段嵌套为单个列。
Python
from pyspark.sql.functions import struct
df.select(struct("*").alias("column_name"))
SQL
SELECT struct(*) AS column_name FROM table_name
从嵌套列中选择命名字段
使用方括号 []
从列中选择嵌套字段。
Python
from pyspark.sql.functions import col
df.select(col("column_name")["field_name"])
SQL
SELECT column_name["field_name"] FROM table_name
从映射或数组中分解嵌套元素
使用 explode()
函数从 ARRAY
和 MAP
类型列解包值。
ARRAY
列将值存储为列表。 用 explode()
解包时,每个值都会成为输出中的一行。
Python
from pyspark.sql.functions import explode
df.select(explode("array_name").alias("column_name"))
SQL
SELECT explode(array_name) AS column_name FROM table_name
MAP
列将值存储为有序键值对。 用 explode()
解包时,每个键变为列,值变为行。
Python
from pyspark.sql.functions import explode
df.select(explode("map_name").alias("column1_name", "column2_name"))
SQL
SELECT explode(map_name) AS (column1_name, column2_name) FROM table_name
从列表或集创建数组
使用函数 collect_list()
或 collect_set()
将列的值转换为数组。 collect_list()
收集列中的所有值,同时 collect_set()
仅收集唯一的值。
注意
Spark 不保证由任一操作生成的数组中的项的顺序。
Python
from pyspark.sql.functions import collect_list, collect_set
df.select(collect_list("column_name").alias("array_name"))
df.select(collect_set("column_name").alias("set_name"))
SQL
SELECT collect_list(column_name) AS array_name FROM table_name;
SELECT collect_set(column_name) AS set_name FROM table_name;
从数组中的映射中选择列
还可以使用点表示法 (.
) 访问数组中包含的映射中的字段。 这会返回包含指定字段的所有值的数组。
考虑以下数据结构:
{
"column_name": [
{"field1": 1, "field2":"a"},
{"field1": 2, "field2":"b"}
]
}
可以使用以下查询以数组的形式从 field1
返回值:
Python
df.select("column_name.field1")
SQL
SELECT column_name.field1 FROM table_name
将嵌套数据转换为 JSON
使用 to_json
函数将复杂数据类型转换为 JSON。
Python
from pyspark.sql.functions import to_json
df.select(to_json("column_name").alias("json_name"))
SQL
SELECT to_json(column_name) AS json_name FROM table_name
若要对查询或 DataFrame 的所有内容进行编码,请将其与 struct(*)
结合。
Python
from pyspark.sql.functions import to_json, struct
df.select(to_json(struct("*")).alias("json_name"))
SQL
SELECT to_json(struct(*)) AS json_name FROM table_name
注意
Azure Databricks 还支持用 to_avro
和 to_protobuf
转换复杂数据类型,以便实现与集成系统的互操作性。
将 JSON 数据转换为复杂数据
使用 from_json
函数将 JSON 数据转换为本机复杂数据类型。
注意
必须为 JSON 数据指定架构。
Python
from pyspark.sql.functions import from_json
schema = "column1 STRING, column2 DOUBLE"
df.select(from_json("json_name", schema).alias("column_name"))
SQL
SELECT from_json(json_name, "column1 STRING, column2 DOUBLE") AS column_name FROM table_name
笔记本:转换复杂数据类型
以下笔记本提供了使用 Python、Scala 和 SQL 的复杂数据类型的示例。