将 Parquet 数据湖迁移到 Delta Lake
本文提供有关将现有 Parquet 数据湖转换为 Delta Lake 的建议。 Delta Lake 是 Databricks 湖屋中的基础格式。 请参阅什么是 Delta Lake?。
转换为 Delta Lake 之前的注意事项
Parquet 数据湖可能具有针对现有工作负载和系统进行优化的分区策略。 虽然你可以转换为 Delta Lake 并保留此分区结构,但过度分区的表是导致 Delta Lake 上的工作负载运行缓慢的主要原因之一。 请参阅何时对 Azure Databricks 上的表进行分区,以及有关根据 Databricks 改编 Spark 代码的指导原则。
此外,需要考虑所要转换的数据是否仍在增长,以及当前查询数据的频率。 可为数据湖中的不同 Parquet 表选择不同的方法。
Delta Lake 转换方法
以下矩阵概述了将 Parquet 数据湖转换为 Delta Lake 的四种主要方法以及一些利弊。 每列的澄清:
- 增量:表示支持在转换开始之后转换追加到转换源的其他数据的功能。
- 复制数据:指示是将数据写入新位置还是就地修改数据。
- 保留数据结构:指示是否在转换期间保留分区策略。
- 回填数据:表示支持在转换开始之后回填已添加到转换源的数据的功能。
- 易用性:指示用户配置和运行数据转换的难易程度。
方法 | 增量 | 复制数据 | 保留数据结构 | 回填数据 | 易于使用 |
---|---|---|---|---|---|
深度 CLONE Parquet |
是 | 是 | 是 | 是 | 简单 |
浅度 CLONE Parquet |
是 | No | 是 | 是 | 简单 |
CONVERT TO DELTA |
否 | No | 是 | 否 | 简单 |
自动加载程序 | 是 | 是 | 否 | 可选 | 某种配置 |
批处理 Spark 作业 | 自定义逻辑 | 是 | 否 | 自定义逻辑 | 自定义逻辑 |
以下部分更深入地讨论了其中每个选项。
使用 CLONE
Parquet 迁移 Parquet 数据
可以使用 CLONE
Parquet 以增量方式将 Parquet 数据湖中的数据复制到 Delta Lake。 浅度克隆创建指向现有 Parquet 文件的指针,使 Parquet 表保留其原始位置和格式,同时通过收集的文件统计信息提供优化的访问。 可以写入到由浅度克隆创建的表,而不会影响原始数据源。
深度克隆将源中的所有数据文件复制到新位置,同时转换为 Delta Lake。 深度克隆允许在后续执行逻辑时以增量方式检测新文件,包括回填操作。 请参阅将 Parquet 和 Iceberg 表增量克隆到 Delta Lake。
以下示例演示如何使用 CLONE
:
CREATE OR REPLACE TABLE <target-table-name> [SHALLOW] CLONE parquet.`/path/to/data`;
使用 CONVERT TO DELTA
迁移 Parquet 数据
可以使用 CONVERT TO DELTA
通过单个命令将 Parquet 文件的目录转换为 Delta 表。 将某个表转换为 Delta Lake 后,应停止使用 Parquet 逻辑读取和写入该表。 在转换开始之后写入目标目录的数据可能不会反映在生成的 Delta 表中。 请参阅转换为 Delta Lake。
以下示例演示如何使用 CONVERT TO DELTA
:
CONVERT TO DELTA parquet.`abfss://container@storageAccount.dfs.core.chinacloudapi.cn/parquet-data`;
使用自动加载程序迁移 Parquet 数据
虽然自动加载程序是专为从云对象存储引入增量数据而设计的产品,但你可以利用它实现一种模式,以增量方式将给定目录中的所有数据复制到目标表。 请参阅什么是自动加载程序?。
以下代码示例包含用于执行以下操作的配置:
- 处理源目录中的所有现有文件。
- 触发每周自动回填作业,以捕获可能遗漏的文件。
- 允许 Apache Spark 使用许多 Spark 作业,以避免与大型数据分区相关的溢出和内存不足错误。
- 提供端到端的“恰好处理一次”保证。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.backfillInterval", "1 week")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name)
)
可以通过 Python 或 SQL 在增量实时表中使用自动加载程序:
使用自定义 Apache Spark 批处理逻辑迁移 Parquet 数据
编写自定义 Apache Spark 逻辑可以十分灵活地控制从源系统迁移不同数据的方式和时机,但可能需要完成多种配置才能提供其他方法内置的功能。
此方法的核心是一个简单的 Apache Spark 读取和写入操作,如下所示:
spark.read.format("parquet").load(file_path).write.mode("append").saveAsTable(table_name)
若要执行回填或增量迁移,你也许可以依赖于数据源的分区结构,但同时可能需要编写自定义逻辑来跟踪自上次从源加载数据以来已添加的文件。 虽然可以使用 Delta Lake 合并功能来避免写入重复的记录,但将大型 Parquet 源表中的所有记录与大型 Delta 表的内容进行比较是一项计算成本高昂的任务。
在哪种情况下不应转换为 Delta Lake?
在将所有现有 Parquet 数据转换为 Delta Lake 之前,你可能会考虑潜在的利弊。
Azure Databricks 设计了许多优化的与 Delta Lake 相关的湖屋功能,Delta Lake 提供了丰富的开源生态系统,其中包含适用于多种语言和企业数据系统的本机连接器。 Delta Sharing 扩展了将使用 Delta Lake 存储的数据共享给其他客户端的功能。
Delta Lake 建立在 Parquet 之上,因此,Azure Databricks 还针对与 Parquet 文件的交互优化了读取器和写入器。
Databricks 建议对从 Azure Databricks 接收定期更新或查询的所有表使用 Delta Lake。 在某些情况下,你可以选择以 Parquet 格式维护数据,例如:
- 将数据写入 Parquet 的上游系统不支持原生的将数据写入 Delta Lake 的功能。
- 读取 Parquet 数据的下游系统无法读取 Delta Lake。
在这两种情况下,你可能需要将表复制到 Delta Lake 以在读取、写入、更新和删除表中的记录时利用性能优势。