配置增量实时表的管道设置

本文提供有关配置增量实时表的管道设置的详细信息。 增量实时表具有用于配置和编辑管道设置的用户界面。 该 UI 还具有一个选项用于显示和编辑 JSON 设置。

注意

可以使用 UI 或 JSON 规范配置大多数设置。 某些高级选项仅在使用 JSON 配置时才可用。

Databricks 建议使用 UI 来熟悉增量实时表设置。 如有必要,可以直接在工作区中编辑 JSON 配置。 将管道部署到新环境或使用 CLI 或REST API时,JSON 配置文件也很有用。

有关增量实时表 JSON 配置设置的完整参考,请参阅增量实时表管道配置

选择产品版本

选择具有最适合你的管道要求的功能的增量实时表产品版本。 可以使用以下产品版本:

  • Core,用于运行流式处理引入工作负载。 如果你的管道不需要变更数据捕获 (CDC) 或增量实时表期望等高级功能,请选择 Core 版本。
  • Pro,用于运行流式处理引入和 CDC 工作负载。 Pro 产品版本支持所有 Core 功能,此外还支持需要根据源数据的更改更新表的工作负载。
  • Advanced,用于运行流式处理引入工作负载、CDC 工作负载,以及需要“期望”功能的工作负载。 Advanced 产品版本支持 CorePro 版本的功能,此外还支持通过增量实时表期望强制实施数据质量约束。

在创建或编辑管道时可以选择产品版本。 可为每个管道选择不同的版本。 请参阅增量实时表产品页

注意

如果管道包含所选产品版本不支持的功能(例如期望),你将收到解释错误原因的错误消息。 然后,可以编辑该管道以选择适当的版本。

选择管道模式

可以选择根据管道模式连续或使用手动触发器更新管道。 请参阅连续与触发管道执行

选择群集策略

用户必须有权部署计算才能配置和更新增量实时表管道。 工作区管理员可以配置群集策略,以便为用户提供对增量实时表计算资源的访问权限。 请参阅定义 Delta Live Tables 管道计算的限制

注意

  • 群集策略是可选的。 如果你缺少增量实时表所需的计算特权,请咨询工作区管理员。

  • 若要确保正确应用群集策略默认值,请在管道配置的群集配置中将 apply_policy_default_values 值设置为 true

    {
      "clusters": [
        {
          "label": "default",
          "policy_id": "<policy-id>",
          "apply_policy_default_values": true
        }
      ]
    }
    

配置源代码库

可以使用增量实时表 UI 中的文件选择器来配置用于定义管道的源代码。 管道源代码在 Databricks 笔记本中定义,或者在工作区文件所存储的 SQL 或 Python 脚本中定义。 创建或编辑管道时,可以添加一个或多个笔记本或工作区文件,或者添加笔记本和工作区文件的组合。

由于增量实时表会自动分析数据集依赖项来构造管道的处理图,因此你可以按任意顺序添加源代码库。

还可以修改 JSON 文件,以包含工作区文件中存储的 SQL 和 Python 脚本中定义的增量实时表源代码。 以下示例包含笔记本和工作区文件:

