作为半结构化变体类型引入数据
重要
此功能目前以公共预览版提供。
在 Databricks Runtime 15.3 及更高版本中,可以使用VARIANT
类型引入半结构化数据。 本文介绍了行为并提供使用自动加载器和COPY INTO
从云对象存储引入数据的示例模式、Kafka 的流式处理记录以及用于使用变体数据新建表或使用变体类型插入新记录的 SQL 命令。
请参阅查询变体数据。
创建一个包含变体列的表
VARIANT
是 Databricks Runtime 15.3 及更高版本中的标准 SQL 类型,受 Delta Lake 支持的表支持。 Azure Databricks 上的托管表默认使用 Delta Lake,因此可以使用以下语法创建包含单个VARIANT
列的空表:
CREATE TABLE table_name (variant_column VARIANT)
或者,可以使用 JSON 字符串上的PARSE_JSON
函数来使用 CTAS 语句创建具有变体列的表。 以下示例创建一个包含两个列的表:
- 从 JSON 字符串中提取的
id
列作为STRING
类型。 variant_column
列包含编码为VARIANT
类型的整个 JSON 字符串。
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
注意
VARIANT
列不能用于聚类分析键、分区或 Z 顺序键。 使用VARIANT
类型存储的数据不能用于比较和排序。
Databricks 建议将字段提取并存储为计划用于加速查询和优化存储布局的非变体列。
使用parse_json
插入数据
如果目标表已包含编码为VARIANT
的列,则可以使用parse_json
将 JSON 字符串记录作为VARIANT
插入,如以下示例所示:
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")))
.write
.mode("append")
.saveAsTable("table_name")
)
将数据从云对象存储引入为变体
在 Databricks Runtime 15.3 及更高版本中,可以使用自动加载程序将 JSON 源中的所有数据作为目标表中的单个VARIANT
列加载。 由于VARIANT
能够灵活应对架构和类型更改,并且维护数据源中存在的区分大小写和NULL
值,因此此模式对于大多数引入方案都十分可靠,但需要注意以下事项:
- 格式错误的 JSON 记录不能使用
VARIANT
类型进行编码。 VARIANT
类型只能容纳最大大小为 16mb 的记录。
注意
变体处理与损坏记录类似的超大型记录。 在默认的PERMISSIVE
处理模式下,会在_malformed_data
列中捕获超大型记录以及格式错误的 JSON 记录。
由于 JSON 源中的所有数据都记录为单个 VARIANT
列,因此引入期间不会发生架构演变且不支持 rescuedDataColumn
。 以下示例假定目标表已存在单个VARIANT
列。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
还可以在定义架构或传递schemaHints
时指定VARIANT
。 引用的源字段中的数据必须包含有效的 JSON 字符串。 以下示例演示了这些语法:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
将COPY INTO
与变体配合使用
Databricks 建议在可用时使用自动加载程序,而不是 COPY INTO
。
COPY INTO
支持将 JSON 数据源的全部内容引入为单个列。 以下示例新建包含单个VARIANT
列的表,然后使用COPY INTO
从 JSON 文件源引入记录。
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FORMAT_OPTIONS ('singleVariantColumn' = 'name')
还可以将目标表中的任何字段定义为VARIANT
。 运行COPY INTO
时,数据源中的相应字段会引入并强制转换为VARIANT
类型,如以下示例所示:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
将 Kafka 数据作为变体流式传输
许多 Kafka 流使用 JSON 对有效负载进行编码。 使用VARIANT
引入 Kafka 流会使这些工作负载对架构更改非常可靠。
以下示例演示了如何读取 Kafka 流式处理源,将key
强制转换为STRING
,将value
强制转换为VARIANT
,并写出到目标表。
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string"))
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)