在自动加载程序中配置架构推理和演化

可以将自动加载程序配置为自动检测已加载数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。

当 JSON Blob 列中出现意外的数据(例如不同类型的数据,以后可以选择使用半结构化数据访问 API 访问这些数据)时,自动加载程序还可以提供“补救”措施。

架构推理和演变支持以下格式:

文件格式 支持的版本
JSON 所有版本
CSV 所有版本
XML Databricks Runtime 14.3 LTS 及更高版本
Avro Databricks Runtime 10.4 LTS 及更高版本
Parquet Databricks Runtime 11.3 LTS 及更高版本
ORC 不支持
Text 不适用(固定架构)
Binaryfile 不适用(固定架构)

架构推理的语法及演变

为选项 cloudFiles.schemaLocation 指定目标目录可实现架构推理和演变。 可以选择使用为 checkpointLocation 指定的目录。 如果使用增量实时表,Azure Databricks 会自动管理架构位置和其他检查点信息。

备注

如果从多个源数据位置向目标表加载内容,则每个自动加载程序引入工作负载都需要单独的流式处理检查点。

下面的示例将 parquet 用于 cloudFiles.format。 将 csvavrojson 用于其他文件源。 每种格式的默认行为的其他所有读取和写入设置都保持不变。

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

自动加载程序架构推理的工作原理是怎样的?

若要在首次读取数据时推理架构,自动加载程序会在它发现的前 50 GB 或前 1000 个文件(以先超过的限制为准)中采样。 自动加载程序将架构信息存储在配置为 cloudFiles.schemaLocation 的目录 _schemas 中以跟踪之后对输入数据的架构更改。

注意

