Azure Databricks 提供了各种方法,用于将数据引入到 Delta Lake 支持的湖屋中。 本文列出了支持的引入工具和指南,其中介绍了基于数据源和延迟等条件使用的方法。
可以使用以下方法将数据引入 Databricks:
- 批量导入不经常处理的数据行集
- 流式引入单个数据行或数据行集以便在数据到达时进行实时处理
引入的数据将加载到 Delta 表中 ,然后可在下游数据和 AI 用例中使用。 由于 Databricks 的 Lakehouse 体系结构,无需跨用例复制数据,并且可以利用 Unity 目录跨所有数据集中进行访问控制、审核、世系和数据发现。
使用批量引入可将数据作为行集(或批)加载到 Databricks 中,此操作通常基于计划(例如每日)或手动触发。 这表示传统提取、转换、加载(ETL)用例的“提取”部分。 可以使用批量导入从以下位置加载数据:
本地文件(如 CSV)
云对象存储,包括 Amazon S3 和 Azure Data Lake Storage
SaaS 应用程序(如 Salesforce)和 SQL Server 等数据库
批处理引入支持各种文件格式,包括 CSV、TSV、JSON、XML、Avro、ORC、Parquet 和文本文件。
Databricks 支持传统的批处理引入和增量批处理引入选项。 虽然传统批处理引入每次运行时都会处理所有记录,但增量批处理引入会自动检测数据源中的新记录,并忽略已引入的记录。 这意味着需要处理较少的数据,因此引入作业运行速度更快,并更高效地使用计算资源。
可以使用添加数据 UI 从公共 URL 上传本地数据文件或下载文件。 请参阅 “上传文件”。
本部分介绍支持的增量批处理引入工具。
CREATE STREAMING TABLE
SQL 命令允许以增量方式将数据从云对象存储加载到流式处理表中。 请参阅 CREATE STREAMING TABLE。
示例:使用流式处理表进行增量批处理引入
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
内置云对象存储连接器 Auto Loader(自动加载器)允许您在新的数据文件到达 Amazon S3(S3)或 Azure Data Lake Storage Gen 2(ALDS2)时,以增量且高效的方式处理它们。 请参阅 自动加载程序。
示例:使用自动加载程序以增量方式批量引入
df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data")
.schema("/databricks-datasets/retail-org/customers/schema")
.load("/databricks-datasets/retail-org/customers/")
Lakeflow Connect 提供完全托管的连接器,用于从 SaaS 应用程序和数据库引入。 可通过以下方式使用托管的连接器:
- Databricks 用户界面
- Databricks 命令行界面 (CLI)
- Databricks API 接口
- Databricks SDK
- Databricks 资产捆绑包
通过流式引入,可以在数据生成时连续加载数据行或数据行批次,以便在数据到达时近乎实时地对其进行查询。 可以使用流式数据摄入来从 Apache Kafka、Amazon Kinesis 和 Apache Pulsar 等来源加载流数据。
Databricks 还支持使用内置连接器进行流式引入。 使用这些连接器可以增量且有效地处理从流数据源到来的新数据。 请参阅配置流式处理数据源。
示例:从 Kafka 流式引入
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
Databricks 建议使用 DLT 生成可靠且可缩放的数据处理管道。 DLT 同时支持批处理和流式引入,并且可以从自动加载程序支持的任何数据源引入数据。
示例:使用 DLT 进行增量批处理引入
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
示例:使用 DLT 从 Kafka 进行流式引入
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
可以作为一次性操作引入数据、按定期计划引入数据或连续引入数据。
- 对于准实时流式处理用例,请使用连续模式。
- 对于批量引入用例,请一次性引入或设置重复计划。
请参阅触发与连续管道模式。
许多第三方工具支持批量引入或流式引入到 Databricks。 Databricks 会验证各种第三方集成,尽管配置对源系统的访问并引入数据的步骤因工具而异。
Databricks 提供一个常规计算平台。 因此,可以使用 Databricks 支持的任何编程语言(如 Python 或 Java)创建自己的引入连接器。 还可以导入和利用常用的开源连接器库,例如数据加载工具、Airbyte 和 Debezium。
Databricks 建议引入大多数用例,因为它可缩放以适应数据量大、低延迟查询和第三方 API 限制。 引入会将您的数据从源系统复制到 Azure Databricks,这会导致数据冗余,并可能随着时间推移变得过时。 如果不想复制数据,可以使用以下工具:
- Lakehouse Federation 允许在不移动数据的情况下查询外部数据源。
- 增量共享 允许跨平台、云和区域安全地共享数据。
但是,如果您不想复制数据,可以使用 Lakehouse Federation 或 Delta Sharing。
选择Delta Sharing用于以下场景:
- 限制数据重复
- 查询可能的最新数据
对于以下情形,请选择 Lakehouse 联邦系统:
- 有关 ETL 管道的临时报告或概念证明工作
注意事项 | 指南 |
---|---|
数据源 | 如果数据源存在 Lakeflow Connect 本机连接器,则这是引入数据的最简单方法。 对于 Lakeflow Connect 不支持的数据源,请从源中提取数据,然后使用自动加载程序将数据引入 Databricks。 对于本地文件,请使用 Databricks UI 上传数据。 |
延迟 | 如果要近乎实时地分析数据,请使用流式处理来利用增量处理。 通过流式处理,每个记录到达后即可对数据进行查询。 否则,请使用批量导入。 |
数据移动 | 如果无法将数据从源系统复制到 Databricks,请使用 Lakehouse Federation 或 Delta Sharing。 |
要了解如何将现有数据迁移到 Delta Lake,请参阅将数据迁移到 Delta Lake。
CREATE STREAMING TABLE
建议使用 SQL 命令替代旧COPY INTO
版 SQL 命令,以便从云对象存储进行增量引入。 请参阅 COPY INTO。 为了获得更具可扩展性且更可靠的文件引入体验,Databricks 建议 SQL 用户使用流式处理表而不是 COPY INTO
。