配置增量实时表的管道设置

本文提供有关配置增量实时表的管道设置的详细信息。 增量实时表提供一个用于配置和编辑管道设置的用户界面。 该 UI 还提供了一个选项用于显示和编辑 JSON 设置。

注意

可以使用 UI 或 JSON 规范配置大多数设置。 某些高级选项仅在使用 JSON 配置时才可用。

Databricks 建议使用 UI 来熟悉增量实时表设置。 如有必要,可以直接在工作区中编辑 JSON 配置。 在将管道部署到新环境或者使用 CLI 或 REST API 时,JSON 配置文件也很有用。

有关增量实时表 JSON 配置设置的完整参考,请参阅增量实时表管道配置

选择产品版本

选择其功能最符合你的管道要求的增量实时表产品版本。 可以使用以下产品版本:

  • Core,用于运行流式处理引入工作负载。 如果你的管道不需要变更数据捕获 (CDC) 或增量实时表期望等高级功能,请选择 Core 版本。
  • Pro,用于运行流式处理引入和 CDC 工作负载。 Pro 产品版本支持所有 Core 功能,此外还支持需要根据源数据的更改更新表的工作负载。
  • Advanced,用于运行流式处理引入工作负载、CDC 工作负载,以及需要“期望”功能的工作负载。 Advanced 产品版本支持 CorePro 版本的功能,此外还支持通过增量实时表期望强制实施数据质量约束。

在创建或编辑管道时可以选择产品版本。 可为每个管道选择不同的版本。 请参阅增量实时表产品页

注意

如果你的管道包含所选产品版本不支持的功能(例如期望),你将收到错误消息以及出错原因。 然后,可以编辑该管道以选择适当的版本。

选择管道模式

可以选择根据管道模式连续或使用手动触发器更新管道。 请参阅连续与触发管道执行

选择群集策略

用户必须有权部署计算才能配置和更新增量实时表管道。 工作区管理员可以配置群集策略,以便为用户提供对增量实时表计算资源的访问权限。 请参阅定义 Delta Live Tables 管道计算的限制

注意

  • 群集策略是可选的。 如果你缺少增量实时表所需的计算特权,请咨询工作区管理员。

  • 若要确保正确应用群集策略默认值,请在管道配置的群集配置中将 apply_policy_default_values 值设置为 true

    {
      "clusters": [
        {
          "label": "default",
          "policy_id": "<policy-id>",
          "apply_policy_default_values": true
        }
      ]
    }
    

配置源代码库

可以使用增量实时表 UI 中的文件选择器来配置用于定义管道的源代码。 管道源代码在 Databricks 笔记本中定义,或者在工作区文件所存储的 SQL 或 Python 脚本中定义。 创建或编辑管道时,可以添加一个或多个笔记本或工作区文件,或者添加笔记本和工作区文件的组合。

由于增量实时表会自动分析数据集依赖项来构造管道的处理图,因此你可以按任意顺序添加源代码库。

还可以修改 JSON 文件,以包含工作区文件中存储的 SQL 和 Python 脚本中定义的增量实时表源代码。 以下示例包含 Databricks Repos 中的笔记本和工作区文件:

