使用 COPY INTO 的常见数据加载模式
了解使用 COPY INTO
将数据从文件源加载到 Delta Lake 的常见模式。
有许多选项可用于使用 COPY INTO
。 还可以结合这些模式将临时凭据与 COPY INTO 一起使用。
有关所有选项的完整参考,请参阅 COPY INTO。
为 COPY INTO 创建目标表
COPY INTO
必须面向现有的 Delta 表。 在 Databricks Runtime 11.3 LTS 及更高版本中,对于支持架构演变的格式,设置这些表的架构是可选的:
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
请注意,若要推断具有 COPY INTO
的架构,必须传递其他选项:
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
以下示例创建一个无架构 Delta 表,该表调用 my_pipe_data
并加载带标头的管道分隔的 CSV:
CREATE TABLE IF NOT EXISTS my_pipe_data;
COPY INTO my_pipe_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
'delimiter' = '|',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
使用 COPY INTO 加载 JSON 数据
以下示例将 Azure Data Lake Storage Gen2 (ADLS Gen2) 中的五个文件中的 JSON 数据加载到名为 my_json_data
的 Delta 表中。 必须在执行 COPY INTO
之前创建此表。 如果已从一个文件加载了任何数据,则不会为该文件重新加载数据。
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
使用 COPY INTO 加载 Avro 数据
以下示例使用附加 SQL 表达式作为 SELECT
语句的一部分在 ADLS Gen2 中加载 Avro 数据。
COPY INTO my_delta_table
FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = AVRO
使用 COPY INTO 加载 CSV 文件
以下示例将 CSV 文件从 abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path/folder1
下的 Azure Data Lake Storage Gen2 加载到 Delta 表中。
COPY INTO target_table
FROM (SELECT key, index, textData, 'constant_value'
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS('header' = 'true')
-- The example below loads CSV files without headers in ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO target_table
FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
FROM 'abfss://container@storageAccount.dfs.core.chinacloudapi.cn/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
加载数据时忽略损坏的文件
如果由于某些损坏问题而无法读取正在加载的数据,可以通过在 FORMAT_OPTIONS
中将 ignoreCorruptFiles
设置为 true
来跳过这些文件。
COPY INTO
命令的结果返回由于 num_skipped_corrupt_files
列中损坏而跳过的文件数。 在 Delta 表上运行 DESCRIBE HISTORY
后,此指标也会显示在 numSkippedCorruptFiles
下的 operationMetrics
列中。
COPY INTO
不会跟踪损坏的文件,因此如果修复损坏,可以在后续运行中重新加载这些文件。 可以通过在 VALIDATE
模式下运行 COPY INTO
来查看哪些文件已损坏。
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')
注意
ignoreCorruptFiles
在 Databricks Runtime 11.3 LTS 及更高版本中可用。