在 DLT 中使用 from_json 来推理并演进架构

重要

该功能处于公开预览阶段。

本文介绍如何使用 DLT 中的 SQL 函数推断和改进 JSON Blob from_json 的架构。

概述

from_json SQL 函数分析 JSON 字符串列并返回结构值。 在 DLT 外部使用时,您必须使用 schema 参数显式提供返回值的架构。 与 DLT 一起使用时,可以启用架构推理和演变,从而自动管理返回值的架构。 此功能简化了初始设置(尤其是在架构未知时)以及架构频繁更改时的持续操作。 它能够实现无缝处理来自流式数据源(例如 Auto Loader、Kafka 或 Kinesis)的任意 JSON 数据块。

具体而言,在 DLT 中使用时,SQL 函数的 from_json 架构推理和演变可以:

  • 检测传入 JSON 记录中的新字段(包括嵌套 JSON 对象)
  • 推断字段类型并将其映射到相应的 Spark 数据类型
  • 自动改进架构以适应新字段
  • 自动处理不符合当前架构的数据

语法:自动推断和改进架构

如果与 DLT 一起使用 from_json ,它可以自动推断和改进架构。 若要启用此功能,请将架构设置为 NULL 并指定 schemaLocationKey 选项。 这使它可以推断和跟踪架构。

SQL

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>" [, otherOptions]))

Python语言

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>"[, otherOptions]})

查询可以有多个 from_json 表达式,但每个表达式必须具有唯 schemaLocationKey一的表达式。 每个管道的 schemaLocationKey 还必须唯一。

SQL

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python语言

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

语法:固定架构

如果要改为强制实施特定架构,可以使用以下 from_json 语法来分析使用该架构的 JSON 字符串:

from_json(jsonStr, schema, [, options])

此语法可在任何 Azure Databricks 环境中使用,包括 DLT。 此处提供了详细信息。

架构推理

from_json 从第一批 JSON 数据列推断出架构,并在内部根据其 schemaLocationKey(必需)进行索引。

如果 JSON 字符串是单个对象(例如),{"id": 123, "name": "John"}from_json则推断 STRUCT 类型的架构,并向字段列表添加一个rescuedDataColumn

STRUCT<id LONG, name STRING, _rescued_data STRING>

但是,如果 JSON 字符串具有顶级数组(例如 ["id": 123, "name": "John"]),则将 from_json ARRAY 包装在 STRUCT 中。 此方法支持恢复与推断架构不兼容的数据。 可以选择将数组值分解为单独的下游行。

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

使用架构提示替代架构推理

可以选择提供 schemaHints 以影响 from_json 如何推断列的类型。 如果你知道某个列采用特定的数据类型,或者想要选择更常规的数据类型(例如双精度型而不是整数),这将很有用。 可以使用 SQL 架构规范语法为列数据类型提供任意数量的提示。 架构提示的语义与自动加载程序 架构提示的语义相同。 例如:

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

当 JSON 字符串包含顶级 ARRAY 时,它将包装在 STRUCT 中。 在这些情况下,架构提示应用于 ARRAY 架构,而不是已封装的 STRUCT。 例如,设想一个包含顶层数组的 JSON 字符串,例如:

[{"id": 123, "name": "John"}]

推导出的 ARRAY 模式被包装在 STRUCT 中。

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

若要更改数据类型 id,请将架构提示指定为 element.id STRING。 若要添加类型为 DOUBLE 的新列,请指定 element.new_col DOUBLE。 由于这些提示,顶级 JSON 数组的架构将变为:

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

使用 schemaEvolutionMode 进行架构演变

from_json 在处理数据时检测到新的列的添加。 from_json 检测到新字段时,它会将新列合并到架构末尾,并将推断架构更新为最新架构。 现有列的数据类型将保持不变。 架构更新后,管道会自动重启并使用更新后的架构。

from_json 支持使用可选 schemaEvolutionMode 设置的以下架构演变模式。 这些模式与 自动加载程序一致。

schemaEvolutionMode 读取新列时的行为
addNewColumns(默认值) 流失败。 新列将添加到架构。 现有列不会使数据类型演变。
rescue 架构永远不会演变,流不会因架构更改而失败。 所有新列都记录在补救数据列中。
failOnNewColumns 流失败。 流不会重启,除非更新了 schemaHints 或删除了有问题的数据。
none 不会使架构演变,将忽略新列,并且除非设置 rescuedDataColumn 选项,否则不会补救数据。 流不会因为架构更改而失败。

例如:

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

恢复的数据列

补救数据列作为 _rescued_data 自动添加到架构中。 可以通过设置 rescuedDataColumn 选项来重命名列。 例如:

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

