开发 DLT 管道

开发和测试管道代码与其他 Apache Spark 工作负载不同。 本文概述了开发管道代码时支持的功能、最佳做法和注意事项。 有关更多建议和最佳做法,请参阅 将软件开发和 DevOps 最佳做法应用于 DLT 管道

备注

必须将源代码添加到管道配置,以验证代码或运行更新。 请参阅 配置 DLT 管道

哪些文件对管道源代码有效?

DLT 管道代码可以是 Python 或 SQL。 可以混合使用支持单个管道的 Python 和 SQL 源代码文件,但每个文件只能包含一种语言。 请参阅 使用 Python 开发管道代码 并使用 SQL 开发管道代码

指定管道的源代码时,可以使用笔记本和工作区文件。 工作区文件表示在首选 IDE 或 Databricks 文件编辑器中创作的 Python 或 SQL 脚本。 请参阅什么是工作区文件?

如果将 Python 代码开发为模块或库,则必须安装和导入代码,然后从配置为源代码的 Python 笔记本或工作区文件调用方法。 请参阅 管理 DLT 管道的 Python 依赖项

备注

如果需要在 Python 笔记本中使用任意 SQL 命令,可以使用语法模式 spark.sql("<QUERY>") 以 Python 代码的形式运行 SQL。

Unity 目录函数允许注册任意 Python 用户定义的函数,以便在 SQL 中使用。 请参阅 Unity Catalog 中的用户定义函数 (UDF)

DLT 开发功能的概述

DLT 扩展并利用了许多 Azure Databricks 功能,并引入了新功能和概念。 下表简要概述了支持管道代码开发的概念和功能:

功能 / 特点 DESCRIPTION
开发模式 默认情况下,新管道配置为在开发模式下运行。 Databricks 建议使用开发模式进行交互式开发和测试。 请参阅开发和生产模式
驗證 Validate 更新验证管道源代码的正确性,而无需在任何表上运行更新。 请参阅 检查管道是否存在错误,而无需等待表更新
笔记本电脑 配置为 DLT 管道源代码的笔记本提供用于验证代码和运行更新的交互式选项。 请参阅 在 DLT 中使用笔记本开发和调试 ETL 管道
参数 利用源代码和管道配置中的参数来简化测试和扩展性。 请参阅 DLT 管道使用参数
Databricks 资产捆绑包 Databricks 资产捆绑包允许在工作区之间移动管道配置和源代码。 请参阅 将 DLT 管道转换为 Databricks 资产捆绑项目

创建用于开发和测试的示例数据集

Databricks 建议创建开发和测试数据集,以测试管道逻辑,所用数据包括预期数据以及可能格式不正确或损坏的记录。 有多种方法可以创建可用于开发和测试的数据集,包括:

  • 从生产数据集中选择一部分数据。
  • 对包含 PII 的源使用匿名或人为生成的数据。
  • 基于下游转换逻辑创建具有定义良好的结果的测试数据。
  • 通过创建破坏数据架构期望的记录来预测潜在的数据损坏、格式不正确的记录和上游数据更改。

例如,如果你有一个使用以下代码定义数据集的笔记本:

CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
  "/production/data",
  format => "json")

可以使用如下所示的查询创建包含特定记录的示例数据集:

CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

以下示例演示如何筛选已发布的数据,以创建用于开发或测试的生产数据的子集:

CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

若要使用这些不同的数据集,请使用实现转换逻辑的笔记本创建多个管道。 每个管道都可以从 input_data 数据集读取数据,但其配置为包含用于创建适用于特定环境的数据集的笔记本。

DLT 数据集如何处理数据?

下表介绍了具体化视图、流式处理表和视图如何处理数据:

数据集类型 如何通过定义的查询处理记录?
流式处理表 每条记录只处理一次。 此方式采用仅追加源。
具体化视图 根据需要处理记录,以返回当前数据状态的准确结果。 应将具体化视图用于数据处理任务,例如转换、聚合或预先计算速度缓慢的查询和频繁使用的计算。
查看 每次查询视图时都会处理记录。 将视图用于不应发布到公共数据集的中间转换和数据质量检查。

在 DLT 中定义您的第一个数据集

DLT 引入了 Python 和 SQL 的新语法。 若要了解管道语法的基础知识,请参阅使用 Python 开发管道代码使用 SQL 开发管道代码

