开发和测试增量实时表管道的提示、建议和功能

本文介绍可用于开发和测试增量实时表管道的模式。 增量实时表允许你通过管道设置来指定用于隔离开发、测试和生产环境中的管道的配置。 本文中的建议适用于 SQL 和 Python 代码开发。

使用开发模式运行管道更新

增量实时表提供了一个 UI 切换开关来控制管道更新是在开发模式还是生产模式下运行。 此模式控制管道更新的处理方式,包括:

  • 更新成功或失败后,开发模式不会立即终止计算资源。 可以重用相同的计算资源来运行管道的多个更新,而无需等待群集启动。
  • 开发模式在任务失败时不会自动重试,允许你立即检测并修复管道中的逻辑错误或语法错误。

Databricks 建议在开发和测试期间使用开发模式,并始终在部署到生产环境时切换到生产模式。

请参阅开发和生产模式

测试管道源代码,而无需等待表更新

若要在开发和测试期间检查管道源代码的问题(如语法和分析错误),可以运行“验证更新”。 由于 Validate 更新仅验证管道源代码的正确性,而不在任何表上运行实际更新,因此可以在运行实际管道更新之前更快地识别和修复问题。

在所有开发生命周期阶段指定目标架构

增量实时表管道中的所有数据集都引用 LIVE 虚拟架构,该架构不可在管道外部访问。 如果指定了目标架构,则 LIVE 虚拟架构指向该目标架构。 若要在更新期间查看写出到每个表的结果,必须指定目标架构。

必须指定对环境唯一的目标架构。 给定架构中的每个表只能由单个管道更新。

通过为目标不同的开发、测试和生产环境创建不同的管道,可以隔离这些环境。 可以使用目标架构参数来删除使用字符串内插的逻辑,或者可以使用其他小组件或参数来控制数据源和目标。

请参阅将数据从增量实时表发布到 Hive 元存储

使用 Databricks Git 文件夹管理 Delta Live Tables 管道

Databricks 建议在 Delta Live Tables 管道开发、测试以及将其部署到生产环境期间使用 Git 文件夹。 Git 文件夹启用以下内容:

  • 跟踪代码在不同时间的变化。
  • 合并多个开发人员所做的更改。
  • 软件开发实践,例如代码评审。

Databricks 建议为与管道相关的所有代码配置单个 Git 存储库。

每个开发人员应该为开发工作配置自己的 Databricks Git 文件夹。 在开发过程中,用户从其 Databricks Git 文件夹配置自己的管道,并使用开发数据集和隔离的架构与位置测试新逻辑。 开发工作完成后,用户提交更改并将更改推送回到中心 Git 存储库中的分支,并针对测试或 QA 分支提出拉取请求。

应在 Databricks Git 文件夹中签出生成的分支,并使用测试数据集和开发架构配置管道。 如果逻辑按预期运行,则应准备拉取请求或发布分支以将更改推送到生产环境。

虽然 Git 文件夹可用于跨环境同步代码,但需要手动或使用 Terraform 等工具使管道设置保持最新。

此工作流类似于在所有 Databricks 作业中使用 CI/CD 的 Git 文件夹。 请参阅使用 Git 和 Databricks Git 文件夹 (Repos) 的 CI/CD 技术

将用于引入和转换步骤的库分段

Databricks 建议将引入数据的查询与扩充和验证数据的转换逻辑隔离开来。 然后,可以将用于从开发或测试数据源中引入数据的库组织到与生产数据引入逻辑不同的目录中,从而轻松地为不同的环境配置管道。 然后,可以使用较小的数据集进行测试并加速开发。 请参阅创建用于开发和测试的示例数据集

还可以使用参数来控制用于开发、测试和生产的数据源。 请参阅使用参数控制数据源

由于增量实时表管道使用 LIVE 虚拟架构来管理所有数据集关系,因此,通过使用加载示例数据的引入库配置开发和测试管道,可以使用生产表名称替换示例数据集来测试代码。 可以在所有环境中使用相同的转换逻辑。

创建用于开发和测试的示例数据集

Databricks 建议创建开发和测试数据集,以使用预期数据以及潜在的格式错误或损坏的记录来测试管道逻辑。 可以通过多种方法创建可用于开发和测试的数据集,包括:

  • 从生产数据集中选择数据子集。
  • 对包含 PII 的源使用匿名数据或人为生成的数据。
  • 基于下游转换逻辑创建具有明确定义的结果的测试数据。
  • 通过创建违反数据架构预期的记录,来预测潜在的数据损坏、格式错误的记录以及上游数据更改。

例如,如果某个笔记本使用以下代码定义了一个数据集:

CREATE OR REFRESH STREAMING TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

可以使用如下查询创建包含特定记录的示例数据集:

CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

以下示例演示如何筛选已发布的数据,以创建用于开发或测试的生产数据的子集:

CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

若要使用这些不同的数据集,请使用实现转换逻辑的笔记本创建多个管道。 每个管道可以从 LIVE.input_data 数据集读取数据,但配置为包含用于创建特定于环境的数据集的笔记本。

使用参数控制数据源

可以从库中引用在管道配置期间设置的参数。 这些参数在管道设置 UI 的“计算”>“高级”>“配置”部分中设置为键值对。 此模式允许在同一管道的不同配置中指定不同的数据源。

例如,可以使用变量 data_source_path 在管道的开发、测试和生产配置中指定不同的路径,然后使用以下代码引用该变量:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM cloud_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

如果你需要测试引入逻辑如何在初始引入期间处理架构更改或格式错误的数据,则此模式特别有用。 在切换数据集时,可以在所有环境的整个管道中使用相同的代码。