作为半结构化变体类型引入数据

重要

此功能目前以公共预览版提供。

在 Databricks Runtime 15.3 及更高版本中,可以使用VARIANT类型引入半结构化数据。 本文介绍了行为并提供使用自动加载器和COPY INTO从云对象存储引入数据的示例模式、Kafka 的流式处理记录以及用于使用变体数据新建表或使用变体类型插入新记录的 SQL 命令。

请参阅查询变体数据

创建一个包含变体列的表

VARIANT是 Databricks Runtime 15.3 及更高版本中的标准 SQL 类型,受 Delta Lake 支持的表支持。 Azure Databricks 上的托管表默认使用 Delta Lake,因此可以使用以下语法创建包含单个VARIANT列的空表:

CREATE TABLE table_name (variant_column VARIANT)

或者,可以使用 JSON 字符串上的PARSE_JSON函数来使用 CTAS 语句创建具有变体列的表。 以下示例创建一个包含两个列的表:

  • 从 JSON 字符串中提取的id列作为STRING类型。
  • variant_column列包含编码为VARIANT类型的整个 JSON 字符串。
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

注意

VARIANT列不能用于聚类分析键、分区或 Z 顺序键。 使用VARIANT类型存储的数据不能用于比较和排序。

Databricks 建议将字段提取并存储为计划用于加速查询和优化存储布局的非变体列。

使用parse_json插入数据

如果目标表已包含编码为VARIANT的列,则可以使用parse_json将 JSON 字符串记录作为VARIANT插入,如以下示例所示:

SQL

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

Python

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

将数据从云对象存储引入为变体

在 Databricks Runtime 15.3 及更高版本中,可以使用自动加载程序将 JSON 源中的所有数据作为目标表中的单个VARIANT列加载。 由于VARIANT能够灵活应对架构和类型更改,并且维护数据源中存在的区分大小写和NULL值,因此此模式对于大多数引入方案都十分可靠,但需要注意以下事项:

  • 格式错误的 JSON 记录不能使用VARIANT类型进行编码。
  • VARIANT类型只能容纳最大大小为 16mb 的记录。

注意

变体处理与损坏记录类似的超大型记录。 在默认的PERMISSIVE处理模式下,会在_malformed_data列中捕获超大型记录以及格式错误的 JSON 记录。

由于 JSON 源中的所有数据都记录为单个 VARIANT 列,因此引入期间不会发生架构演变且不支持 rescuedDataColumn。 以下示例假定目标表已存在单个VARIANT列。

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

还可以在定义架构或传递schemaHints时指定VARIANT。 引用的源字段中的数据必须包含有效的 JSON 字符串。 以下示例演示了这些语法:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

COPY INTO与变体配合使用

Databricks 建议在可用时使用自动加载程序,而不是 COPY INTO

COPY INTO支持将 JSON 数据源的全部内容引入为单个列。 以下示例新建包含单个VARIANT列的表,然后使用COPY INTO从 JSON 文件源引入记录。

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'name')

还可以将目标表中的任何字段定义为VARIANT。 运行COPY INTO时,数据源中的相应字段会引入并强制转换为VARIANT类型,如以下示例所示:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

将 Kafka 数据作为变体流式传输

许多 Kafka 流使用 JSON 对有效负载进行编码。 使用VARIANT引入 Kafka 流会使这些工作负载对架构更改非常可靠。

以下示例演示了如何读取 Kafka 流式处理源,将key强制转换为STRING,将value强制转换为VARIANT,并写出到目标表。

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)