在 DLT 中使用
重要
该功能处于公开预览阶段。
本文介绍如何使用 DLT 中的 SQL 函数推断和改进 JSON Blob from_json
的架构。
概述
from_json
SQL 函数分析 JSON 字符串列并返回结构值。 在 DLT 外部使用时,您必须使用 schema
参数显式提供返回值的架构。 与 DLT 一起使用时,可以启用架构推理和演变,从而自动管理返回值的架构。 此功能简化了初始设置(尤其是在架构未知时)以及架构频繁更改时的持续操作。 它能够实现无缝处理来自流式数据源(例如 Auto Loader、Kafka 或 Kinesis)的任意 JSON 数据块。
具体而言,在 DLT 中使用时,SQL 函数的 from_json
架构推理和演变可以:
- 检测传入 JSON 记录中的新字段(包括嵌套 JSON 对象)
- 推断字段类型并将其映射到相应的 Spark 数据类型
- 自动改进架构以适应新字段
- 自动处理不符合当前架构的数据
语法:自动推断和改进架构
如果与 DLT 一起使用 from_json
,它可以自动推断和改进架构。 若要启用此功能,请将架构设置为 NULL 并指定 schemaLocationKey
选项。 这使它可以推断和跟踪架构。
SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>" [, otherOptions]))
Python语言
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>"[, otherOptions]})
查询可以有多个 from_json
表达式,但每个表达式必须具有唯 schemaLocationKey
一的表达式。 每个管道的 schemaLocationKey
还必须唯一。
SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python语言
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
语法:固定架构
如果要改为强制实施特定架构,可以使用以下 from_json
语法来分析使用该架构的 JSON 字符串:
from_json(jsonStr, schema, [, options])
此语法可在任何 Azure Databricks 环境中使用,包括 DLT。 此处提供了详细信息。
架构推理
from_json
从第一批 JSON 数据列推断出架构,并在内部根据其 schemaLocationKey
(必需)进行索引。
如果 JSON 字符串是单个对象(例如),{"id": 123, "name": "John"}
from_json
则推断 STRUCT 类型的架构,并向字段列表添加一个rescuedDataColumn
。
STRUCT<id LONG, name STRING, _rescued_data STRING>
但是,如果 JSON 字符串具有顶级数组(例如 ["id": 123, "name": "John"]
),则将 from_json
ARRAY 包装在 STRUCT 中。 此方法支持恢复与推断架构不兼容的数据。 可以选择将数组值分解为单独的下游行。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
使用架构提示替代架构推理
可以选择提供 schemaHints
以影响 from_json
如何推断列的类型。 如果你知道某个列采用特定的数据类型,或者想要选择更常规的数据类型(例如双精度型而不是整数),这将很有用。 可以使用 SQL 架构规范语法为列数据类型提供任意数量的提示。 架构提示的语义与自动加载程序 架构提示的语义相同。 例如:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
当 JSON 字符串包含顶级 ARRAY 时,它将包装在 STRUCT 中。 在这些情况下,架构提示应用于 ARRAY 架构,而不是已封装的 STRUCT。 例如,设想一个包含顶层数组的 JSON 字符串,例如:
[{"id": 123, "name": "John"}]
推导出的 ARRAY 模式被包装在 STRUCT 中。
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
若要更改数据类型 id
,请将架构提示指定为 element.id
STRING。 若要添加类型为 DOUBLE 的新列,请指定 element.new_col
DOUBLE。 由于这些提示,顶级 JSON 数组的架构将变为:
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
使用 schemaEvolutionMode
进行架构演变
from_json
在处理数据时检测到新的列的添加。
from_json
检测到新字段时,它会将新列合并到架构末尾,并将推断架构更新为最新架构。 现有列的数据类型将保持不变。 架构更新后,管道会自动重启并使用更新后的架构。
from_json
支持使用可选 schemaEvolutionMode
设置的以下架构演变模式。 这些模式与 自动加载程序一致。
schemaEvolutionMode |
读取新列时的行为 |
---|---|
addNewColumns (默认值) |
流失败。 新列将添加到架构。 现有列不会使数据类型演变。 |
rescue |
架构永远不会演变,流不会因架构更改而失败。 所有新列都记录在补救数据列中。 |
failOnNewColumns |
流失败。 流不会重启,除非更新了 schemaHints 或删除了有问题的数据。 |
none |
不会使架构演变,将忽略新列,并且除非设置 rescuedDataColumn 选项,否则不会补救数据。 流不会因为架构更改而失败。 |
例如:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
恢复的数据列
补救数据列作为 _rescued_data
自动添加到架构中。 可以通过设置 rescuedDataColumn
选项来重命名列。 例如:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
当您选择使用已获救的数据列时,与推断架构不匹配的任何列将被保留下来,而不是被删除。 这可能是因为数据类型不匹配、架构中缺少的列或列名大小写差异而发生。
处理损坏的记录
若要存储格式不正确且无法分析的记录,请通过设置架构提示添加 _corrupt_record
列,如以下示例所示:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
若要重命名损坏的记录列,请设置 columnNameOfCorruptRecord
该选项。
JSON 分析器支持三种处理损坏记录的模式:
模式 | DESCRIPTION |
---|---|
PERMISSIVE |
对于损坏的记录,将格式错误的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将不正确格式的字段设置为 null 。 若要保留损坏的记录,可以设置以用户定义的架构命名 columnNameOfCorruptRecord 的字符串类型字段。 如果架构没有该字段,则会在分析期间删除损坏的记录。 在推断架构时,分析器会在输出架构中隐式添加字段 columnNameOfCorruptRecord 。 |
DROPMALFORMED |
忽略损坏的记录。 使用 DROPMALFORMED 模式与 rescuedDataColumn 时,数据类型不匹配不会导致记录被删除。 仅删除损坏的记录,例如不完整或格式不正确的 JSON。 |
FAILFAST |
解析器遇到损坏的记录时引发异常。 使用 FAILFAST 模式与 rescuedDataColumn 结合时,数据类型不匹配不会引发错误。 仅损坏的记录会引发错误,例如不完整或格式不正确的 JSON。 |
引用 from_json 输出中的字段
from_json
在管道执行期间推断架构。 如果下游查询在 from_json
函数至少执行成功一次之前引用 from_json
字段,则该字段不会被解析,查询将被跳过。 在以下示例中,执行 bronze 查询中的 from_json
函数并推断架构之前,将跳过对 silver 表查询的分析。
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json
如果同一查询中引用了函数及其推断的字段,则分析可能会失败,如以下示例所示:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
可以通过将对字段的 from_json
引用移动到下游查询(如上面的青铜/白银示例)来解决此问题。或者,可以指定 schemaHints
包含引用 from_json
字段。 例如:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
示例:自动推断和改进架构
本部分提供用于在 DLT 中使用 from_json
自动架构推理和演变的示例代码。
从云对象存储生成流表
以下示例使用 read_files
语法从云对象存储创建流式表。
SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
Python语言
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
从 Kafka 创建流式处理表
以下示例使用 read_kafka
语法从 Kafka 创建流表。
SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
Python语言
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col("value"), from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
示例:固定架构
有关用于 from_json
固定架构的示例代码,请参阅 from_json
函数。
常见问题解答
本部分解答了有关函数中 from_json
架构推理和演变支持的常见问题。
from_json
和 parse_json
之间有何区别?
该 parse_json
函数从 JSON 字符串返回一个 VARIANT
值。
VARIANT 提供了一种灵活高效的方法来存储半结构化数据。 这通过完全消除严格类型来绕过模式推断和演化。 但是,如果要在写入时强制实施架构(例如,因为架构相对严格), from_json
可能是更好的选择。
下表描述了 from_json
和 parse_json
之间的差异。
功能 | 用例 | 可用性 |
---|---|---|
from_json |
使用 from_json 进行的架构演变会保持架构。 这在以下情况下很有用:
|
架构推理和演变仅在 DLT 中可用 |
parse_json |
VARIANT 特别适合保存不需要架构化的数据。 例如:
|
提供带 DLT 和不带 DLT 的选项 |
是否可以在 DLT 之外使用 from_json
架构推理和演变语法?
否,不能在 DLT 之外使用 from_json
架构推理和演变语法。
如何访问由 from_json
推断的模式?
查看目标流式处理表的架构。
是否可以传递 from_json
架构并执行演变?
不可以,不能传递 from_json
架构,也无法进行演变。 但是,你可以提供架构提示,以替代 from_json
推断的一些或全部字段。
如果完全刷新表,架构会发生什么情况?
清除与表关联的架构位置,并从头开始重新推断架构。