共用方式為

使用 Lakeflow 声明性管道加载数据

可以使用 Lakeflow 声明性管道从 Azure Databricks 上的 Apache Spark 支持的任何数据源加载数据。 可以在 Lakeflow 的声明性管道中定义数据集(表和视图),这些数据集可以针对任何返回 Spark 数据帧的查询进行定义,包括流式数据帧和适用于 Spark 数据帧的 Pandas。 对于数据引入任务,Databricks 建议为大多数用例使用流式表。 流式表非常适合用于通过自动加载程序或 Kafka 等消息总线从云对象存储引入数据。

注释

以下示例演示了一些常用模式。

从现有表加载

从 Azure Databricks 中的任何现有表加载数据。 可以使用查询来转换数据,或加载该表以便在管道中进行进一步处理。

以下示例从现有表读取数据:

Python

@dlt.table(
  comment="A table summarizing counts of the top baby names for Beijing for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

SQL

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for Beijing for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC

从云对象存储加载文件

Databricks 建议将 Auto Loader 自动加载程序与 Lakeflow 声明性管道配合使用,以便从云对象存储或 Unity Catalog 卷中的文件获取数据。 自动加载器和 Lakeflow 声明性管道旨在随着数据到达云存储而以增量方式和幂等方式加载不断增长的数据。

请参阅 什么是自动加载程序? 以及 从对象存储加载数据

以下示例使用自动加载程序从云存储读取数据:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://myContainer@myStorageAccount.dfs.core.chinacloudapi.cn/analysis/*/*/*.json")
  )

SQL

CREATE OR REFRESH STREAMING TABLE sales
  AS SELECT *
  FROM STREAM read_files(
    'abfss://myContainer@myStorageAccount.dfs.core.chinacloudapi.cn/analysis/*/*/*.json',
    format => "json"
  );

以下示例使用自动加载程序从 Unity 目录卷中的 CSV 文件创建数据集:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)

注释

  • 如果将 Auto Loader 与文件通知配合使用,并对你的管道或流式表进行完全刷新,则你必须手动清理你的资源。 可以使用笔记本中的 CloudFilesResourceManager 执行清理。
  • 若要在启用 Unity Catalog 的管道中使用 Auto Loader 加载文件,必须使用 外部位置。 若要详细了解如何将 Unity 目录与 Lakeflow 声明性管道配合使用,请参阅 将 Unity 目录与 Lakeflow 声明性管道配合使用

从消息总线加载数据

可以将 Lakeflow 声明性管道配置为从消息总线引入数据。 Databricks 建议使用流式处理表与连续执行和增强型自动缩放,以最高效地引入来自消息总线的低延迟加载。 请参阅 使用自动缩放优化 Lakeflow 声明性管道的集群利用率

例如,以下代码使用 read_kafka 函数将流式处理表配置为从 Kafka 引入数据:

Python

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
  SELECT *
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'topic1'
  );

若要从其他消息总线源引入,请参阅:

从 Azure 事件中心加载数据

Azure 事件中心是一种数据流式处理服务,提供 Apache Kafka 兼容接口。 可以使用 Lakeflow 声明性管道运行时中包含的结构化流式处理 Kafka 连接器从 Azure 事件中心加载消息。 若要详细了解如何从 Azure 事件中心加载和处理消息,请参阅 使用 Azure 事件中心作为 Lakeflow 声明性管道数据源

从外部系统加载数据

Lakeflow 声明性管道支持从 Azure Databricks 支持的任何数据源加载数据。 请参阅 “连接到数据源和外部服务”。 也可使用 Lakehouse Federation 为受支持的数据源加载外部数据。 由于 Lakehouse Federation 需要 Databricks Runtime 13.3 LTS 或更高版本,因此,若要使用 Lakehouse Federation,管道必须配置为使用预览通道

某些数据源在 SQL 中没有等效支持。 如果无法将 Lakehouse 联邦与这些数据源之一配合使用,您可以使用 Python 笔记本从该源引入数据。 可以将 Python 和 SQL 源代码添加到同一管道。 以下示例声明了一个具体化视图,用于访问远程 PostgreSQL 表中数据的当前状态:

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

从云对象存储加载小型或静态数据集

可以使用 Apache Spark 加载语法加载小型或静态数据集。 Lakeflow 声明性管道支持 Azure Databricks 上的 Apache Spark 支持的所有文件格式。 要获取完整列表,请参阅“数据格式”选项

以下示例演示如何加载 JSON 以创建 Lakeflow 声明性管道表:

Python

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))

SQL

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)

注释

SQL read_files 函数适用于 Azure Databricks 上的所有 SQL 环境。 建议的模式是使用 SQL 和 Lakeflow 声明性管道进行直接文件访问。 有关详细信息,请参阅 选项

将流式表配置为忽略源流式表中的更改

注释

  • skipChangeCommits 标志仅适用于使用 spark.readStream 函数的 option()。 不能在 dlt.read_stream() 函数中使用此标志。
  • 当源流式处理表被定义为skipChangeCommits函数的目标时,不能使用标志。

默认情况下,流式处理表需要“仅追加”源。 如果一个流式表使用另一个流式表作为源,而源流式表需要执行更新或删除操作(例如 GDPR 的“被遗忘权”处理),可以在读取源流式表时设置 skipChangeCommits 标志来忽略那些更改。 有关此标志的详细信息,请参阅 “忽略更新和删除”。

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("A")

使用管道中的机密安全访问存储凭据

可以使用 Azure Databricks 机密来存储凭据(例如访问密钥或密码)。 若要在管道中配置机密,请在管道设置群集配置中使用一个 Spark 属性。 请参阅 配置 Lakeflow 声明性管道的经典计算

以下示例使用机密来存储使用 自动加载程序从 Azure Data Lake Storage (ADLS) 存储帐户读取输入数据所需的访问密钥。 同样可以使用这种方法来配置管道所需的任何机密,例如用于访问 S3 的 AWS 密钥,或用于访问 Apache Hive 元存储的密码。

若要详细了解如何使用 Azure Data Lake Storage,请参阅 连接到 Azure Data Lake Storage 和 Blob 存储

注释

必须将 spark.hadoop. 前缀添加到用于设置机密值的 spark_conf 配置密钥。

{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
      }
    }
  ],
  "name": ":re[LDP] quickstart using ADLS2"
}

  • <storage-account-name> 替换为 ADLS 存储帐户名称。
  • <scope-name> 替换为 Azure Databricks 机密范围名称。
  • <secret-name> 替换为包含 Azure 存储帐户访问密钥的密钥的名称。
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

  • <container-name> 替换为用于存储输入数据的 Azure 存储帐户容器的名称。
  • <storage-account-name> 替换为 ADLS 存储帐户名称。
  • <path-to-input-dataset> 替换为输入数据集的路径。