Common data loading patterns using COPY INTO
Learn common patterns for using COPY INTO to load data from file sources into Delta Lake.
There are many options for using COPY INTO. You can also use temporary credentials with COPY INTO in combination with these patterns.
See COPY INTO for a full reference of all options.
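As a minimal sketch of the temporary-credential variant mentioned above, the source path can carry a WITH (CREDENTIAL ...) clause. The table name and path reuse the placeholders from the examples on this page, the target table is assumed to exist already, and the token value is deliberately left elided.

```sql
-- Sketch: load JSON using a temporary SAS token attached to the source path.
-- Assumes my_json_data already exists and the token grants read access to the container.
-- The token value is elided; supply your own.
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path' WITH (
  CREDENTIAL (AZURE_SAS_TOKEN = '...')
)
FILEFORMAT = JSON;
```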
Create target tables for COPY INTO
COPY INTO must target an existing Delta table.
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
In Databricks Runtime 11.3 LTS and above, setting the schema for these tables is optional for formats that support schema evolution. See Schema inference and evolution using COPY INTO for details.
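For illustration, a filled-in version of the template above might look like the following. The column names, types, and comment text are hypothetical and only show how the optional clauses fit together.

```sql
-- Hypothetical columns and comment; adjust them to match your source data.
CREATE TABLE IF NOT EXISTS my_table
  (id BIGINT, event_time TIMESTAMP, payload STRING)
  COMMENT 'Target table for COPY INTO loads';
```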
Load JSON data using COPY INTO
The following example loads JSON data from five files in Azure Data Lake Storage (ADLS) into the Delta table called my_json_data. This table must be created before COPY INTO can be executed. COPY INTO is idempotent: if a file has already been loaded, it is skipped and its data is not loaded again.
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
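If you do need to load the same files again, the sketch below turns off the skip-already-loaded behavior with the force copy option. It reuses the table and path from the example above. Reloading appends the same rows a second time, so it assumes you have already dealt with the earlier load or that duplicates are acceptable.

```sql
-- Sketch: 'force' = 'true' disables idempotency, so all five files are loaded again
-- even though an earlier COPY INTO already loaded them. This can duplicate rows.
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
COPY_OPTIONS ('force' = 'true');
```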
Load Avro data using COPY INTO
The following example loads Avro data from ADLS, applying additional SQL expressions in the SELECT statement to convert and rename columns.
COPY INTO my_delta_table
FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = AVRO
Load CSV files using COPY INTO
The following example loads CSV files from Azure Data Lake Storage under abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path/folder1 into a Delta table.
COPY INTO target_table
FROM (SELECT key, index, textData, 'constant_value'
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS('header' = 'true')
-- The example below loads CSV files without headers in ADLS using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
Schema inference and evolution using COPY INTO
This section provides examples for common schema inference and evolution configurations using COPY INTO.
Syntax
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
The following FORMAT_OPTIONS are available to infer the input schema automatically with COPY INTO:
inferSchema: Whether to infer the data types of the parsed records or to assume all columns are of StringType.
mergeSchema: Whether to infer the schema across multiple source files and to merge the schema of each source file.
If the source files have the same schema, Databricks recommends using the default setting for mergeSchema in FORMAT_OPTIONS (false).
The following COPY_OPTIONS are available to evolve the target schema with COPY INTO:
mergeSchema: Whether to evolve the schema of the target Delta table based on the input schema.
If the input schema and the target schema are the same, mergeSchema can be false in COPY_OPTIONS.
Infer and evolve CSV schema
The following example creates a schemaless Delta table called my_pipe_data and loads pipe-delimited CSV with a header.
mergeSchema is true in FORMAT_OPTIONS because the input files might have header or delimiter differences.
CREATE TABLE IF NOT EXISTS my_pipe_data;
COPY INTO my_pipe_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
'delimiter' = '|',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Ignore corrupt files while loading data
If some of the files you're loading can't be read because they are corrupt, you can skip them by setting ignoreCorruptFiles to true in FORMAT_OPTIONS.
The result of the COPY INTO command returns how many files were skipped due to corruption in the num_skipped_corrupt_files column. This metric also shows up in the operationMetrics column under numSkippedCorruptFiles after running DESCRIBE HISTORY on the Delta table.
Corrupt files aren't tracked by COPY INTO, so they can be reloaded in a subsequent run if the corruption is fixed. You can see which files are corrupt by running COPY INTO in VALIDATE mode.
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')
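As a concrete sketch of the VALIDATE mode described above, the statement below checks the source data without writing anything to the table. The JSON format and the 15-row limit are arbitrary choices for illustration, and the table and path are the placeholders used elsewhere on this page.

```sql
-- Sketch: validate the source data and preview up to 15 rows; nothing is written to my_table.
-- Use VALIDATE ALL instead to validate all of the data.
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = JSON
VALIDATE 15 ROWS
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true');
```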
Note
ignoreCorruptFiles is available in Databricks Runtime 11.3 LTS and above.
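To check the numSkippedCorruptFiles metric described above after a load, one option is to look at the most recent entry in the table history. This sketch assumes the COPY INTO run was the last operation on my_table. Raise or drop the LIMIT if other operations have happened since.

```sql
-- Sketch: show only the latest operation on the table.
-- The operationMetrics column of the result contains numSkippedCorruptFiles.
DESCRIBE HISTORY my_table LIMIT 1;
```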