将数据引入 Azure Databricks 湖屋

Azure Databricks 提供了各种方法,用于将数据引入到 Delta Lake 支持的湖屋中。 本文列出了支持的引入工具和指南,其中介绍了基于数据源和延迟等条件使用的方法。

引入方法

可以使用以下方法将数据引入 Databricks:

  • 批量导入不经常处理的数据行集
  • 流式引入单个数据行或数据行集以便在数据到达时进行实时处理

引入的数据将加载到 Delta 表中 ,然后可在下游数据和 AI 用例中使用。 由于 Databricks 的 Lakehouse 体系结构,无需跨用例复制数据,并且可以利用 Unity 目录跨所有数据集中进行访问控制、审核、世系和数据发现。

批量摄取

使用批量引入可将数据作为行集(或批)加载到 Databricks 中,此操作通常基于计划(例如每日)或手动触发。 这表示传统提取、转换、加载(ETL)用例的“提取”部分。 可以使用批量导入从以下位置加载数据:

  • 本地文件(如 CSV)

  • 云对象存储,包括 Amazon S3 和 Azure Data Lake Storage

  • SaaS 应用程序(如 Salesforce)和 SQL Server 等数据库

批处理引入支持各种文件格式,包括 CSV、TSV、JSON、XML、Avro、ORC、Parquet 和文本文件。

Databricks 支持传统的批处理引入和增量批处理引入选项。 虽然传统批处理引入每次运行时都会处理所有记录,但增量批处理引入会自动检测数据源中的新记录,并忽略已引入的记录。 这意味着需要处理较少的数据,因此引入作业运行速度更快,并更高效地使用计算资源。

传统的(一次性)批量导入

可以使用添加数据 UI 从公共 URL 上传本地数据文件或下载文件。 请参阅 “上传文件”。

增量批处理引入

本部分介绍支持的增量批处理引入工具。

流式处理表

CREATE STREAMING TABLE SQL 命令允许以增量方式将数据从云对象存储加载到流式处理表中。 请参阅 CREATE STREAMING TABLE

示例:使用流式处理表进行增量批处理引入

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/databricks-datasets/retail-org/customers/",
  format => "csv"
)

云对象存储连接器

内置云对象存储连接器 Auto Loader(自动加载器)允许您在新的数据文件到达 Amazon S3(S3)或 Azure Data Lake Storage Gen 2(ALDS2)时,以增量且高效的方式处理它们。 请参阅 自动加载程序

示例:使用自动加载程序以增量方式批量引入

df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data")
  .schema("/databricks-datasets/retail-org/customers/schema")
  .load("/databricks-datasets/retail-org/customers/")

完全托管的连接器

Lakeflow Connect 提供完全托管的连接器,用于从 SaaS 应用程序和数据库引入。 可通过以下方式使用托管的连接器:

  • Databricks 用户界面
  • Databricks 命令行界面 (CLI)
  • Databricks API 接口
  • Databricks SDK
  • Databricks 资产捆绑包

流式引入

通过流式引入,可以在数据生成时连续加载数据行或数据行批次,以便在数据到达时近乎实时地对其进行查询。 可以使用流式数据摄入来从 Apache Kafka、Amazon Kinesis 和 Apache Pulsar 等来源加载流数据。

Databricks 还支持使用内置连接器进行流式引入。 使用这些连接器可以增量且有效地处理从流数据源到来的新数据。 请参阅配置流式处理数据源

示例:从 Kafka 流式引入

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "topic1")
    .option("startingOffsets", "latest")
    .load()

使用 DLT 进行批处理和流式引入

Databricks 建议使用 DLT 生成可靠且可缩放的数据处理管道。 DLT 同时支持批处理和流式引入,并且可以从自动加载程序支持的任何数据源引入数据。

示例:使用 DLT 进行增量批处理引入

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

示例:使用 DLT 从 Kafka 进行流式引入

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

引入计划

可以作为一次性操作引入数据、按定期计划引入数据或连续引入数据。

  • 对于准实时流式处理用例,请使用连续模式。
  • 对于批量引入用例,请一次性引入或设置重复计划。

请参阅触发与连续管道模式

数据导入合作伙伴

许多第三方工具支持批量引入或流式引入到 Databricks。 Databricks 会验证各种第三方集成,尽管配置对源系统的访问并引入数据的步骤因工具而异。

DIY 引入

Databricks 提供一个常规计算平台。 因此,可以使用 Databricks 支持的任何编程语言(如 Python 或 Java)创建自己的引入连接器。 还可以导入和利用常用的开源连接器库,例如数据加载工具、Airbyte 和 Debezium。

引入替代项

Databricks 建议引入大多数用例,因为它可缩放以适应数据量大、低延迟查询和第三方 API 限制。 引入会将您的数据从源系统复制到 Azure Databricks,这会导致数据冗余,并可能随着时间推移变得过时。 如果不想复制数据,可以使用以下工具:

  • Lakehouse Federation 允许在不移动数据的情况下查询外部数据源。
  • 增量共享 允许跨平台、云和区域安全地共享数据。

但是,如果您不想复制数据,可以使用 Lakehouse Federation 或 Delta Sharing。

何时使用Delta Sharing

选择Delta Sharing用于以下场景:

  • 限制数据重复
  • 查询可能的最新数据

何时使用 Lakehouse 联合身份验证

对于以下情形,请选择 Lakehouse 联邦系统:

  • 有关 ETL 管道的临时报告或概念证明工作

选择引入方法时的注意事项

注意事项 指南
数据源 如果数据源存在 Lakeflow Connect 本机连接器,则这是引入数据的最简单方法。 对于 Lakeflow Connect 不支持的数据源,请从源中提取数据,然后使用自动加载程序将数据引入 Databricks。 对于本地文件,请使用 Databricks UI 上传数据。
延迟 如果要近乎实时地分析数据,请使用流式处理来利用增量处理。 通过流式处理,每个记录到达后即可对数据进行查询。 否则,请使用批量导入。
数据移动 如果无法将数据从源系统复制到 Databricks,请使用 Lakehouse Federation 或 Delta Sharing。

将数据迁移到 Delta Lake

要了解如何将现有数据迁移到 Delta Lake,请参阅将数据迁移到 Delta Lake

COPY INTO (旧版)

CREATE STREAMING TABLE建议使用 SQL 命令替代旧COPY INTO版 SQL 命令,以便从云对象存储进行增量引入。 请参阅 COPY INTO。 为了获得更具可扩展性且更可靠的文件引入体验,Databricks 建议 SQL 用户使用流式处理表而不是 COPY INTO