使用 COPY INTO 的常见数据加载模式

了解使用 COPY INTO 将数据从文件源加载到 Delta Lake 的常见模式。

有许多选项可用于使用 COPY INTO。 还可以结合这些模式将临时凭据与 COPY INTO 一起使用

有关所有选项的完整参考,请参阅 COPY INTO

为 COPY INTO 创建目标表

COPY INTO 必须面向现有的 Delta 表。 在 Databricks Runtime 11.3 LTS 及更高版本中,对于支持架构演变的格式,设置这些表的架构是可选的:

CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

请注意,若要推断具有 COPY INTO 的架构,必须传递其他选项:

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

以下示例创建一个无架构 Delta 表,该表调用 my_pipe_data 并加载带标头的管道分隔的 CSV:

CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

使用 COPY INTO 加载 JSON 数据

以下示例将 Azure Data Lake Storage Gen2 (ADLS Gen2) 中的五个文件中的 JSON 数据加载到名为 my_json_data 的 Delta 表中。 必须在执行 COPY INTO 之前创建此表。 如果已从一个文件加载了任何数据,则不会为该文件重新加载数据。

COPY INTO my_json_data
  FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

 -- The second execution will not copy any data since the first command already loaded the data
 COPY INTO my_json_data
   FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
   FILEFORMAT = JSON
   FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

使用 COPY INTO 加载 Avro 数据

以下示例使用附加 SQL 表达式作为 SELECT 语句的一部分在 ADLS Gen2 中加载 Avro 数据。

COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = AVRO

使用 COPY INTO 加载 CSV 文件

以下示例将 CSV 文件从 abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path/folder1 下的 Azure Data Lake Storage Gen2 加载到 Delta 表中。

COPY INTO target_table
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

加载数据时忽略损坏的文件

如果由于某些损坏问题而无法读取正在加载的数据,可以通过在 FORMAT_OPTIONS 中将 ignoreCorruptFiles 设置为 true 来跳过这些文件。

COPY INTO 命令的结果返回由于 num_skipped_corrupt_files 列中损坏而跳过的文件数。 在 Delta 表上运行 DESCRIBE HISTORY 后,此指标也会显示在 numSkippedCorruptFiles 下的 operationMetrics 列中。

COPY INTO 不会跟踪损坏的文件,因此如果修复损坏,可以在后续运行中重新加载这些文件。 可以通过在 VALIDATE 模式下运行 COPY INTO 来查看哪些文件已损坏。

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

注意

ignoreCorruptFiles在 Databricks Runtime 11.3 LTS 及更高版本中可用。