Compartir a través de

作为半结构化变体类型引入数据

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 15.3 及更高版本中,可以使用VARIANT类型引入半结构化数据。 本文介绍了行为并提供使用自动加载器和COPY INTO从云对象存储引入数据的示例模式、Kafka 的流式处理记录以及用于使用变体数据新建表或使用变体类型插入新记录的 SQL 命令。 下表汇总了支持的文件格式和 Databricks Runtime 版本支持:

文件格式 支持的 Databricks Runtime 版本
JSON 15.3 及更高版本
XML 16.4 及更高版本
CSV 16.4 及更高版本

请参阅查询变体数据

创建一个包含变体列的表

VARIANT是 Databricks Runtime 15.3 及更高版本中的标准 SQL 类型,受 Delta Lake 支持的表支持。 Azure Databricks 上的托管表默认使用 Delta Lake,因此可以使用以下语法创建包含单个VARIANT列的空表:

CREATE TABLE table_name (variant_column VARIANT)

或者,可以使用 PARSE_JSON JSON 字符串上的函数或 FROM_XML XML 字符串上的函数来使用 CTAS 语句创建具有变体列的表。 以下示例创建一个包含两个列的表:

  • 从 JSON 字符串中提取的id列作为STRING类型。
  • variant_column列包含编码为VARIANT类型的整个 JSON 字符串。
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

注意

Databricks 建议将字段提取并存储为计划用于加速查询和优化存储布局的非变体列。

VARIANT列不能用于聚类分析键、分区或 Z 顺序键。 数据类型 VARIANT 无法用于比较、分组、排序和设置操作。 如需完整的限制列表,请参阅限制

使用parse_json插入数据

如果目标表已包含编码为VARIANT的列,则可以使用parse_json将 JSON 字符串记录作为VARIANT插入,如以下示例所示:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

使用from_xml插入数据

如果目标表已包含编码为 VARIANT的列,则可以使用 from_xml 将 XML 字符串记录插入为 VARIANT。 例如:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_XML(xml_string, 'variant')
  FROM source_data

Python

from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  .select(from_xml(col("xml_string"), "variant"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

使用from_csv插入数据

如果目标表已包含编码为 VARIANT的列,则可以使用 from_csv 将 XML 字符串记录插入为 VARIANT。 例如:

SQL

INSERT INTO table_name (variant_column)
  SELECT FROM_CSV(csv_string, 'v variant').v
  FROM source_data

Python

from pyspark.sql.functions import col, from_csv

(spark.read
  .table("source_data")
  .select(from_csv(col("csv_string"), "v variant").v)
  .write
  .mode("append")
  .saveAsTable("table_name")
)

将数据从云对象存储引入为变体

自动加载程序可用于将受支持的文件源中的所有数据作为目标表中的单个 VARIANT 列加载。 由于VARIANT能够灵活应对架构和类型更改,并且维护数据源中存在的区分大小写和NULL值,因此此模式对于大多数引入方案都十分可靠,但需要注意以下事项:

  • 格式不正确的记录不能使用 VARIANT 类型进行编码。
  • VARIANT类型只能容纳最大大小为 16mb 的记录。

注意

Variant 将超大型记录视为损坏记录。 在默认 PERMISSIVE 处理模式下,记录过大时会被捕获到 corruptRecordColumn 中。

整个记录被记录为单个 VARIANT 列,因此在引入期间不会发生架构演变,rescuedDataColumn 也不受支持。 以下示例假定目标表已存在单个VARIANT列。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

还可以在定义架构或传递VARIANT时指定schemaHints。 引用的源字段中的数据必须包含有效的记录。 以下示例演示了这些语法:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

COPY INTO与变体配合使用

Databricks 建议在可用时使用自动加载程序,而不是 COPY INTO

COPY INTO 支持将支持的数据源的全部内容作为单个列引入。 以下示例新建包含单个VARIANT列的表,然后使用COPY INTO从 JSON 文件源引入记录。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

将 Kafka 数据作为变体流式传输

许多 Kafka 流使用 JSON 对有效负载进行编码。 使用VARIANT引入 Kafka 流会使这些工作负载对架构更改非常可靠。

以下示例演示了如何读取 Kafka 流式处理源,将key强制转换为STRING,将value强制转换为VARIANT,并写出到目标表。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)