Configure schema inference and evolution in Auto Loader
You can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema and evolve the table schema as new columns are introduced. This eliminates the need to manually track and apply schema changes over time.
Auto Loader can also "rescue" data that was unexpected (for example, of differing data types) in a JSON blob column, which you can choose to access later using the semi-structured data access APIs.
The following formats are supported for schema inference and evolution:
| File format | Supported versions |
| --- | --- |
| JSON | All versions |
| CSV | All versions |
| XML | Databricks Runtime 14.3 LTS and above |
| Avro | Databricks Runtime 10.4 LTS and above |
| Parquet | Databricks Runtime 11.3 LTS and above |
| ORC | Unsupported |
| Text | Not applicable (fixed-schema) |
| Binaryfile | Not applicable (fixed-schema) |
Syntax for schema inference and evolution
Specifying a target directory for the option cloudFiles.schemaLocation enables schema inference and evolution. You can choose to use the same directory you specify for the checkpointLocation. If you use Delta Live Tables, Azure Databricks manages schema location and other checkpoint information automatically.
Note
If you have more than one source data location being loaded into the target table, each Auto Loader ingestion workload requires a separate streaming checkpoint.
The following example uses parquet for the cloudFiles.format. Use csv, avro, or json for other file sources. All other settings for read and write stay the same for the default behaviors for each format.
Python
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "parquet")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
How does Auto Loader schema inference work?
To infer the schema when first reading data, Auto Loader samples the first 50 GB or 1000 files that it discovers, whichever limit is crossed first. Auto Loader stores the schema information in a directory _schemas at the configured cloudFiles.schemaLocation to track schema changes to the input data over time.
Note
To change the size of the sample that's used, you can set the following SQL configurations:
- spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes (byte string, for example 10gb)
- spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles (integer)
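For example, the following sketch sets both configurations before starting the stream; the values shown are illustrative, not recommendations.

```python
# Illustrative values only: sample at most 5 GB or 500 files for schema inference.
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "5gb")
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "500")
```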
By default, Auto Loader schema inference seeks to avoid schema evolution issues due to type mismatches. For formats that don't encode data types (JSON, CSV, and XML), Auto Loader infers all columns as strings (including nested fields in JSON files). For formats with typed schema (Parquet and Avro), Auto Loader samples a subset of files and merges the schemas of individual files. This behavior is summarized in the following table:
| File format | Default inferred data type |
| --- | --- |
| JSON | String |
| CSV | String |
| XML | String |
| Avro | Types encoded in Avro schema |
| Parquet | Types encoded in Parquet schema |
The Apache Spark DataFrameReader uses different behavior for schema inference, selecting data types for columns in JSON, CSV, and XML sources based on sample data. To enable this behavior with Auto Loader, set the option cloudFiles.inferColumnTypes to true.
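For example, the following sketch enables typed inference for a JSON source; the placeholder paths follow the earlier examples.

```python
# Minimal sketch: infer column data types from sampled JSON data instead of
# defaulting every column to string.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
)
```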
Note
When inferring the schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option .option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly.
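A minimal sketch for headerless CSV files, using the same placeholder paths as the earlier examples:

```python
# Minimal sketch: tell the CSV reader that the files have no header row.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "false")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
)
```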
Note
When a column has different data types in two Parquet files, Auto Loader chooses the widest type. You can use schemaHints to override this choice. When you specify schema hints, Auto Loader doesn't cast the column to the specified type, but rather tells the Parquet reader to read the column as the specified type. In the case of a mismatch, the column is rescued in the rescued data column.
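For example, the following sketch tells the Parquet reader to read a hypothetical column named amount as an integer instead of the widest sampled type:

```python
# Minimal sketch: the column name `amount` is hypothetical.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaHints", "amount int")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
)
```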
How does Auto Loader schema evolution work?
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged.
Databricks recommends configuring Auto Loader streams with Databricks Jobs to restart automatically after such schema changes.
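The following standalone sketch is illustrative only (a Job with automatic retries is the recommended approach); it shows why a restart succeeds: the restarted stream picks up the evolved schema from the schema location and continues with the new columns.

```python
# Illustrative only: retry in-process instead of relying on Job retries.
while True:
    try:
        query = (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
            .load("<path-to-source-data>")
            .writeStream
            .option("checkpointLocation", "<path-to-checkpoint>")
            .option("mergeSchema", "true")  # let the Delta sink accept new columns
            .start("<path-to-target>"))
        query.awaitTermination()
    except Exception:
        # An UnknownFieldException lands here; loop to restart with the evolved schema.
        pass
```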
Auto Loader supports the following modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode:
| Mode | Behavior on reading new column |
| --- | --- |
| addNewColumns (default) | Stream fails. New columns are added to the schema. Existing columns do not evolve data types. |
| rescue | Schema is never evolved and stream does not fail due to schema changes. All new columns are recorded in the rescued data column. |
| failOnNewColumns | Stream fails. Stream does not restart unless the provided schema is updated, or the offending data file is removed. |
| none | Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescuedDataColumn option is set. Stream does not fail due to schema changes. |
Note
addNewColumns mode is the default when a schema is not provided, but none is the default when you provide a schema. addNewColumns is not allowed when the schema of the stream is provided, but does work if you provide your schema as a schema hint.
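For example, the following sketch selects rescue mode so that new columns land in the rescued data column instead of failing the stream:

```python
# Minimal sketch: never evolve the schema; record new columns in _rescued_data.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
)
```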
How do partitions work with Auto Loader?
Auto Loader attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, the file path base_path/event=click/date=2021-04-01/f0.json results in the inference of date and event as partition columns. If the underlying directory structure contains conflicting Hive partitions or doesn't contain Hive style partitioning, partition columns are ignored.
Binary file (binaryFile) and text file formats have fixed data schemas, but support partition column inference. Databricks recommends setting cloudFiles.schemaLocation for these file formats. This avoids any potential errors or information loss and prevents inference of partition columns each time an Auto Loader stream begins.
Partition columns are not considered for schema evolution. If you had an initial directory structure like base_path/event=click/date=2021-04-01/f0.json, and then start receiving new files as base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader ignores the hour column. To capture information for new partition columns, set cloudFiles.partitionColumns to event,date,hour.
Note
The option cloudFiles.partitionColumns takes a comma-separated list of column names. Only columns that exist as key=value pairs in your directory structure are parsed.
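A minimal sketch using the directory layout from the example above:

```python
# Minimal sketch: explicitly list partition columns so the new `hour` partition
# directory is captured; <base_path> is the root shown in the example above.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.partitionColumns", "event,date,hour")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<base_path>")
)
```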
What is the rescued data column?
When Auto Loader infers the schema, a rescued data column is automatically added to your schema as _rescued_data. You can rename the column or include it in cases where you provide a schema by setting the option rescuedDataColumn.
The rescued data column ensures that columns that don't match with the schema are rescued instead of being dropped. The rescued data column contains any data that isn't parsed for the following reasons:
- The column is missing from the schema.
- Type mismatches.
- Case mismatches.
The rescued data column contains a JSON document with the rescued columns and the source file path of the record.
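For example, the following sketch provides a schema and opts in to a renamed rescued data column; the schema and column name are illustrative:

```python
# Minimal sketch: with an explicit schema, set rescuedDataColumn to keep
# (and rename) unmatched values instead of dropping them.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("id string, amount double")      # illustrative schema
  .option("rescuedDataColumn", "_rescue")  # illustrative column name
  .load("<path-to-source-data>")
)
```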
Note
The JSON and CSV parsers support three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records are dropped or throw errors, such as incomplete or malformed JSON or CSV. If you use badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the rescuedDataColumn. Only incomplete and malformed JSON or CSV records are stored in badRecordsPath.
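For example, in the following sketch type mismatches go to the rescued data column, while only malformed JSON records are written to the (illustrative) bad records path:

```python
# Minimal sketch: rescued data and bad records configured together.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("rescuedDataColumn", "_rescued_data")
  .option("badRecordsPath", "<path-to-bad-records>")
  .load("<path-to-source-data>")
)
```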
Change case-sensitive behavior
Unless case sensitivity is enabled, the columns abc, Abc, and ABC are considered the same column for the purposes of schema inference. The case that is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader treats casing variants that differ from the selected case as inconsistent with the schema.
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. Change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
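For example, the following sketch reads data case-insensitively so that differently cased field names match the inferred schema instead of landing in the rescued data column:

```python
# Minimal sketch: match field names to the schema without regard to case.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("readerCaseSensitive", "false")
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
)
```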
Override schema inference with schema hints
You can use schema hints to enforce the schema information that you know and expect on an inferred schema. When you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as a string using SQL schema specification syntax, such as the following:
.option("cloudFiles.schemaHints", "tags map<string,string>, version int")
See the documentation on data types for the list of supported data types.
If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.
Here is an example of an inferred schema to see the behavior with schema hints.
Inferred schema:
|-- date: string
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string
|-- purchase_options: struct
| |-- delivery_address: string
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")
you get:
|-- date: string -> date
|-- quantity: int
|-- user_info: struct
| |-- id: string
| |-- name: string
| |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp
Note
Support for Array and Map schema hints is available in Databricks Runtime 9.1 LTS and above.
Here is an example of an inferred schema with complex data types to see the behavior with schema hints.
Inferred schema:
|-- products: array<string>
|-- locations: array<int>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int
By specifying the following schema hints:
.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")
you get:
|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
| |-- users.element: struct
| | |-- id: string -> int
| | |-- name: string
| | |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
| |-- discounts.key: struct
| | |-- id: string -> int
| |-- discounts.value: string
|-- descriptions: map<string,struct>
| |-- descriptions.key: string
| |-- descriptions.value: struct
| | |-- content: int -> string
Note
Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether cloudFiles.inferColumnTypes is enabled or disabled.