{
  "name": "Example pipeline 3",
  "storage": "dbfs:/pipeline-examples/storage-location/example3",
  "libraries": [
    { "notebook": { "path": "/example-notebook_1" } },
    { "notebook": { "path": "/example-notebook_2" } },
    { "file": { "path": "/Repos/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
    { "file": { "path": "/Repos/<user-name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
  ]
}

指定存储位置

可以为一个将内容发布到 Hive 元存储的管道指定存储位置。 指定位置的主要目的是控制管道写入的数据的对象存储位置。

由于 Delta Live Tables 管道的所有表、数据、检查点和元数据完全由 Delta Live Tables 管理,因此与 Delta Live Tables 数据集的大多数交互都是通过注册到 Hive 元存储或 Unity Catalog 的表进行的。

指定管道输出表的目标架构

虽然此操作是可选的,但每当你的目的不仅仅是开发和测试新管道时,都应该指定一个目标,以便发布管道所创建的表。 将管道发布到目标后,便可以在 Azure Databricks 环境中的其他位置查询数据集。 请参阅将数据从 Delta Live Tables 管道发布到 Hive 元存储将 Unity Catalog 与 Delta Live Tables 管道配合使用

配置计算设置

每个增量实时表管道有两个关联的群集:

  • updates 群集处理管道更新。
  • maintenance 群集运行日常维护任务。

这些群集使用的配置由管道设置中指定的 clusters 属性确定。

可以使用群集标签添加仅适用于特定群集类型的计算设置。 配置管道群集时可以使用三个标签:

注意

如果只定义一个群集配置,则可以省略群集标签设置。 如果未提供标签的设置,则 default 标签将应用于群集配置。 仅当需要为不同的群集类型自定义设置时,才需要群集标签设置。

  • default 标签定义要应用于 updatesmaintenance 群集的计算设置。 将相同的设置应用于这两个群集可确保将所需的配置(例如存储位置的数据访问凭据)应用于维护群集,从而提高维护运行的可靠性。
  • maintenance 标签定义要仅应用于 maintenance 群集的计算设置。 还可以使用 maintenance 标签替代由 default 标签配置的设置。
  • updates 标签定义要仅应用于 updates 群集的设置。 使用 updates 标签配置不应应用于 maintenance 群集的设置。

使用 defaultupdates 标签定义的设置将合并为 updates 群集创建最终配置。 如果使用 defaultupdates 标签定义相同的设置,则使用 updates 标签定义的设置将替代使用 default 标签定义的设置。

以下示例定义一个 Spark 配置参数,该参数仅添加到 updates 群集的配置中:

{
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    },
    {
      "label": "updates",
      "spark_conf": {
         "key": "value"
      }
    }
  ]
}

增量实时表为群集设置提供了与 Azure Databricks 上的其他计算类似的选项。 与配置其他管道设置一样,你可以修改群集的 JSON 配置以指定不显示在 UI 中的选项。 请参阅计算

注意

  • 由于 Delta Live Tables 运行时会管理管道群集的生命周期并运行自定义版本的 Databricks Runtime,因此你无法在管道配置中手动设置某些群集设置,例如 Spark 版本或群集名称。 请参阅不可由用户设置的群集属性
  • 可以配置增量实时表管道以利用 Photon。 请参阅什么是 Photon?

选择要运行管道的实例类型

默认情况下,增量实时表为运行管道的驱动程序节点和工作器节点选择实例类型,但你也可以手动配置实例类型。 例如,你可能希望选择实例类型以提高管道性能,或者在运行管道时解决内存问题。 在使用 REST API 或增量实时表 UI 创建编辑管道时,可以配置实例类型。

若要在增量实时表 UI 中创建或编辑管道时配置实例类型:

  1. 单击“设置”按钮。
  2. 在“管道设置”页上,单击“JSON”按钮。
  3. 在群集配置中输入实例类型配置:

注意

为了避免向 maintenance 群集分配不必要的资源,此示例使用 updates 标签来仅设置 updates 群集的实例类型。 若要将实例类型分配给 updatesmaintenance 群集,请使用 default 标签或省略标签的设置。 如果未提供标签的设置,则 default 标签将应用于管道群集配置。 请参阅配置计算设置

{
  "clusters": [
    {
      "label": "updates",
      "node_type_id": "Standard_D12_v2",
      "driver_node_type_id": "Standard_D3_v2",
      "..." : "..."
    }
  ]
}

使用自动缩放来提高效率并减少资源使用量

使用增强型自动缩放来优化管道的群集利用率。 仅当系统确定这些资源可以提高管道处理速度时,增强型自动缩放才会添加额外的资源。 不再需要的资源会被释放;一旦所有管道更新完成,群集就会立即关闭。

若要详细了解增强型自动缩放(包括配置详细信息),请参阅使用增强型自动缩放优化增量实时表管道的群集利用率

延迟计算关闭

由于 Delta Live Tables 群集在不使用时自动关闭,因此,引用群集配置中设置 autotermination_minutes 的群集策略会导致错误。 要控制群集关闭行为,可以使用开发或生产模式,也可以使用管道配置中的 pipelines.clusterShutdown.delay 设置。 以下示例将 pipelines.clusterShutdown.delay 值设置为 60 秒:

{
    "configuration": {
      "pipelines.clusterShutdown.delay": "60s"
    }
}

如果启用 production 模式,pipelines.clusterShutdown.delay 的默认值为 0 seconds。 如果启用 development 模式,默认值为 2 hours

创建单节点群集

如果在群集设置中将 num_workers 设置为 0,群集将创建为单节点群集。 配置自动缩放群集,并将 min_workers 设置为 0,将 max_workers 设置为 0 也会创建一个单节点群集。

如果配置自动缩放群集并仅将 min_workers 设置为 0,则群集不会创建为单节点群集。 在终止之前,群集将始终至少句有 1 个活动辅助角色。

在增量实时表中创建单节点群集的示例群集配置:

{
    "clusters": [
      {
        "num_workers": 0
      }
    ]
}

配置群集标记

可以使用群集标记来监视管道群集的使用情况。 可以在创建或编辑管道时在增量实时表 UI 中添加群集标记,或者通过编辑管道群集的 JSON 设置来添加标记。

云存储配置

若要访问 Azure 存储,必须使用群集配置中的 spark.conf 设置配置所需的参数(包括访问令牌)。 有关为 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户配置访问权限的示例,请参阅使用管道中的机密安全访问存储凭据

管道参数化

可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:

  • 从代码中分离长路径和其他变量。
  • 减少在开发或过渡环境中处理的数据量,以加快测试速度。
  • 重用同一转换逻辑来处理多个数据源。

以下示例使用 startDate 配置值将开发管道限制为输入数据的子集:

CREATE OR REFRESH LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

管道触发间隔

可使用 pipelines.trigger.interval 控制更新表或整个管道的流的触发间隔。 由于触发管道仅处理每个表一次,因此 pipelines.trigger.interval 仅用于连续管道。

由于流式处理查询与批处理查询的默认值不同,Databricks 建议对各个表设置 pipelines.trigger.interval。 仅当你的处理需要控制整个管道图的更新时,才对管道设置该值。

使用 Python 中的 spark_conf 或 SQL 中的 SET 对表设置 pipelines.trigger.interval

@dlt.table(
  spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
    return (<query>)
SET pipelines.trigger.interval=10 seconds;

CREATE OR REFRESH LIVE TABLE TABLE_NAME
AS SELECT ...

要对管道设置 pipelines.trigger.interval,请将其添加到管道设置中的 configuration 对象:

{
  "configuration": {
    "pipelines.trigger.interval": "10 seconds"
  }
}

允许非管理员用户从启用了 Unity Catalog 的管道查看驱动程序日志

默认情况下,只有管道所有者和工作区管理员有权从运行启用了 Unity Catalog 的管道的群集中查看驱动程序日志。 可以通过在管道设置中将以下 Spark 配置参数添加到 configuration 对象,为任何具有“可管理”、“可查看”或“可运行”权限的用户启用对驱动程序日志的访问权限:

{
  "configuration": {
    "spark.databricks.acl.needAdminPermissionToViewLogs": "false"
  }
}

为管道事件添加电子邮件通知

可在出现以下情况时配置一个或多个电子邮件地址来接收通知:

  • 管道更新成功完成。
  • 管道更新失败,出现可重试或不可重试错误。 选择此选项将接收有关所有管道故障的通知。
  • 管道更新失败并出现不可重试(致命)错误。 选择此选项将仅在发生不可重试错误时接收通知。
  • 单个数据流失败。

若要在创建或编辑管道时配置电子邮件通知,请执行以下操作:

  1. 单击“添加通知”。
  2. 输入单个或多个电子邮件地址以接收通知。
  3. 单击要发送到配置的电子邮件地址的每种通知类型的复选框。
  4. 单击“添加通知”。

SCD 类型 1 查询的控制逻辑删除管理

以下设置可用于控制 SCD 类型 1 处理期间 DELETE 事件的逻辑删除管理行为:

  • pipelines.applyChanges.tombstoneGCThresholdInSeconds:设置此值以匹配无序数据之间的最高预期间隔(以秒为单位)。 默认为 172800 秒(2 天)。
  • pipelines.applyChanges.tombstoneGCFrequencyInSeconds:此设置控制检查逻辑删除的清理频率(以秒为单位)。 默认值为 1800 秒(30 分钟)。

请参阅在 Delta Live Tables 中使用 APPLY CHANGES API 简化变更数据捕获

配置管道权限

你必须对管道具有 CAN MANAGEIS OWNER 权限才能管理管道的权限。

  1. 在边栏中,单击“增量实时表”

  2. 选择某个管道的名称。

  3. 单击 kebab 菜单 垂直省略号 并选择“权限”

  4. 在“权限设置”中,选择“选择用户、组或服务主体...”下拉菜单,然后选择一个用户、组或服务主体。

    “权限设置”对话框

  5. 从“权限”下拉菜单中选择权限。

  6. 单击“添加” 。

  7. 单击“ 保存”。

为增量实时表启用 RocksDB 状态存储

在部署管道之前,你可以通过设置以下配置来启用基于 RocksDB 的状态管理:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

若要详细了解 RocksDB 状态存储(包括针对 RocksDB 的配置建议),请参阅在 Azure Databricks 上配置 RocksDB 状态存储