当您选择使用已获救的数据列时,与推断架构不匹配的任何列将被保留下来,而不是被删除。 这可能是因为数据类型不匹配、架构中缺少的列或列名大小写差异而发生。

处理损坏的记录

若要存储格式不正确且无法分析的记录,请通过设置架构提示添加 _corrupt_record 列,如以下示例所示:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

若要重命名损坏的记录列,请设置 columnNameOfCorruptRecord 该选项。

JSON 分析器支持三种处理损坏记录的模式:

模式 DESCRIPTION
PERMISSIVE 对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将不正确格式的字段设置为 null。 若要保留损坏的记录,可以设置以用户定义的架构命名 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 在推断架构时,分析器会在输出架构中隐式添加字段 columnNameOfCorruptRecord
DROPMALFORMED 忽略损坏的记录。
使用 DROPMALFORMED 模式与 rescuedDataColumn 时,数据类型不匹配不会导致记录被删除。 仅删除损坏的记录,例如不完整或格式不正确的 JSON。
FAILFAST 解析器遇到损坏的记录时引发异常。
使用 FAILFAST 模式与 rescuedDataColumn 结合时,数据类型不匹配不会引发错误。 仅损坏的记录会引发错误,例如不完整或格式不正确的 JSON。

引用 from_json 输出中的字段

from_json 在管道执行期间推断架构。 如果下游查询在 from_json 函数至少执行成功一次之前引用 from_json 字段,则该字段不会被解析,查询将被跳过。 在以下示例中,执行 bronze 查询中的 from_json 函数并推断架构之前,将跳过对 silver 表查询的分析。

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json如果同一查询中引用了函数及其推断的字段,则分析可能会失败,如以下示例所示:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

可以通过将对字段的 from_json 引用移动到下游查询(如上面的青铜/白银示例)来解决此问题。或者,可以指定 schemaHints 包含引用 from_json 字段。 例如:

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

示例:自动推断和改进架构

本部分提供用于在 DLT 中使用 from_json 自动架构推理和演变的示例代码。

从云对象存储生成流表

以下示例使用 read_files 语法从云对象存储创建流式表。

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Python语言

@dlt.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

从 Kafka 创建流式处理表

以下示例使用 read_kafka 语法从 Kafka 创建流表。

SQL

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

Python语言

@dlt.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col("value"), from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

示例:固定架构

有关用于 from_json 固定架构的示例代码,请参阅 from_json 函数

常见问题解答

本部分解答了有关函数中 from_json 架构推理和演变支持的常见问题。

from_jsonparse_json 之间有何区别?

parse_json 函数从 JSON 字符串返回一个 VARIANT 值。

VARIANT 提供了一种灵活高效的方法来存储半结构化数据。 这通过完全消除严格类型来绕过模式推断和演化。 但是,如果要在写入时强制实施架构(例如,因为架构相对严格), from_json 可能是更好的选择。

下表描述了 from_jsonparse_json 之间的差异。

功能 用例 可用性
from_json 使用 from_json 进行的架构演变会保持架构。 这在以下情况下很有用:
  • 你想要强制实施数据架构(例如,查看每个架构更改然后再持久保留)。
  • 你想要优化存储,并且需要低查询延迟和成本。
  • 你希望遇到类型不匹配时数据时失败。
  • 你想要从损坏的 JSON 记录中提取部分结果,并将格式不正确的记录 _corrupt_record 存储在列中。 相比之下,对于无效的 JSON,VARIANT 引入会返回一个错误。
架构推理和演变仅在 DLT 中可用
parse_json VARIANT 特别适合保存不需要架构化的数据。 例如:
  • 你想要保留半结构化的数据,因为它很灵活。
  • 架构更改太快,无法将其强制转换为架构,而不频繁发生流故障和重启。
  • 不希望因数据类型不匹配而导致失败。 (对于有效的 JSON 记录,VARIANT 引入将始终成功 - 即使存在类型不匹配也是如此。)
  • 用户不想处理包含不符合架构的字段的已获救数据列。
提供带 DLT 和不带 DLT 的选项

是否可以在 DLT 之外使用 from_json 架构推理和演变语法?

否,不能在 DLT 之外使用 from_json 架构推理和演变语法。

如何访问由 from_json 推断的模式?

查看目标流式处理表的架构。

是否可以传递 from_json 架构并执行演变?

不可以,不能传递 from_json 架构,也无法进行演变。 但是,你可以提供架构提示,以替代 from_json 推断的一些或全部字段。

如果完全刷新表,架构会发生什么情况?

清除与表关联的架构位置,并从头开始重新推断架构。