可以使用 DLT 从 Azure Databricks 上的 Apache Spark 支持的任何数据源加载数据。 可以在 DLT 中针对返回 Spark 数据帧的任何查询定义数据集(表和视图),包括流式处理数据帧和 Pandas for Spark 数据帧。 对于数据引入任务,Databricks 建议为大多数用例使用流式表。 流式表非常适合用于通过自动加载程序或 Kafka 等消息总线从云对象存储引入数据。
备注
- 并非所有数据源都支持 SQL。 可以在 DLT 管道中混合使用 SQL 和 Python 笔记本,以便在引入之外的所有操作中使用 SQL。
- 有关使用默认情况下未打包在 DLT 中的库的详细信息,请参阅 管理 DLT 管道的 Python 依赖项。
- 有关 Azure Databricks 中引入的常规信息,请参阅 Lakeflow Connect 中的标准连接器。
以下示例演示了一些常用模式。
从 Azure Databricks 中的任何现有表加载数据。 可以使用查询来转换数据,或加载该表以便在管道中进行进一步处理。
以下示例从现有表读取数据:
@dlt.table(
comment="A table summarizing counts of the top baby names for Beijing for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for Beijing for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Databricks 建议将自动加载程序与 DLT 配合使用,以便从云对象存储或 Unity 目录卷中的文件执行大多数数据引入任务。 自动加载程序和 DLT 能够以增量方式和幂等方式加载进入云存储的、不断增长的数据。
请参阅 什么是自动加载程序? 以及 从对象存储加载数据。
以下示例使用自动加载程序从云存储读取数据:
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://myContainer@myStorageAccount.dfs.core.chinacloudapi.cn/analysis/*/*/*.json")
)
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.chinacloudapi.cn/analysis/*/*/*.json',
format => "json"
);
以下示例使用自动加载程序从 Unity 目录卷中的 CSV 文件创建数据集:
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
备注
- 如果将 Auto Loader 与文件通知配合使用,并对你的管道或流式表进行完全刷新,则你必须手动清理你的资源。 可以使用笔记本中的 CloudFilesResourceManager 执行清理。
- 若要在启用 Unity Catalog 的管道中使用 Auto Loader 加载文件,必须使用 外部位置。 若要详细了解如何将 Unity 目录与 DLT 配合使用,请参阅 将 Unity 目录与 DLT 管道配合使用。
可以将 DLT 管道配置为从消息总线引入数据。 Databricks 建议使用流式处理表与连续执行和增强型自动缩放,以最高效地引入来自消息总线的低延迟加载。 请参阅 使用自动缩放优化 DLT 管道的群集利用率。
例如,以下代码使用 read_kafka 函数将流式处理表配置为从 Kafka 引入数据:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
若要从其他消息总线源引入,请参阅:
Kinesis:read_kinesis
脉冲:read_pulsar
Azure 事件中心是一种数据流式处理服务,提供 Apache Kafka 兼容接口。 可以使用 DLT 运行时中包含的结构化流式处理 Kafka 连接器从 Azure 事件中心加载消息。 若要详细了解如何从 Azure 事件中心加载和处理消息,请参阅 使用 Azure 事件中心作为 DLT 数据源。
DLT 支持从 Azure Databricks 支持的任何数据源加载数据。 请参阅 “连接到数据源”。 也可使用 Lakehouse Federation 为受支持的数据源加载外部数据。 由于 Lakehouse Federation 需要 Databricks Runtime 13.3 LTS 或更高版本,因此,若要使用 Lakehouse Federation,管道必须配置为使用预览通道。
某些数据源在 SQL 中没有等效支持。 如果无法将 Lakehouse 联邦与这些数据源之一配合使用,您可以使用 Python 笔记本从该源引入数据。 可以将 Python 和 SQL 源代码添加到同一 DLT 管道。 以下示例声明了一个具体化视图,用于访问远程 PostgreSQL 表中数据的当前状态:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
可以使用 Apache Spark 加载语法加载小型或静态数据集。 DLT 支持 Azure Databricks 上的 Apache Spark 支持的所有文件格式。 要获取完整列表,请参阅“数据格式”选项。
以下示例演示如何加载 JSON 以创建 DLT 表:
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
备注
SQL read_files
函数适用于 Azure Databricks 上的所有 SQL 环境。 这是一种推荐的模式,建议使用 SQL 和 DLT 直接访问文件。 有关详细信息,请参阅 选项。
备注
默认情况下,流式处理表需要“仅追加”源。 如果一个流式表使用另一个流式表作为源,而源流式表需要执行更新或删除操作(例如 GDPR 的“被遗忘权”处理),可以在读取源流式表时设置 skipChangeCommits
标志来忽略那些更改。 有关此标志的详细信息,请参阅 “忽略更新和删除”。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
可以使用 Azure Databricks 机密来存储凭据(例如访问密钥或密码)。 若要在管道中配置机密,请在管道设置群集配置中使用一个 Spark 属性。 请参阅为 DLT 管道配置计算。
以下示例使用机密来存储使用 自动加载程序从 Azure Data Lake Storage (ADLS) 存储帐户读取输入数据所需的访问密钥。 同样可以使用这种方法来配置管道所需的任何机密,例如用于访问 S3 的 AWS 密钥,或用于访问 Apache Hive 元存储的密码。
若要详细了解如何使用 Azure Data Lake Storage,请参阅 连接到 Azure Data Lake Storage 和 Blob 存储。
备注
必须将 spark.hadoop.
前缀添加到用于设置机密值的 spark_conf
配置密钥。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
将
-
<storage-account-name>
替换为 ADLS 存储帐户名称。 - 将
<scope-name>
替换为 Azure Databricks 机密范围名称。 -
<secret-name>
替换为包含 Azure 存储帐户访问密钥的密钥的名称。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
将
- 将
<container-name>
替换为用于存储输入数据的 Azure 存储帐户容器的名称。 -
<storage-account-name>
替换为 ADLS 存储帐户名称。 -
<path-to-input-dataset>
替换为输入数据集的路径。