使用增量实时表加载数据

可以使用增量实时表在 Azure Databricks 上从 Apache Spark 支持的任何数据源加载数据。 可以在增量实时表中针对返回 Spark 数据帧的任何查询定义数据集(表和视图),包括流式处理数据帧和 Pandas for Spark 数据帧。 对于数据引入任务,Databricks 建议为大多数用例使用流式处理表。 流式处理表非常适合用于通过自动加载程序或 Kafka 等消息总线从云对象存储引入数据。 以下示例演示了一些常用模式。

重要

并非所有数据源都支持 SQL。 可以在增量实时表管道中混合使用 SQL 和 Python 笔记本,以便将 SQL 用于除引入之外的所有操作。

有关使用默认未打包在增量实时表中的库的详细信息,请参阅管道依赖项

从云对象存储加载文件

对于从云对象存储引入数据的大多数任务,Databricks 建议将自动加载程序与增量实时表配合使用。 自动加载程序和增量实时表能够以增量方式和幂等方式加载进入云存储的、不断增长的数据。 以下示例使用自动加载程序从 CSV 和 JSON 文件创建数据集:

注意

若要在启用了 Unity Catalog 的管道中使用自动加载程序加载文件,必须使用外部位置。 若要详细了解如何将 Unity Catalog 与 Delta Live Tables 配合使用,请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

请参阅什么是自动加载程序?自动加载程序 SQL 语法

警告

如果将自动加载程序与文件通知配合使用,并对管道或流式处理表运行完全刷新,则必须手动清理资源。 可以使用笔记本中的 CloudFilesResourceManager 执行清理。

从消息总线加载数据

可以将增量实时表管道配置为使用流式处理表从消息总线引入数据。 Databricks 建议将流式处理表与连续执行和增强型自动缩放相结合,以最高效地引入来自消息总线的低延迟加载。 请参阅什么是增强型自动缩放?

例如,以下代码将流式处理表配置为从 Kafka 引入数据:

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

可以使用纯 SQL 编写下游操作来对这些数据执行流式转换,如以下示例所示:

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

有关使用事件中心的示例,请参阅使用 Azure 事件中心作为增量实时表数据源

请参阅配置流式处理数据源

从外部系统加载数据

增量实时表支持从 Azure Databricks 所支持的任何数据源加载数据。 请参阅连接到数据源。 也可使用 Lakehouse Federation 为受支持的数据源加载外部数据。 由于 Lakehouse Federation 需要 Databricks Runtime 13.1 或更高版本,因此,若要使用 Lakehouse Federation,管道必须配置为使用预览通道

某些数据源在 SQL 中没有等效支持。 如果无法将 Lakehouse 联合身份验证与其中一个数据源配合使用,则可以使用独立的 Python 笔记本从源引入数据。 然后,可以使用 SQL 笔记本将此笔记本添加为源库,以生成增量实时表管道。 以下示例声明了一个具体化视图,用于访问远程 PostgreSQL 表中数据的当前状态:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

从云对象存储加载小型或静态数据集

可以使用 Apache Spark 加载语法加载小型或静态数据集。 增量实时表支持 Azure Databricks 上的 Apache Spark 所支持的所有文件格式。 要获取完整列表,请参阅“数据格式”选项

以下示例演示如何加载 JSON 以创建增量实时表表:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

注意

SELECT * FROM format.`path`; SQL 构造对于 Azure Databricks 上的所有 SQL 环境是通用的。 这是结合使用 SQL 和增量实时表进行直接文件访问的建议模式。

使用管道中的机密安全访问存储凭据

可以使用 Azure Databricks 机密来存储凭据(例如访问密钥或密码)。 若要在管道中配置机密,请在管道设置群集配置中使用一个 Spark 属性。 请参阅配置计算设置

以下示例使用机密来存储通过自动加载程序从 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户读取输入数据所需的访问密钥。 同样可以使用这种方法来配置管道所需的任何机密,例如用于访问 S3 的 AWS 密钥,或用于访问 Apache Hive 元存储的密码。

若要详细了解如何使用 Azure Data Lake Storage Gen2,请参阅连接到 Azure Data Lake Storage Gen2 和 Blob 存储

注意

必须将 spark.hadoop. 前缀添加到用于设置机密值的 spark_conf 配置密钥。

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> 替换为 ADLS Gen2 存储帐户名称。
  • <scope-name> 替换为 Azure Databricks 机密范围名称。
  • <secret-name> 替换为包含 Azure 存储帐户访问密钥的密钥的名称。
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

替换

  • <container-name> 替换为用于存储输入数据的 Azure 存储帐户容器的名称。
  • <storage-account-name> 替换为 ADLS Gen2 存储帐户名称。
  • <path-to-input-dataset> 替换为输入数据集的路径。