Common data loading patterns using COPY INTO

Learn common patterns for using COPY INTO to load data from file sources into Delta Lake.

COPY INTO supports many options, and you can combine temporary credentials with any of these patterns.

See COPY INTO for a full reference of all options.
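
As a rough illustration of the temporary-credentials pattern mentioned above, the following sketch passes a SAS token inline with the source path. The table name my_table and the <sas-token> placeholder are assumptions for illustration; check the COPY INTO reference for the credential options your storage type accepts.

```sql
-- Sketch only: <sas-token> is a placeholder for a short-lived SAS token.
COPY INTO my_table
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path' WITH (
    CREDENTIAL (AZURE_SAS_TOKEN = '<sas-token>')
  )
  FILEFORMAT = JSON;
```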

Create target tables for COPY INTO

COPY INTO must target an existing Delta table.

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

In Databricks Runtime 11.3 LTS and above, setting the schema for these tables is optional for formats that support schema evolution. See Schema inference and evolution using COPY INTO for details.
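
As a concrete illustration of the template above, the following sketch creates one table with an explicit schema and one without a schema. The table names, column names, comment text, and table property are hypothetical placeholders, not values prescribed by this article.

```sql
-- Hypothetical example: names, comment, and property are placeholders.
CREATE TABLE IF NOT EXISTS my_events (
  event_id   BIGINT,
  event_name STRING,
  event_date DATE
)
COMMENT 'Raw events loaded with COPY INTO'
TBLPROPERTIES ('delta.appendOnly' = 'true');  -- optional; shown only to illustrate TBLPROPERTIES

-- Schemaless table (Databricks Runtime 11.3 LTS and above).
-- The schema is filled in when COPY INTO runs with schema inference and evolution enabled.
CREATE TABLE IF NOT EXISTS my_schemaless_events;
```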

Load JSON data using COPY INTO

The following example loads JSON data from five files in Azure Data Lake Storage Gen2 (ADLS Gen2) into the Delta table called my_json_data. This table must be created before COPY INTO can be executed. If data from one of the files has already been loaded, COPY INTO does not reload that file.

COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json');

-- Running the same command a second time copies no data, because the first run already loaded these files.
COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json');
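
Because COPY INTO skips files it has already loaded, repeating a command does not reload them. If you do need to reload a file, one option is the force copy option, which disables this check. The sketch below reuses the table and path from the example above and reloads a single file; treat it as an illustration rather than a recommended default, because forcing a reload can duplicate rows in the target table.

```sql
-- Sketch only: 'force' = 'true' loads the file even if it was loaded before.
COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json')
  COPY_OPTIONS ('force' = 'true');
```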

Load Avro data using COPY INTO

The following example loads Avro data in ADLS Gen2 using additional SQL expressions as part of the SELECT statement.

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) AS dt, event AS measurement, quantity::double
          FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = AVRO

Load CSV files using COPY INTO

The following example loads CSV files from Azure Data Lake Storage Gen2 under abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path/folder1 into a Delta table.

COPY INTO target_table
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS ('header' = 'true');

-- The following example loads CSV files without headers from ADLS Gen2.
-- Columns in headerless CSV files are named _c0, _c1, and so on.
-- Casting the data and renaming the columns puts the data in the schema you want.
COPY INTO target_table
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

Schema inference and evolution using COPY INTO

This section provides examples for common schema inference and evolution configurations using COPY INTO.

Syntax

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

The following FORMAT_OPTIONS are available to infer the input schema automatically with COPY INTO:

  • inferSchema: Whether to infer the data types of the parsed records or to assume all columns are of StringType.

  • mergeSchema: Whether to infer the schema across multiple source files and to merge the schema of each source file.

    If the source files have the same schema, Databricks recommends using the default setting for mergeSchema in FORMAT_OPTIONS (false).

The following COPY_OPTIONS are available to evolve the target schema with COPY INTO:

  • mergeSchema: Whether to evolve the schema of the target Delta table based on the input schema.

    If the input schema and the target schema are the same, mergeSchema can be false in COPY_OPTIONS.

Infer and evolve CSV schema

The following example creates a schemaless Delta table called my_pipe_data and loads pipe-delimited CSV with a header.

mergeSchema is true in FORMAT_OPTIONS because the input files might have header or delimiter differences.

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

Ignore corrupt files while loading data

If some of the files you're loading can't be read because they are corrupt, you can skip them by setting ignoreCorruptFiles to true in FORMAT_OPTIONS.

The COPY INTO command returns the number of files skipped due to corruption in the num_skipped_corrupt_files column of its result. The same metric appears as numSkippedCorruptFiles in the operationMetrics column when you run DESCRIBE HISTORY on the Delta table.

Corrupt files aren't tracked by COPY INTO, so they can be reloaded in a subsequent run if the corruption is fixed. You can see which files are corrupt by running COPY INTO in VALIDATE mode.

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

Note

ignoreCorruptFiles is available in Databricks Runtime 11.3 LTS and above.
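
To make the workflow above concrete, the following sketch first runs COPY INTO in VALIDATE mode and then inspects the table history after a load. It assumes the target table is named my_table, as in the syntax above, and picks JSON as the file format purely for illustration.

```sql
-- Validate the source files without writing to the table.
COPY INTO my_table
  FROM '/path/to/files'
  FILEFORMAT = JSON
  VALIDATE ALL
  FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true');

-- After a real load, show the most recent operation and look for
-- numSkippedCorruptFiles inside the operationMetrics column.
DESCRIBE HISTORY my_table LIMIT 1;
```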