备注

DLT 将数据集定义与更新处理分开,DLT 笔记本不适用于交互式执行。

如何配置 DLT 管道?

DLT 管道的设置分为两大类:

  1. 用于定义笔记本或文件的集合(称为 源代码)的配置,这些笔记本或文件使用 DLT 语法声明数据集。

  2. 用于控制管道基础结构、依赖项管理、如何处理更新以及如何在工作区中保存表的配置。

大多数配置是可选的,但有些配置需要特别注意,尤其是在配置生产管道时。 其中包括:

  • 要使数据在管道外部可用,你必须声明一个目标架构以发布到 Hive 云存储,或者声明一个目标目录和目标架构以发布到 Unity Catalog。
  • 数据访问权限是通过用于执行的群集来配置的。 确保群集具有针对数据源和目标 存储位置(如果指定)的适当权限。

有关使用 Python 和 SQL 编写管道源代码的详细信息,请参阅 DLT SQL 语言参考DLT Python 语言参考

有关管道设置和配置的详细信息,请参阅 “配置 DLT 管道”。

部署第一个管道并触发更新

在使用 DLT 处理数据之前,必须配置管道。 配置管道后,可以触发更新以计算管道中每个数据集的结果。 若要开始使用 DLT 管道,请参阅 教程:通过 DLT 使用变更数据捕获生成 ETL 管道

什么是管道更新?

当你开始更新时,管道会部署基础结构并重新计算数据状态。 更新执行以下操作:

  • 使用正确的配置启动群集。
  • 发现定义的所有表和视图,并检查是否存在任何分析错误,例如无效的列名、缺少依赖项和语法错误。
  • 使用最新的可用数据创建或更新表和视图。

可以根据用例的成本和延迟要求连续或按计划运行管道。 请参阅在 DLT 管道上运行更新

使用 DLT 引入数据

DLT 支持 Azure Databricks 中提供的所有数据源。

Databricks 建议为大多数引入用例使用流式处理表。 对于进入云对象存储的文件,Databricks 建议使用自动加载程序。 可以直接从大多数消息总线使用 DLT 引入数据。

若要详细了解如何配置对云存储的访问,请参阅云存储配置

对于自动加载程序不支持的格式,可以使用 Python 或 SQL 查询 Apache Spark 支持的任何格式。 请参阅 使用 DLT 加载数据

监控和保证数据质量

可以使用期望来指定对数据集内容的数据质量控制。 与传统数据库中的 CHECK 约束(用于阻止添加任何不符合约束的记录)不同,预期在处理不符合数据质量要求的数据时比较灵活。 这种灵活性允许你处理和存储预计会出现混乱的数据以及必须满足严格质量要求的数据。 请参阅通过管道预期管理数据质量

DLT 扩展 Delta Lake 的功能。 由于由 DLT 创建的和管理表是 Delta 表,因此它们具有 Delta Lake 提供的相同保证和功能。 请参阅什么是 Delta Lake?

DLT 除了可以在 Delta Lake 中设置的许多表属性外,还添加了多个表属性。 请参阅 DLT 属性参考Delta 表属性参考

如何通过 DLT 创建和管理表

Azure Databricks 自动管理使用 DLT 创建的表,确定需要如何处理更新以正确计算表的当前状态并执行许多维护和优化任务。

对于大多数操作,您应允许 DLT 处理目标表的所有更新、插入和删除。 有关详细信息和限制,请参阅 “保留手动删除或更新”。

DLT 执行的维护任务

DLT 在更新表后的 24 小时内执行维护任务。 维护可以通过删除旧版本的表来提高查询性能和降低成本。 默认情况下,系统执行完整的 OPTIMIZE 操作,随后执行 VACUUM。 可以通过在表的pipelines.autoOptimize.managed = false中设置 来对表禁用 OPTIMIZE。 仅在计划维护任务之前的 24 小时内运行过管道更新时,才会执行维护任务。

Delta Live Tables 现已改为 DLT

以前称为增量实时表的产品现在是 DLT。

局限性

有关限制列表,请参阅 DLT 限制

有关特定于将 DLT 与 Unity 目录配合使用的要求和限制的列表,请参阅 将 Unity 目录与 DLT 管道配合使用

其他资源