若要更改所用样本的大小,可以设置 SQL 配置:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(字节字符串,例如 10gb

以及

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(整数)

默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不对数据类型(JSON、CSV 和 XML)进行编码的格式,自动加载程序会将所有列推理为字符串(包括 JSON 文件中的嵌套字段)。 对于具有类型化架构(Parquet 和 Avro)的格式,自动加载程序会采样一部分文件且合并单个文件的架构。 下表汇总了此行为:

文件格式 默认推理数据类型
JSON String
CSV 字符串
XML 字符串
Avro Avro 架构中编码的类型
Parquet Parquet 架构中编码的类型

Apache Spark DataFrameReader 使用不同的行为进行架构推理,根据示例数据为 JSON、CSV 和 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes 设置为 true

注意

对 CSV 数据的架构进行推理时,自动加载程序会假定文件包含标头。 如果 CSV 文件不包含标头,请提供选项 .option("header", "false")。 此外,自动加载程序会合并样本中所有文件的架构,以得到一个全局架构。 然后,自动加载程序可以根据文件标头读取每个文件,并正确分析 CSV。

注意

当列在两个 Parquet 文件中具有不同的数据类型时,自动加载程序选择最宽泛的类型。 可使用 schemaHints 替代此选项。 在指定架构提示时,自动加载程序不会将列强制转换为指定类型,而是告知 Parquet 读取器将列读取为指定类型。 如果不匹配,会补救数据列中补救该列。

自动加载程序架构演变的工作原理是怎样的?

自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException。 在流引发此错误之前,自动加载程序会在最新的数据微批上执行架构推理,并通过将新列合并到架构末尾来使用最新架构更新架构位置。 现有列的数据类型将保持不变。

Databricks 建议使用 Databricks 作业配置自动加载程序流,以便在此类架构更改后自动重启。

自动加载程序支持以下架构演变模式,可以在选项 cloudFiles.schemaEvolutionMode 中设置这些模式:

模式 读取新列的行为
addNewColumns(默认值) 流失败。 新列将添加到架构。 现有列不会使数据类型演变。
rescue 架构永远不会演变,流不会因架构更改而失败。 所有新列都记录在补救数据列中。
failOnNewColumns 流失败。 除非更新提供的架构,或者删除有问题的数据文件,否则流不会重启。
none 不会使架构演变,将忽略新列,并且除非设置 rescuedDataColumn 选项,否则不会补救数据。 流不会因为架构更改而失败。

注意

当未提供架构时,addNewColumns 模式是默认模式,但当提供架构时,none 是默认模式。 当提供流的架构时,不允许使用 addNewColumns,但如果提供你的架构作为架构提示,则可以使用它。

分区对自动加载程序有何作用?

如果数据排布在 Hive 样式分区中,则自动加载程序会尝试从数据的基础目录结构推理分区列。 例如,文件路径 base_path/event=click/date=2021-04-01/f0.json 导致 dateevent 被推理为分区列。 如果基础目录结构包含有冲突的 Hive 分区或者不包含 Hive 样式分区,则会忽略分区列。

二进制文件 (binaryFile) 和 text 文件格式具有固定数据架构,但支持分区列推理。 Databricks 建议为这些文件格式设置 cloudFiles.schemaLocation。 这可以避免任何潜在的错误或信息丢失,并防止在每次自动加载程序启动时进行分区列推理。

不考虑对分区列进行架构演变。 如果有一个类似于 base_path/event=click/date=2021-04-01/f0.json 的初始目录结构,然后开始以 base_path/event=click/date=2021-04-01/hour=01/f1.json 的形式接收新文件,那么自动加载程序将忽略小时列。 若要捕获新分区列的信息,请将 cloudFiles.partitionColumns 设置为 event,date,hour

备注

选项 cloudFiles.partitionColumns 采用逗号分隔的列名称列表。 仅分析目录结构中 key=value 格式的列。

什么是补救数据列?

当自动加载程序推理架构时,会自动将补救数据列添加为架构中的 _rescued_data。 可以重命名该列,或者在提供架构的情况下通过设置选项 rescuedDataColumn 来包含该列。

补救数据列可确保补救与架构不匹配的列,而不是删除这些列。 补救数据列包含出于以下原因而未被分析的所有数据:

  • 架构中缺少该列。
  • 类型不匹配。
  • 案例不匹配。

补救数据列中包含一个 JSON,其中包含已补救列以及记录的源文件路径。

备注

分析记录时,JSON 和 CSV 分析器支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录会被删除或引发错误,例如不完整或格式错误的 JSON 或 CSV。 如果在分析 JSON 或 CSV 时使用 badRecordsPath,则在使用 rescuedDataColumn 时,不会将数据类型不匹配情况视为错误的记录。 只有不完整的和格式错误的 JSON 或 CSV 记录会存储在 badRecordsPath 中。

更改区分大小写的行为

除非启用了区分大小写功能,否则会将列 abcAbcABC 视为相同的列,这样做的目的是进行架构推理。 所选择的大小写是非特定的,具体取决于采样的数据。 可以通过架构提示来强制要求使用哪种大小写。 进行选择并推断架构后,自动加载程序不会考虑那些未进行选择但与架构一致的大小写变体。

启用数据列时,以非架构所用的大小写形式命名的字段将加载到 _rescued_data 列。 通过将选项 readerCaseSensitive 设置为 false 来更改此行为,在这种情况下自动加载程序将以不区分大小写的方式读取数据。

使用架构提示重写架构推理

通过使用架构提示,可以强制实施你所知道的并期望出现在推理架构中的信息。 如果你知道某列采用特定的属性类型,或者想要选择更常规的数据类型(例如不使用 integer,而改用 double),可以为列数据类型提供任意数量的提示(格式为符合 SQL 架构规范语法的字符串),例如下文所示:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

有关支持的数据类型列表,请参阅有关数据类型的文档。

如果启动流时不存在某个列,则你还可使用架构提示将该列添加到推理的架构中。

以下推理架构示例演示了使用架构提示时的行为。

推理的架构:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

指定以下架构提示:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

可获取:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

注意

Databricks Runtime 9.1 LTS 及更高版本支持数组和映射架构提示。

下面是一个具有复杂数据类型的推理架构示例,它演示了使用架构提示时的行为。

推理的架构:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

指定以下架构提示:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

可获取:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

注意

仅当未向自动加载程序提供架构时,才会使用架构提示。 不管是启用还是禁用了 cloudFiles.inferColumnTypes,都可以使用架构提示。