{
  "name": "Example pipeline 3",
  "storage": "dbfs:/pipeline-examples/storage-location/example3",
  "libraries": [
    { "notebook": { "path": "/example-notebook_1" } },
    { "notebook": { "path": "/example-notebook_2" } },
    { "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
    { "file": { "path": "/Workspace/Users/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
  ]
}

指定存储位置

可以为一个将内容发布到 Hive 元存储的管道指定存储位置。 指定位置的主要目的是控制管道写入的数据的对象存储位置。

由于 Delta Live Tables 管道的所有表、数据、检查点和元数据完全由 Delta Live Tables 管理,因此与 Delta Live Tables 数据集的大多数交互都是通过注册到 Hive 元存储或 Unity Catalog 的表进行的。

指定管道输出表的目标架构

虽然此操作是可选的,但每当你的目的不仅仅是开发和测试新管道时,都应该指定一个目标,以便发布管道所创建的表。 将管道发布到目标后,便可以在 Azure Databricks 环境中的其他位置查询数据集。 请参阅将数据从 Delta Live Tables 发布到 Hive 元存储将 Unity Catalog 与 Delta Live Tables 管道配合使用

配置计算设置

每个增量实时表管道有两个关联的群集:

  • updates 群集处理管道更新。
  • maintenance 群集运行日常维护任务。

这些群集使用的配置由管道设置中指定的 clusters 属性确定。

可以使用群集标签添加仅适用于特定群集类型的计算设置。 配置管道群集时可以使用三个标签:

注意

如果只定义一个群集配置,则可以省略群集标签设置。 如果未提供标签的设置,则 default 标签将应用于群集配置。 仅当需要为不同的群集类型自定义设置时,才需要群集标签设置。

  • default 标签定义要应用于 updatesmaintenance 群集的计算设置。 将相同的设置应用于这两个群集可确保将所需的配置(例如存储位置的数据访问凭据)应用于维护群集,从而提高维护运行的可靠性。
  • maintenance标签定义要仅应用于maintenance群集的计算设置。 还可以使用 maintenance 标签替代由 default 标签配置的设置。
  • updates标签定义要仅应用于updates群集的设置。 使用 updates 标签配置不应应用于 maintenance 群集的设置。

使用 defaultupdates 标签定义的设置将合并为 updates 群集创建最终配置。 如果使用 defaultupdates 标签定义相同的设置,则使用 updates 标签定义的设置将替代使用 default 标签定义的设置。

以下示例定义一个 Spark 配置参数,该参数仅添加到 updates 群集的配置中:

{
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    },
    {
      "label": "updates",
      "spark_conf": {
         "key": "value"
      }
    }
  ]
}

增量实时表的群集设置选项与 Azure Databricks 上的其他计算类似。 与配置其他管道设置一样,你可以修改群集的 JSON 配置以指定不显示在 UI 中的选项。 请参阅计算

注意

  • 由于 Delta Live Tables 运行时会管理管道群集的生命周期并运行自定义版本的 Databricks Runtime,因此你无法在管道配置中手动设置某些群集设置,例如 Spark 版本或群集名称。 请参阅不可由用户设置的群集属性
  • 可以配置增量实时表管道以利用 Photon。 请参阅什么是 Photon?

选择要运行管道的实例类型

默认情况下,增量实时表为运行管道的驱动程序节点和工作器节点选择实例类型,但你也可以手动配置实例类型。 例如,你可能希望选择实例类型以提高管道性能,或者在运行管道时解决内存问题。 在使用 REST API 或增量实时表 UI 创建编辑管道时,可以配置实例类型。

若要在增量实时表 UI 中创建或编辑管道时配置实例类型:

  1. 单击“设置”按钮。
  2. 在管道设置的“高级”部分中,在“辅助角色类型”和“驱动程序类型”下拉菜单中,选择管道的实例类型。

若要在管道的 JSON 设置中配置实例类型,请单击 JSON 按钮,并在群集配置中输入实例类型配置:

注意

为了避免向 maintenance 群集分配不必要的资源,此示例使用 updates 标签来仅设置 updates 群集的实例类型。 若要将实例类型分配给 updatesmaintenance 群集,请使用 default 标签或省略标签的设置。 如果未提供标签的设置,则 default 标签将应用于管道群集配置。 请参阅配置计算设置

{
  "clusters": [
    {
      "label": "updates",
      "node_type_id": "Standard_D12_v2",
      "driver_node_type_id": "Standard_D3_v2",
      "..." : "..."
    }
  ]
}

使用自动缩放来提高效率并减少资源使用量

使用增强型自动缩放来优化管道的群集利用率。 仅当系统确定这些资源可以提高管道处理速度时,增强型自动缩放才会添加额外的资源。 不再需要的资源会被释放;一旦所有管道更新完成,群集就会立即关闭。

若要详细了解增强型自动缩放(包括配置详细信息),请参阅使用增强型自动缩放优化增量实时表管道的群集利用率

延迟计算关闭

由于 Delta Live Tables 群集在不使用时自动关闭,因此,引用群集配置中设置 autotermination_minutes 的群集策略会导致错误。 要控制群集关闭行为,可以使用开发或生产模式,也可以使用管道配置中的 pipelines.clusterShutdown.delay 设置。 以下示例将 pipelines.clusterShutdown.delay 值设置为 60 秒:

{
    "configuration": {
      "pipelines.clusterShutdown.delay": "60s"
    }
}

如果启用 production 模式,pipelines.clusterShutdown.delay 的默认值为 0 seconds。 如果启用 development 模式,默认值为 2 hours

创建单节点群集

如果在群集设置中将 num_workers 设置为 0,群集将创建为单节点群集。 配置自动缩放群集,并将 min_workers 设置为 0,将 max_workers 设置为 0 也会创建一个单节点群集。

如果配置自动缩放群集并仅将 min_workers 设置为 0,则群集不会创建为单节点群集。 在终止之前,群集将始终至少句有 1 个活动辅助角色。

在增量实时表中创建单节点群集的示例群集配置:

{
    "clusters": [
      {
        "num_workers": 0
      }
    ]
}

配置群集标记

可以使用群集标记来监视管道群集的使用情况。 可以在创建或编辑管道时在增量实时表 UI 中添加群集标记,或者通过编辑管道群集的 JSON 设置来添加标记。

云存储配置

若要访问 Azure 存储,必须使用群集配置中的 spark.conf 设置配置所需的参数(包括访问令牌)。 有关为 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户配置访问权限的示例,请参阅使用管道中的机密安全访问存储凭据

在 Python 或 SQL 中参数化数据集声明

可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:

  • 从代码中分离长路径和其他变量。
  • 减少在开发或过渡环境中处理的数据量,以加快测试速度。
  • 重用同一转换逻辑来处理多个数据源。

以下示例使用 startDate 配置值将开发管道限制为输入数据的子集:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

管道触发间隔

可使用 pipelines.trigger.interval 控制更新表或整个管道的流的触发间隔。 由于触发管道仅处理每个表一次,因此 pipelines.trigger.interval 仅用于连续管道。

由于流式处理查询与批处理查询的默认值不同,Databricks 建议对各个表设置 pipelines.trigger.interval。 仅当你的处理需要控制整个管道图的更新时,才对管道设置该值。

使用 Python 中的 spark_conf 或 SQL 中的 SET 对表设置 pipelines.trigger.interval

@dlt.table(
  spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
    return (<query>)
SET pipelines.trigger.interval=10 seconds;

CREATE OR REFRESH MATERIALIZED VIEW TABLE_NAME
AS SELECT ...

要对管道设置 pipelines.trigger.interval,请将其添加到管道设置中的 configuration 对象:

{
  "configuration": {
    "pipelines.trigger.interval": "10 seconds"
  }
}

允许非管理员用户从启用了 Unity Catalog 的管道查看驱动程序日志

默认情况下,只有管道所有者和工作区管理员有权从运行启用了 Unity Catalog 的管道的群集中查看驱动程序日志。 可以通过在管道设置中将以下 Spark 配置参数添加到 configuration 对象,为任何具有“可管理”、“可查看”或“可运行”权限的用户启用对驱动程序日志的访问权限:

{
  "configuration": {
    "spark.databricks.acl.needAdminPermissionToViewLogs": "false"
  }
}

为管道事件添加电子邮件通知

可在出现以下情况时配置一个或多个电子邮件地址来接收通知:

  • 管道更新成功完成。
  • 管道更新失败,出现可重试或不可重试错误。 选择此选项将接收有关所有管道故障的通知。
  • 管道更新失败并出现不可重试(致命)错误。 选择此选项将仅在发生不可重试错误时接收通知。
  • 单个数据流失败。

若要在创建或编辑管道时配置电子邮件通知,请执行以下操作:

  1. 单击“添加通知”。
  2. 输入单个或多个电子邮件地址以接收通知。
  3. 单击要发送到配置的电子邮件地址的每种通知类型的复选框。
  4. 单击“添加通知”。

SCD 类型 1 查询的控制逻辑删除管理

以下设置可用于控制 SCD 类型 1 处理期间 DELETE 事件的逻辑删除管理行为:

  • pipelines.applyChanges.tombstoneGCThresholdInSeconds:设置此值以匹配无序数据之间的最高预期间隔(以秒为单位)。 默认为 172800 秒(2 天)。
  • pipelines.applyChanges.tombstoneGCFrequencyInSeconds:此设置控制检查逻辑删除的清理频率(以秒为单位)。 默认值为 1800 秒(30 分钟)。

请参阅 APPLY CHANGES API:使用增量实时表简化变更数据捕获

配置管道权限

你必须对管道具有 CAN MANAGEIS OWNER 权限才能管理管道的权限。

  1. 在边栏中,单击“增量实时表”

  2. 选择某个管道的名称。

  3. 单击 kebab 菜单 Kebab 菜单 并选择“权限”

  4. 在“权限设置”中,选择“选择用户、组或服务主体...”下拉菜单,然后选择一个用户、组或服务主体。

    “权限设置”对话框

  5. 从“权限”下拉菜单中选择权限。

  6. 单击“添加” 。

  7. 单击“ 保存”。

为增量实时表启用 RocksDB 状态存储

在部署管道之前,你可以通过设置以下配置来启用基于 RocksDB 的状态管理:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

若要详细了解 RocksDB 状态存储(包括针对 RocksDB 的配置建议),请参阅在 Azure Databricks 上配置 RocksDB 状态存储