可以将自动加载程序配置为自动检测已加载数据的架构,这样无需显式声明数据架构即可初始化表,并在引入新列时让表架构完成相应的演变。 这样就无需一直手动跟踪和应用架构更改。
自动加载器还能在 JSON blob 列中"检索"意外的数据(例如不同类型的数据),你可以选择稍后使用半结构化数据访问 API查看这些数据。
自动加载程序支持以下格式进行架构推理和演变:
| 文件格式 | 支持的版本 |
|---|---|
JSON |
所有版本 |
CSV |
所有版本 |
XML |
Databricks Runtime 14.3 LTS 及更高版本 |
Avro |
Databricks Runtime 10.4 LTS 及更高版本 |
Parquet |
Databricks Runtime 11.3 LTS 及更高版本 |
ORC |
不支持 |
Text |
不适用(固定架构) |
Binaryfile |
不适用(固定架构) |
架构推理的语法及演变
为选项指定目标目录 cloudFiles.schemaLocation 可启用架构推理和演变。 可以选择使用为 checkpointLocation 指定的目录。 如果使用 Lakeflow Spark 声明性管道,Azure Databricks 会自动管理架构位置和其他检查点信息。
备注
如果将多个源数据位置中的数据加载到目标表中,则每个 Auto Loader 数据引入工作负载都需要单独的流式检查点。
下面的示例将 parquet 用于 cloudFiles.format。 将 csv、avro 或 json 用于其他文件源。 每种格式的默认行为的其他所有读取和写入设置都保持不变。
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path-to-target>")
)
Scala(编程语言)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path-to-target>")
自动加载程序架构推理的工作原理是怎样的?
若要在首次读取数据时推理架构,自动加载程序会在它发现的前 50 GB 或前 1000 个文件(以先超过的限制为准)中采样。 自动加载程序将架构信息存储在配置为 _schemas 的目录 cloudFiles.schemaLocation 中以跟踪之后对输入数据的架构更改。
备注
若要更改所用示例的大小,请设置 SQL 配置:
spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes
(字节字符串,例如 10gb)
以及
spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles
(整数)
默认情况下,自动加载程序架构推理会试图避免由于类型不匹配而出现的架构演变问题。 对于不对数据类型(JSON、CSV 和 XML)进行编码的格式,自动加载程序会将所有列推理为字符串(包括 JSON 文件中的嵌套字段)。 对于具有类型化架构(Parquet 和 Avro)的格式,自动加载程序会采样一部分文件且合并单个文件的架构。 下表汇总了此行为。
| 文件格式 | 默认推理数据类型 |
|---|---|
JSON |
字符串 |
CSV |
字符串 |
XML |
字符串 |
Avro |
Avro 架构中编码的类型 |
Parquet |
Parquet 架构中编码的类型 |
Apache Spark DataFrameReader 使用不同的行为进行架构推理,根据示例数据为 JSON、CSV 和 XML 源中的列选择数据类型。 若要使用自动加载程序实现此行为,请将选项 cloudFiles.inferColumnTypes 设置为 true。
备注
对 CSV 数据的架构进行推理时,自动加载程序会假定文件包含标头。 如果 CSV 文件不包含标头,请提供选项 .option("header", "false")。 此外,自动加载程序会合并样本中所有文件的架构,以得到一个全局架构。 然后,自动加载程序可以根据文件标头读取每个文件,并正确分析 CSV。
备注
当列在两个 Parquet 文件中具有不同的数据类型时,自动加载程序选择最宽泛的类型。 可使用 schemaHints 替代此选项。 在指定架构提示时,自动加载器不会将列强制转换为指定类型,而是指示 Parquet 阅读器按指定类型读取列。 如果不匹配,自动加载程序通过将数据放入 已获救的数据列来拯救该列。
自动加载程序架构演变的工作原理是怎样的?
自动加载程序在处理数据时会检测是否添加了新列。 当自动加载程序检测到新列时,流会停止并出现 UnknownFieldException。 在流触发此错误之前,Auto Loader 会对最新的数据微批进行架构推测,并通过将新列合并到架构末尾来用最新的架构更新架构。 现有列的数据类型将保持不变。
Databricks 建议使用 Lakeflow 作业 配置自动加载程序流,以便在发生此类架构更改后可以自动重启。
自动加载器支持以下通过 cloudFiles.schemaEvolutionMode 选项设置的架构演化模式:
| 模式 | 在读取新列时的行为 |
|---|---|
addNewColumns(默认值) |
数据流失败了 新列已添加到模式中。 现有列不会使数据类型演变。 |
rescue |
自动加载程序不会自动演变架构,数据流不会因架构更改而失败。 Auto Loader 记录所有新列到 已恢复数据列中。 |
failOnNewColumns |
数据流失败。 除非更新提供的架构或删除有问题的数据文件,否则 Stream 不会重启。 |
none |
不会使架构演变,将忽略新列,并且除非设置 rescuedDataColumn 选项,否则不会补救数据。 流不会因为架构更改而失败。 |
备注
当未提供架构时,addNewColumns 模式是默认模式,但当提供架构时,none 是默认模式。 当提供流的架构时,不允许使用 addNewColumns,但如果提供你的架构作为架构提示,则可以使用它。
自动加载程序中的分区如何运作?
如果数据排布在 Hive 样式分区中,则自动加载程序会尝试从数据的基础目录结构推理分区列。 例如,文件路径 base_path/event=click/date=2021-04-01/f0.json 导致 date 和 event 被推理为分区列。 如果基础目录结构包含冲突的 Hive 分区或不包含 Hive 样式分区,则自动加载程序将忽略分区列。
二进制文件 (binaryFile) 和 text 文件格式具有固定数据架构,但支持分区列推理。 Databricks 建议为这些文件格式设置 cloudFiles.schemaLocation。 这可以避免任何潜在的错误或信息丢失,并防止在每次自动加载程序启动时进行分区列推理。
自动加载程序不考虑用于架构演变的分区列。 如果有一个类似于 base_path/event=click/date=2021-04-01/f0.json 的初始目录结构,然后开始以 base_path/event=click/date=2021-04-01/hour=01/f1.json 的形式接收新文件,那么自动加载程序将忽略小时列。 若要捕获新分区列的信息,请将 cloudFiles.partitionColumns 设置为 event,date,hour。
备注
该 cloudFiles.partitionColumns 选项采用以逗号分隔的列名列表。 自动加载程序仅分析目录结构中作为 key=value 对存在的列。
什么是补救数据列?
当 Auto Loader 推断模式时,Auto Loader 会自动将已获救的数据列作为 _rescued_data 添加到您的模式中。 可以通过设置rescuedDataColumn选项来重命名该列,或者在提供架构时包含该列。
已获救的数据列可确保自动加载程序可拯救与架构不匹配的列,而不是删除它们。 补救数据列包含出于以下原因而未被分析的所有数据:
- 架构中缺少该列。
- 类型不匹配。
- 案例不匹配。
已恢复的数据列包括一个 JSON 数据块,其中包含已恢复的列以及记录的源文件路径。
备注
分析记录时,JSON 和 CSV 分析器支持三种模式:PERMISSIVE、DROPMALFORMED 和 FAILFAST。 与rescuedDataColumn一起使用时,数据类型不匹配不会导致自动加载程序在DROPMALFORMED模式下删除记录或在FAILFAST模式下抛出错误。 只有损坏的记录失败或引发错误,例如不完整或格式不正确的 JSON 或 CSV。 如果在分析 JSON 或 CSV 时使用 badRecordsPath ,则在使用 rescuedDataColumnJSON 或 CSV 时,自动加载程序不会将数据类型不匹配视为错误的记录。 自动加载程序仅存储不完整且格式不正确的 JSON 或 CSV 记录 badRecordsPath。
更改区分大小写的行为
除非启用了区分大小写,否则自动加载程序会将列abc、Abc和ABC视为同一列用于架构推理。 自动加载程序根据采样的数据任意选择事例。 可以使用模式提示来强制规定应该使用哪种大小写。 自动加载程序进行选择并推断架构后,它不会考虑那些未被选择与架构一致的大小写变体。
启用 已获救的数据列 后,自动加载程序会将字段名与架构不一致的字段加载到 _rescued_data 列。 通过将选项设置为 readerCaseSensitive false 来更改此行为,在这种情况下,自动加载程序以不区分大小写的方式读取数据。
使用架构提示覆盖架构推理
通过使用架构提示,可以强制实施你所知道的并期望出现在推理架构中的信息。 如果你知道某列采用特定的属性类型,或者想要选择更常规的数据类型(例如不使用 double,而改用 integer),可以为列数据类型提供任意数量的提示(格式为符合 SQL 架构规范语法的字符串),例如下文所示:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
有关支持的数据类型的列表,请参阅 语言映射。
如果启动流时不存在某个列,则你还可使用架构提示将该列添加到推理的架构中。
下面的示例演示了一个推断的架构和应用架构提示的结果。
推导的模式定义:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
指定以下模式提示:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
您会获得:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
备注
Databricks Runtime 9.1 LTS 及更高版本支持数组和映射架构提示。
以下示例显示了具有复杂数据类型的推断架构,以及应用架构提示的结果。
推断的模式
|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
指定以下架构提示:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
您将获得:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
备注
仅当 未 提供架构时,自动加载程序才使用架构提示。 不管是启用还是禁用了 cloudFiles.inferColumnTypes,都可以使用架构提示。
后续步骤
- 查看 自动加载程序选项
- 为生产工作负载配置自动加载程序
- 探索 常见数据加载模式